Прежде всего, не существует указанного стандарта.
Возможные варианты:
- WEB-приложение Java EE
- WEB-приложение Spring
- Приложение с Spring-kafka (
@KafkaListener
)
Производитель Kafka потенциально может принять некоторые команды.В реальном сценарии я работал с приложениями, которые непрерывно работают со слушателями, получая запросы, которые запускали некоторые задания, пакеты и т. Д.
Этого можно достичь, используя, например:
- Веб-сервер, принимающий HTTP-запросы
- Автономное приложение Spring с
@KafkaListener
Потребителем может быть приложение Spring с @KafkaListener
.
@KafkaListener(topics = "${some.topic}")
public void accept(Message message) {
// process
}
Приложение Springс @KafkaListener
будет работать бесконечно по умолчанию.Контейнеры слушателей, созданные для аннотаций @KafkaListener
, регистрируются с помощью компонента инфраструктуры типа KafkaListenerEndpointRegistry
.Этот бин управляет жизненным циклом контейнеров;он автоматически запустит все контейнеры, для которых autoStartup
установлено в true
.KafkaMessageListenerContainer
использует TaskExecutor
для выполнения основного цикла KafkaConsumer
.
Документация для получения дополнительной информации.
Если вы решите обходиться без каких-либо платформ и серверов приложений, возможное решение - создать прослушиватель в отдельном потоке:
public class ConsumerListener implements Runnable {
private final Consumer<String, String> consumer = new KafkaConsumer<>(properties);
@Override
public void run() {
try {
consumer.subscribe(topics);
while (true) {
// consume
}
}
} finally {
consumer.close();
}
}
}