Мы разрабатываем приложение Kafka Streams, используя низкоуровневый процессорный API.
Согласно документации по Kafka, все потоки и параллелизм обрабатываются потоками потока и задачами потока. Параллелизм также можно масштабировать, используя разделы по темам.
Текущий код выглядит так:
public class Processor implements Processor<K, V> {
@Override
public void process(String key, V value) {
//Do processing on the stream thread itself
...
// Write back to output topic
context.forward(key, updatedValue)
});
}
}
Однако рекомендуется ли при каких-либо обстоятельствах создавать собственные потоки для выполнения фактической обработки? Это будет означать использование API Kafka Streams в основном для потребления данных из темы, а не для фактической обработки. Фактическая обработка будет происходить в новых потоках, которые вызываются после первоначального потребления данных в потоке потока Kafka.
Пример процессора в топологии:
public class Processor implements Processor<K, V> {
@Override
public void process(String key, V value) {
//Spawn new thread to do the processing
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
String threadName = Thread.currentThread().getName();
System.out.println("Hello " + threadName);
//Do more processing
...
// Write back to output topic
context.forward(key, updatedValue)
});
}
}
Я испробовал самый базовый код для этого, но не уверен, что он вмешивается в автоматические функции, предоставляемые Kafka. Например, автоматическое смещение, тайм-ауты и т. д.
Или всегда лучше придерживаться поведения по умолчанию, уже обеспеченного потоками Kafka, и использовать поток потоков для быстрой обработки данных?