У меня есть тест, который по темпераменту оставляет открытый поток производителя с непрерывной регистрацией ошибок.
[2018-06-01 15:52:48,526] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,526] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,526] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,628] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,628] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,628] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,730] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,730] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,730] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,982] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,982] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-06-01 15:52:48,982] WARN [Producer clientId=test-deletion-stream-application-9d94ddd6-6f29-4364-890e-0d9676782edd-StreamThread-1-producer] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
Тест работает, но иногда не проходит, как описано выше.
test("My test") {
val topology = Application.getTopology(...)
val streams = new KafkaStreams(topology,properties)
withRunningKafka {
createCustomTopic(eventTopic)
val streamId = UUIDs.newUuid().toString
logger.info(s"Creating stream with Application ID: [$streamId]")
val streams = new KafkaStreams(topology, streamConfig(streamId, PropertiesConfig.asScalaMap(props)))
try {
publishToKafka(eventTopic, key = keyMSite1UID1, message = event11a)
// ... several more publishings
Thread.sleep(publishingDelay) // Give time to initialize
streams.start()
Thread.sleep(deletionDelay)
withConsumer[MyKey, MyEvent, Unit] { consumer =>
val consumedMessages: Stream[(MyKey, MyEvent)] =
consumer.consumeLazily[(MyKey, MyEvent)](eventTopic)
val messages = consumedMessages.take(20).toList
messages.foreach(tuple => logger.info("EVENT END: " + tuple))
messages.size should be(6)
// several assertions here
}
} finally {
streams.close()
}
}(config)
}
Особенность заключается в том, что потоковое приложение генерирует события удаления по той же теме, из которой оно получает.
В этом наборе есть два похожих теста.Я выполняю набор тестов под sbt, вот так:
testOnly *MyTest
Четыре из пяти выполнений оставляют висящий поток, публикующий эти ошибки на неопределенный срок.Они появляются группами по 3, но я тоже не знаю, почему.
Я пытался установить задержки после вызовов close (), но, похоже, это не помогает.Как избежать висящих потоков продюсера?