Потребление Kafka с использованием ThreadPool не гарантирует порядок? - PullRequest
0 голосов
/ 25 апреля 2018

У меня есть Kafka topic с 1-partition.1 listener определяется в моем приложении для весенней загрузки с использованием @KafkaListener.listener использует ThreadPoolTaskExecutor, который выбирает ConsumerRecord и обрабатывает его.Тем не менее, я вижу строгий порядок, который обещания kafka не выполняются, в этом сценарии, как я вижу иногда offset прыжков (проверено с использованием временной метки), когда параллельные потоки начинают обрабатывать ... Итак вопросы:

  1. Почему упорядочение не следует для параллельных потоков в слушателе?
  2. Как мы можем достичь параллелизма и упорядочения одновременно, чтобы параллельный поток принимал следующее смещение и не переходил?

РЕДАКТИРОВАТЬ 1

public class DefaultTopicListener {
    @Autowired
    ThreadPoolTaskExecutor executorPool;

    @KafkaListener(topicPartitions=@TopicPartition(topic="defaultTopic", 
partitions={"0"}))
    public void onMessage(ConsumerRecord<String, CustomPayload> request) {
        CustomPayload message = request.value();
        try {
            executorPool.execute(new Runnable() {
                @Override
                public void run() {
                    logger.info(
                            "onMessage : executorPool_THREAD_{}-> -> Offset {}.... ",
                            Thread.currentThread().getId(), request.offset());
                }
            });
        }  catch (RejectedExecutionException ex) {
            logger.error(
                    "onMessage : executorPool -> Queue Full Request Rejected for offset -> {}", ex, );
        }
    }
public class Config {
    @Bean("executorPool")
    public ThreadPoolTaskExecutor executorPool(){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(3);
        executor.setMaxPoolSize(5);
        executor.setQueueCapacity(5);

        return executor;
    }
}

Просьба сообщить.

Ответы [ 2 ]

0 голосов
/ 25 апреля 2018

Кафка обычно рекомендует одну нить на потребителя. Если вы хотите отделить обработку от потребления, в этом случае вручите экземпляры ConsumerRecords блокирующей очереди, занятой пулом потоков процессора, которые фактически обрабатывают обработку записи.

https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

Однако упорядочение в этом случае не гарантируется, так как потоки будут выполняться независимо, более ранняя порция данных может фактически обрабатываться после более поздней порции данных только из-за удачного времени выполнения потока.

Упорядочение и параллелизм могут быть достигнуты при наличии нескольких разделов и одного потока, отвечающего за раздел, все записи в разделе будут обрабатываться потоком по порядку.

0 голосов
/ 25 апреля 2018

Непонятно, что вы имеете в виду.Пулы потоков не «выбирают» вещи, им дают задачи для запуска.Вам нужно показать свой код.

Предположение ...

Если ваш слушатель передает ConsumerRecord в пул потоков, то, конечно, порядок записей теряется, так как записиобрабатывается в разных потоках (если только пул не имеет размер 1).

Для одного раздела контейнер слушателя вызывает слушателя в одном потоке.Вы не должны передавать работу другим потокам, если хотите сохранить порядок.

Единственный способ достичь параллелизма - это использовать несколько разделов и увеличить параллелизм в контейнере.Разделы будут распределены по потокам контейнера.

Или вам нужно управлять подтверждениями в вашем коде, чтобы убедиться, что «переходы» не зафиксированы.

Порядок гарантирован только в пределах разделапоэтому, опять же, вы не должны переходить на другую ветку.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...