Группа потоков и фильтр Akka - PullRequest
0 голосов
/ 06 августа 2020

Привет, я нахожусь в ситуации, когда у меня есть источник вроде

final case class Transaction(
    token: String,
    processingTime: Long
)



def rawSource: SourceWithContext[Either[NullJsonError, Json], CommittableOffset, Control] = Consumer
      .committableSource[String, Either[NullJsonError, Json]](
        consumerSettings(kafkaConfigs),
        Subscriptions.topics(kafkaConfigs.topic)
      )
      .asSourceWithContext(_.committableOffset)
      .map(_.record.value())
      .map {
        case Right(json) if json == Json.Null =>
          Left(NullJsonError)
        case other => other
      }


private val runnableGraph = rawSource
    .map(standardizerStage.stageMap) // This will convert Either[NullJsonError, Json] to  Either[NullJsonError, Transaction] 
    .collect {
      case Right(Some(value)) => {
        Right(value)
      }
      case Left(error) => Left(error)
    }
    .mapAsync(10)(saveToDB) // This will process the Transaction i.e. save to DB
    .asSource
    .map { case (_, committableOffset) => committableOffset }
    .toMat(Committer.sink(CommitterSettings(system)))(Keep.both)
    .mapMaterializedValue(Consumer.DrainingControl.apply)

Теперь я хочу сгруппировать входящие сообщения от kafka по Transaction.token с интервалами в 5 секунд или 5000 сообщений в зависимости от того, что наступит раньше , а затем из каждой группы выберите только одну транзакцию с наивысшим Transaction.processingTime и передайте ее и сохраните в БД.

Теперь я знаю, что мне нужно использовать что-то вроде

.groupedWithin(5000, 5.seconds)
      .mapConcat { group =>
        val commitableOffsetBatch = CommittableOffsetBatch.apply(group.map(_.committableOffset))
        commitableOffsetBatch.commitScaladsl()
        group
      }

Но я не знаю, как я могу использовать его для группировки по token и фильтрации по processingTime для каждой группы после .map(standardizerStage.stageMap) в приведенном выше коде, потому что только после .map(standardizerStage.stageMap) будет объект транзакции.

...