Как преобразовать событие Kafka Stream и отправить его в другую тему, только если можно преобразовать - PullRequest
0 голосов
/ 10 июля 2019

Я хочу создать простой поток Кафки, который пытается преобразовывать события, основываясь на некоторых условиях. Если событие может быть преобразовано, преобразованное событие переходит в другую тему. Если событие не может быть преобразовано, оно снова сохраняется в той же теме для последующей попытки.

Допустим, у меня есть это:

case class Foo(a: String, b: String, c: Boolean)

def translate(value: String): Option[Foo] = {
  // ...
  // Returns an Option of Foo
}

Так что мне нужно что-то вроде этого:

val builder: StreamsBuilder = new StreamsBuilder()

builder
  .stream(topic)
  .map[String, String]((key, value) => translate(value))
  // If translate(value) is Some(value) send the value to a topic
  // Otherwise, send the original value (without being transformed) to the same topic

Я полностью застрял в этой проблеме. Самая близкая вещь, с которой я столкнулся, это попытаться создать структуру с логическим значением, которое сообщает мне, можно ли преобразовать событие, или нет, а затем создать различные потоки с .branch. Например, что-то вроде этого:

def translate(value: String): (Boolean, Option[CPCTTMDataTransformed]) = {
  val eventTransformed = transform(value)
  eventTransformed match {
    case Some(value) => (true, Option(value))
    case None => (false, None)
  }
}

А затем попробуйте сделать что-то вроде этого:

builder
  .stream(topic)
  .map[String, (Boolean, Option[Foo])]((key, value) => translate(value))
  .branch(
    (_, element) => element._1,
  )
  .foreach {
    // Send the "true" to one topic and in the "false", send the original message to the original topic 
  }

Но, конечно, мне понадобится оригинальное событие, чтобы отправить его в тему.

Я хотя и о более сложных структурах, но в конце я всегда возвращаюсь к проблеме ветвления потока на основе условия Some - None.

1 Ответ

1 голос
/ 11 июля 2019

Может быть, использовать API процессора.У вас есть один Processor, который выполняет перевод, и если перевод успешен, вы context.forward(To.child("translated")) в противном случае вы context.forward(To.child("retry")).

Вы вручную соединяете свой Topology:

Topology topology = new Topology();
topology.addSource("source", topic);
topology.addProcessor("translator", () -> new TranslateProcessor(), "source");
topology.addSink("translated", resultTopic, "translator");
topology.addSink("retry", topic, "translator");
...