Как сделать паузу / запустить / остановить Кафка Производитель / Кафка Шаблон - PullRequest
0 голосов
/ 06 сентября 2018

Я использую весеннее загрузочное приложение с интеграцией kafka и хочу реализовать конечную точку, чтобы остановить и запустить kafka от публикации сообщений. Сообщение запускается асинхронно другими конечными точками.

Бобы KafkaTemplate<String, String> или ProducerFactory<String, String> producerFactory() не имеют никаких действий остановки и паузы.

Моя цель - иметь возможность смоделировать сбои соединения и убедиться, что эти сообщения хранятся в резервных механизмах, которые у меня есть.

Есть идеи?

1 Ответ

0 голосов
/ 06 сентября 2018

У KafkaTemplate нет таких обратных вызовов, потому что это пассивный компонент, который может выполнять эту работу, только если мы его вызываем.

Для симуляции сбоя соединения я бы предложил вам реализовать пользовательский ProducerFactory, создающий KafkaProducer с имитацией или переопределением Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);. Там в этом ProducerFactory вы можете реализовать соответствующие обратные вызовы lifecylce и реагировать на состояние в упомянутой реализации send().

У org.apache.kafka.clients.producer.MockProducer может быть что-то, что вы можете использовать или одолжить. Например, см. Его close() или fenceProducer().

...