Привет, я нахожусь в ситуации, когда у меня есть источник вроде
final case class Transaction(
token: String,
processingTime: Long
)
def rawSource: SourceWithContext[Either[NullJsonError, Json], CommittableOffset, Control] = Consumer
.committableSource[String, Either[NullJsonError, Json]](
consumerSettings(kafkaConfigs),
Subscriptions.topics(kafkaConfigs.topic)
)
.asSourceWithContext(_.committableOffset)
.map(_.record.value())
.map {
case Right(json) if json == Json.Null =>
Left(NullJsonError)
case other => other
}
private val runnableGraph = rawSource
.map(standardizerStage.stageMap) // This will convert Either[NullJsonError, Json] to Either[NullJsonError, Transaction]
.collect {
case Right(Some(value)) => {
Right(value)
}
case Left(error) => Left(error)
}
.mapAsync(10)(saveToDB) // This will process the Transaction i.e. save to DB
.asSource
.map { case (_, committableOffset) => committableOffset }
.toMat(Committer.sink(CommitterSettings(system)))(Keep.both)
.mapMaterializedValue(Consumer.DrainingControl.apply)
Теперь я хочу сгруппировать входящие сообщения от kafka по Transaction.token
с интервалами в 5 секунд или 5000 сообщений в зависимости от того, что наступит раньше , а затем из каждой группы выберите только одну транзакцию с наивысшим Transaction.processingTime
и передайте ее и сохраните в БД.
Теперь я знаю, что мне нужно использовать что-то вроде
.groupedWithin(5000, 5.seconds)
.mapConcat { group =>
val commitableOffsetBatch = CommittableOffsetBatch.apply(group.map(_.committableOffset))
commitableOffsetBatch.commitScaladsl()
group
}
Но я не знаю, как я могу использовать его для группировки по token
и фильтрации по processingTime
для каждой группы после .map(standardizerStage.stageMap)
в приведенном выше коде, потому что только после .map(standardizerStage.stageMap)
будет объект транзакции.