сравнить каждую отдельную входящую запись из входной темы с ее отдельной предыдущей записью - PullRequest
0 голосов
/ 23 декабря 2018

Я новичок в потоке Кафки, мой вариант использования - сравнить значение каждой отдельной входящей записи из входной темы со значением из отдельной предыдущей записи и, если условие сравнения истинно, отправить новую запись, содержащую результат сравненияс индексом каждой сравниваемой записи в теме результатов, иначе не отправлять ничего.(обратите внимание, что все входящие записи могут иметь уникальный ключ для каждой записи или нулевой ключ).

Сделать это в API-интерфейсе Kafka для потребителей и производителей очень легко, но ((без использования внешней БД для хранения предыдущей записи)) я пытаюсь использовать только (Kafka streams DSL API), (включая KTable и KStream, с их внутренними методами, такими как, агрегировать, сокращать и т.д.состояние, чтобы сравнить его с текущим, затем сохранить текущую запись вместо старой, чтобы сравнить ее со следующей входящей записью.Несколько подходов пытаются использовать Processor API вместо Stream DSL API, но он включает в себя большую сложность, и я не совсем понял это.Вот почему я пытаюсь решить мою проблему с Stream DSL API.Но до сих пор, к сожалению, у меня ничего не получалось.

На самом деле, до сих пор у меня ничего не получалось.Можете ли вы помочь мне, предоставив подробный пример кода, чтобы сделать это с помощью Kafka Stream DSL?

1 Ответ

0 голосов
/ 23 декабря 2018

Вы можете использовать Processor API.

Необходимо реализовать метод интерфейса Transformer, который преобразует:

  1. Будет искать значение для ключа,

    1.1если нет, положить значение в хранилище и закончить

    1.2, если присутствует, вычислить значение, сохранить новое значение в хранилище и передать результат в тему вывода

Пример кода:

object SampleApp extends App {

  val storeName: String = ???
  val builder: StreamsBuilder = new StreamsBuilder()
  builder.stream("topicName")(Consumed.`with`(Serdes.String(), Serdes.String()))
    .transform[String, String](() => SampleTransformer(storeName))
    .to("outputTopic")(Produced.`with`(Serdes.String(), Serdes.String()))
}


case class SampleTransformer(storeName: String)
  extends Transformer[String, String, KeyValue[String, String]]
    with LazyLogging {

  var store: KeyValueStore[String, String] = _

  override def init(context: ProcessorContext): Unit = {
    super.init(context)
    store = context.getStateStore(storeName).asInstanceOf[KeyValueStore[String, String]]
  }

  override def transform(key: String, newValue: String): String = {
    val valueToPass = Option(store.get(key)).map(oldValue => someComputation(oldValue, newValue))
    store.put(key, newValue)
    valueToPass.orNull
  }

  def someComputation(oldValue: String, newValue: String): String = ???

  override def close(): Unit = {
    // Close
  }
}
...