Я хочу создать простой поток Кафки, который пытается преобразовывать события, основываясь на некоторых условиях. Если событие может быть преобразовано, преобразованное событие переходит в другую тему. Если событие не может быть преобразовано, оно снова сохраняется в той же теме для последующей попытки.
Допустим, у меня есть это:
case class Foo(a: String, b: String, c: Boolean)
def translate(value: String): Option[Foo] = {
// ...
// Returns an Option of Foo
}
Так что мне нужно что-то вроде этого:
val builder: StreamsBuilder = new StreamsBuilder()
builder
.stream(topic)
.map[String, String]((key, value) => translate(value))
// If translate(value) is Some(value) send the value to a topic
// Otherwise, send the original value (without being transformed) to the same topic
Я полностью застрял в этой проблеме. Самая близкая вещь, с которой я столкнулся, это попытаться создать структуру с логическим значением, которое сообщает мне, можно ли преобразовать событие, или нет, а затем создать различные потоки с .branch
. Например, что-то вроде этого:
def translate(value: String): (Boolean, Option[CPCTTMDataTransformed]) = {
val eventTransformed = transform(value)
eventTransformed match {
case Some(value) => (true, Option(value))
case None => (false, None)
}
}
А затем попробуйте сделать что-то вроде этого:
builder
.stream(topic)
.map[String, (Boolean, Option[Foo])]((key, value) => translate(value))
.branch(
(_, element) => element._1,
)
.foreach {
// Send the "true" to one topic and in the "false", send the original message to the original topic
}
Но, конечно, мне понадобится оригинальное событие, чтобы отправить его в тему.
Я хотя и о более сложных структурах, но в конце я всегда возвращаюсь к проблеме ветвления потока на основе условия Some
- None
.