/ 06 августа 2020

Привет, я нахожусь в ситуации, когда у меня есть источник вроде

final case class Transaction(
    token: String,
    processingTime: Long

def rawSource: SourceWithContext[Either[NullJsonError, Json], CommittableOffset, Control] = Consumer
      .committableSource[String, Either[NullJsonError, Json]](
      .map {
        case Right(json) if json == Json.Null =>
        case other => other

private val runnableGraph = rawSource
    .map(standardizerStage.stageMap) // This will convert Either[NullJsonError, Json] to  Either[NullJsonError, Transaction] 
    .collect {
      case Right(Some(value)) => {
      case Left(error) => Left(error)
    .mapAsync(10)(saveToDB) // This will process the Transaction i.e. save to DB
    .map { case (_, committableOffset) => committableOffset }

Теперь я хочу сгруппировать входящие сообщения от kafka по Transaction.token с интервалами в 5 секунд или 5000 сообщений в зависимости от того, что наступит раньше , а затем из каждой группы выберите только одну транзакцию с наивысшим Transaction.processingTime и передайте ее и сохраните в БД.

Теперь я знаю, что мне нужно использовать что-то вроде

.groupedWithin(5000, 5.seconds)
      .mapConcat { group =>
        val commitableOffsetBatch = CommittableOffsetBatch.apply(group.map(_.committableOffset))

Но я не знаю, как я могу использовать его для группировки по token и фильтрации по processingTime для каждой группы после .map(standardizerStage.stageMap) в приведенном выше коде, потому что только после .map(standardizerStage.stageMap) будет объект транзакции.
