Желательно ли запускать новые темы (используя программные средства) в приложении Kafka Streams? - PullRequest
1 голос
/ 29 мая 2019

Мы разрабатываем приложение Kafka Streams, используя низкоуровневый процессорный API.

Согласно документации по Kafka, все потоки и параллелизм обрабатываются потоками потока и задачами потока. Параллелизм также можно масштабировать, используя разделы по темам.

Текущий код выглядит так:

public class Processor implements Processor<K, V> {

@Override
  public void process(String key, V value) {

      //Do processing on the stream thread itself
      ...

      // Write back to output topic
      context.forward(key, updatedValue)
    }); 
  }
}

Однако рекомендуется ли при каких-либо обстоятельствах создавать собственные потоки для выполнения фактической обработки? Это будет означать использование API Kafka Streams в основном для потребления данных из темы, а не для фактической обработки. Фактическая обработка будет происходить в новых потоках, которые вызываются после первоначального потребления данных в потоке потока Kafka.

Пример процессора в топологии:

public class Processor implements Processor<K, V> {

@Override
  public void process(String key, V value) {

  //Spawn new thread to do the processing
    ExecutorService executor = Executors.newSingleThreadExecutor();
    executor.submit(() -> {
      String threadName = Thread.currentThread().getName();
      System.out.println("Hello " + threadName);

      //Do more processing
      ...

      // Write back to output topic
      context.forward(key, updatedValue)
    }); 
  }
}

Я испробовал самый базовый код для этого, но не уверен, что он вмешивается в автоматические функции, предоставляемые Kafka. Например, автоматическое смещение, тайм-ауты и т. д.

Или всегда лучше придерживаться поведения по умолчанию, уже обеспеченного потоками Kafka, и использовать поток потоков для быстрой обработки данных?

Ответы [ 2 ]

0 голосов
/ 03 июня 2019

Не рекомендуется запускать собственные потоки, поскольку это нарушает гарантии отказоустойчивости Kafka Streams. Если возвращается process(), Kafka Streams предполагает, что сообщение было полностью обработано и все потенциальные выходные сообщения были отправлены через forward(). В этом случае Kafka Streams может зафиксировать смещение входной записи.

Однако, если вы обработаете сообщение в фоновом потоке, и поток не сможет обработать его, Kafka Streams ничего не узнает об этом, и, следовательно, смещения могут быть зафиксированы, даже если произойдет сбой и сообщение будет потеряно.

Кроме того, фоновый поток не может вызывать forward() после возврата process(). Если forward() вызывается "снаружи" из process() Kafka Streams выдаст исключение.

Если не исключено использование собственных фоновых потоков и сохранение хотя бы разовых гарантий обработки, однако, это довольно сложно и поэтому не рекомендуется.

0 голосов
/ 29 мая 2019

Kafka streams использует потребительский API kafka для получения сообщений из темы kafka. Это означает, что даже если вы создаете несколько потоков для потребления сообщений, дополнительные потоки останутся бездействующими.

Например, если ваша тема имеет 5 разделов, даже если вы создали 10 потоков для чтения из темы, пользовательский API kafka будет использовать только 5 потоков для чтения из темы, а остальные темы будут простаивать.

Вы можете определить количество потоков, которые будут создаваться при определении конфигурации ваших потоков.

Properties streamsConfig = new Properties();

streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1); //Here number of threads being spawned per kafka streams app node is 1 

Таким образом, если ваша тема имеет 10 разделов, а ваше приложение kafka streams развернуто на двух узлах, тогда NUM_STREAM_THREADS_CONFIG будет равно 5.

Дайте мне знать, если вам нужна дополнительная помощь!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...