блокировка производителя кафки при обратном вызове - PullRequest
0 голосов
/ 02 июля 2018

Я тестирую async send () в моем производителе кафки. Кластер, к которому я хочу подключиться, находится в автономном режиме. Я предполагаю, что я быстро отправлю 10000 индивидуальных запросов (длина listToSend). Затем наступит тайм-аут (60 с), и через 60 секунд я увижу, что обратные вызовы поразили меня logger.error(s"failed to send record ${x._2}", e) Однако, похоже, что для завершения метода потребуется вечность.

Вот почему я добавил в строку logger.debug("test: am I sending data").

Он печатает, затем ничего не происходит в течение 60 секунд. Я вижу неудачный обратный вызов для 1-й записи. И только тогда он будет двигаться дальше.

Это нормальное поведение или я упускаю что-то фундаментальное?

listToSend.foreach { x =>
        logger.debug("test: am I sending data")
        // note: I added this 'val future =' in an attempt to fix this, to no avail
        val future = producer.send(new ProducerRecord[String, String](topic, x._2), new Callback {
          override def onCompletion(metadata: RecordMetadata, e: Exception) {

            if (e != null) {
              //todo: handle failed sends, timeouts, ...
              logger.error(s"failed to send record ${x._2}", e)
            }
            else { //nice to have: implement logic here, or call another method to process metadata
              logger.debug("~Callback success~")
            }
          }
        }
        )
      }

примечание: Я не хочу блокировать этот код, я хочу сохранить его асинхронным. Тем не менее, похоже, что он блокирует send () независимо.

1 Ответ

0 голосов
/ 17 июля 2018

Параллелизм я так и не понял полностью.

Однако, похоже, мое название темы (я назвал его «[имя проекта] _connection») было проблемой.

Хотя я не знал ни одного зарезервированного ключевого слова в названии темы, это поведение выскочило.

Некоторые дальнейшие эксперименты также выявили, что имя темы с завершающим пробелом также может вызывать такое поведение. Производитель попытается отправить его в эту тему, но кластер Kafka, похоже, не знает, как с этим справиться, вызывая эти таймауты.

Так что для всех, кто сталкивался с этой проблемой, проверьте / измените название своей темы, прежде чем приступать к устранению неполадок.

...