Проблемы соединения 2-х потоков kafka (используя пользовательский timestampextractor) - PullRequest
0 голосов
/ 12 декабря 2018

У меня проблемы с объединением двух потоков кафки, извлекающих дату из полей моего события.Объединение работает нормально, когда я не определяю пользовательский TimeStampExtractor, но когда я это делаю, объединение больше не работает.Моя топология довольно проста:

val builder = new StreamsBuilder()

val couponConsumedWith = Consumed.`with`(Serdes.String(),
  getAvroCouponSerde(schemaRegistryHost, schemaRegistryPort))
val couponStream: KStream[String, Coupon] = builder.stream(couponInputTopic, couponConsumedWith)

val purchaseConsumedWith = Consumed.`with`(Serdes.String(),
  getAvroPurchaseSerde(schemaRegistryHost, schemaRegistryPort))
val purchaseStream: KStream[String, Purchase] = builder.stream(purchaseInputTopic, purchaseConsumedWith)

val couponStreamKeyedByProductId: KStream[String, Coupon] = couponStream.selectKey(couponProductIdValueMapper)
val purchaseStreamKeyedByProductId: KStream[String, Purchase] = purchaseStream.selectKey(purchaseProductIdValueMapper)

val couponPurchaseValueJoiner = new ValueJoiner[Coupon, Purchase, Purchase]() {

  @Override
  def apply(coupon: Coupon, purchase: Purchase): Purchase = {
      val discount = (purchase.getAmount * coupon.getDiscount) / 100
      new Purchase(purchase.getTimestamp, purchase.getProductid, purchase.getProductdescription, purchase.getAmount - discount)
  }
}

val fiveMinuteWindow = JoinWindows.of(TimeUnit.MINUTES.toMillis(10))
val outputStream: KStream[String, Purchase] = couponStreamKeyedByProductId.join(purchaseStreamKeyedByProductId,
  couponPurchaseValueJoiner,
  fiveMinuteWindow
  )

outputStream.to(outputTopic)

builder.build()

Как я уже сказал, этот код работает как чудо, когда я не использую пользовательский TimeStampExtractor, но когда я делаю это, устанавливая StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG в свой пользовательский класс экстрактора (I ')мы дважды проверили, что класс правильно извлекает дату) объединение больше не работает.

Я проверяю топологию, выполняя модульный тест и передавая ему следующие события:

    val coupon1 = new Coupon("Dec 05 2018 09:10:00.000 UTC", "1234", 10F)
    // Purchase within the five minutes after the coupon - The discount should be applied
    val purchase1 = new Purchase("Dec 05 2018 09:12:00.000 UTC", "1234", "Green Glass", 25.00F)
    val purchase1WithDiscount = new Purchase("Dec 05 2018 09:12:00.000 UTC", "1234", "Green Glass", 22.50F)
    val couponRecordFactory1 = couponRecordFactory.create(couponInputTopic, "c1", coupon1)
    val purchaseRecordFactory1 = purchaseRecordFactory.create(purchaseInputTopic, "p1", purchase1)

    testDriver.pipeInput(couponRecordFactory1)
    testDriver.pipeInput(purchaseRecordFactory1)
    val outputRecord1 = testDriver.readOutput(outputTopic,
      new StringDeserializer(),
      JoinTopologyBuilder.getAvroPurchaseSerde(
        schemaRegistryHost,
        schemaRegistryPort).deserializer())
    OutputVerifier.compareKeyValue(outputRecord1, "1234", purchase1WithDiscount)

Не уверен, что шаг выбора нового ключа избавляется от правильной даты.Я проверил много комбинаций без удачи: (

Любая помощь будет очень признательна!

Ответы [ 2 ]

0 голосов
/ 13 декабря 2018

Спасибо, что ответили.Я работал над этим вчера, и я думаю, что нашел проблему.Как вы сказали, я использую TopologyTestDriver для запуска своих тестов, и когда вы инициализируете класс TopologyTestDriver, он использует initialWallClockTime, если вы не укажете значение, TopologyTestDriver подберет currentTimeMillis:

public TopologyTestDriver(Topology topology, Properties config) {
    this(topology, config, System.currentTimeMillis());
} 

Тамэто другой конструктор, который позволяет вам передавать initialWallClockTime.Я тестировал этот метод, но по какой-то причине он не работает для меня.

Таким образом, чтобы подвести итог, я решил создать объекты Purchase и Coupon с текущей меткой времени.Я все еще использую свой пользовательский экстрактор меток времени, но вместо того, чтобы жестко кодировать дату, я всегда получаю текущую метку времени, и таким образом соединение работает нормально.

Не совсем доволен моим конечным решением, потому что я не знаю почемуinitialWallClockTime у меня не работает, но, по крайней мере, тесты теперь работают нормально.

0 голосов
/ 12 декабря 2018

Я не уверен в этом, потому что я не знаю, сколько вы тестируете свой код, но я предполагаю, что:

1) ваш код работает с экстрактором временных меток по умолчанию, потому что он используетвремя, когда вы отправляете запись в каналы как записи меток времени, поэтому в основном это будет работать, потому что в вашем тесте вы отправляете данные один за другим без паузы.

2) вы используете TopologyTestDriverсделать ваши тесты!Обратите внимание, что это очень полезно для тестирования вашего бизнес-кода и топологии в целом (что я имею в качестве входных данных и что является правильным в соответствии с выходными данными), но в этих тестах не работает приложение Kafka Stream.

В вашем случае вы можете поиграть с методом advanceWallClockTime(long) в классе TopologyTestDriver, чтобы имитировать переход по системному времени.

Если вы хотите запустить топологию, вам нужно будет провести интеграционный тест со встроенной кафкойкластера (есть одна на библиотеках kafka, которая работает просто отлично!).

Дайте мне знать, если это поможет: -)

...