Ошибка производителя в Scala Embedded Kafka с потоками Kafka - PullRequest
0 голосов
/ 01 июня 2018

У меня есть тест, который по темпераменту оставляет открытый поток производителя с непрерывной регистрацией ошибок.

[2018-06-01 15:52:48,526] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,526] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,526] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,628] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,628] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,628] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,730] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,730] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,730] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,982] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,982] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,982] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

Тест работает, но иногда не проходит, как описано выше.

test("My test") {
  val topology = Application.getTopology(...)
  val streams = new KafkaStreams(topology,properties)

  withRunningKafka {
    createCustomTopic(eventTopic)
    val streamId = UUIDs.newUuid().toString
    logger.info(s"Creating stream with Application ID: [$streamId]")
    val streams = new KafkaStreams(topology, streamConfig(streamId, PropertiesConfig.asScalaMap(props)))

    try {
      publishToKafka(eventTopic, key = keyMSite1UID1, message = event11a)
      // ... several more publishings
      Thread.sleep(publishingDelay) // Give time to initialize
      streams.start()
      Thread.sleep(deletionDelay)

      withConsumer[MyKey, MyEvent, Unit] { consumer =>
        val consumedMessages: Stream[(MyKey, MyEvent)] =
          consumer.consumeLazily[(MyKey, MyEvent)](eventTopic)
        val messages = consumedMessages.take(20).toList
        messages.foreach(tuple => logger.info("EVENT   END: " + tuple))
        messages.size should be(6)
        // several assertions here
      }
    } finally {
      streams.close()
    }
  }(config)
}

Особенность заключается в том, что потоковое приложение генерирует события удаления по той же теме, из которой оно получает.

В этом наборе есть два похожих теста.Я выполняю набор тестов под sbt, вот так:

testOnly *MyTest

Четыре из пяти выполнений оставляют висящий поток, публикующий эти ошибки на неопределенный срок.Они появляются группами по 3, но я тоже не знаю, почему.

Я пытался установить задержки после вызовов close (), но, похоже, это не помогает.Как избежать висящих потоков продюсера?

1 Ответ

0 голосов
/ 11 июня 2018

В своем тесте вы создаете два KafkaStreams экземпляра, но вы только close() один.Я предполагаю, что недостающий Producer относится к экземпляру, который вы не закрываете.Обратите внимание, что вам нужно позвонить KafkaStreams#close(), даже если вы никогда не звонили KafkaStreams#start().

...