Группировка событий с помощью fs2.Stream - PullRequest
6 голосов
/ 09 марта 2019

У меня есть поток событий следующим образом:

sealed trait Event

val eventStream: fs2.Stream[IO, Event] = //...

Я хочу сгруппировать полученные события в течение одной минуты (т. Е. От 0 до 59 секунд каждой минуты). Это звучит довольно просто с fs2

val groupedEventsStream = eventStream groupAdjacentBy {event => 
    TimeUnit.MILLISECONDS.toMinutes(System.currentTimeMillis())
}

Проблема в том, что функция группировки не является чистой . Используется currentTimeMillis. Я могу решить это следующим образом:

stream.evalMap(t => IO(System.currentTimeMillis(), t))
  .groupAdjacentBy(t => TimeUnit.MILLISECONDS.toMinutes(t._1))

Дело в том, что добавляет неуклюжий шаблон с кортежами, которых я бы хотел избежать. Есть ли другие решения?

Или, может быть, использование нечистой функции не так уж плохо для такого случая?

1 Ответ

1 голос
/ 14 мая 2019

Вы можете удалить часть шаблона, используя cats.effect.Clock :

def groupedEventsStream[A](stream: fs2.Stream[IO, A])
                          (implicit clock: Clock[IO], eq: Eq[Long]): fs2.Stream[IO, (Long, Chunk[(Long, A)])] =
  stream.evalMap(t => clock.realTime(TimeUnit.MINUTES).map((_, t)))
    .groupAdjacentBy(_._1)
...