Кафка производитель и потребительские вопросы - PullRequest
1 голос
/ 28 мая 2019

У меня есть две проблемы с Apache Kafka.

Выпуск 1

  1. Отправка 100 000 сообщений в Кафку
  2. Завершите работу служб zookeeper и kafka с помощью Ctrl-C перед потребителем. потребляет все 100 000 сообщений (это было смоделировано с помощью Thread.sleep(1000) в методе потребления).

Discovery

Потребитель продолжал записывать сообщения на консоли после завершения работы служб zookeeper и kafka.

Expectation

Потребитель должен прекратить использование сообщений и перезапускаться с индекса последнего сообщения + 1 после того, как zookeeper и kafka были вызваны.

Вопрос

Как заставить потребителя продолжить работу с индекса + 1 последнего использованного сообщения.

Выпуск 2

  1. Отправить 100 000 сообщений в Кафку
  2. Завершите работу служб zookeeper и kafka с помощью Ctrl-C перед потребителем. потребляет все 100 000 сообщений (это было смоделировано с помощью Thread.sleep(1000) в методе потребления).
  3. Убивает приложение весенней загрузки, потребляя сообщения
  4. Воспользуйтесь услугами зоопарка и кафки.
  5. Откройте приложение весенней загрузки, отвечающее за потребление сообщений.

Discovery

Потребитель принимает все сообщения от начала, игнорируя последнее использованное сообщение.

Expectation

Потребитель должен начать использовать индекс + 1 последнего использованного сообщения до того, как приложение весенней загрузки было закрыто.

Вопрос

Как заставить потребителя продолжить с индекса + 1 последнего использованного сообщения.

Фрагменты кода

KafkaConsumerConfig

@Configuration
@EnableKafka
public class KafkaConsumerConfig
{
    @Bean
    public Map<String, Object> consumerConfigs()
    {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "basic-group");
        return props;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory()
    {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
            kafkaListenerContainerFactory()
    {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(100);
        return factory;
    }
}

KafkaProducerConfig

@Configuration
    public class KafkaProducerConfig
    {
        @Bean
        public ProducerFactory<Integer, String> producerFactory()
        {
            return new DefaultKafkaProducerFactory<>(producerConfigs());
        }

        @Bean
        public Map<String, Object> producerConfigs()
        {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            return props;
        }

        @Bean
        public KafkaTemplate<Integer, String> kafkaTemplate()
        {
            return new KafkaTemplate<>(producerFactory());
        }
    }

KafkaProducer

@Component
public class KafkaProducer
{

    @Autowired
    private KafkaTemplate kafkaTemplate;

    public void sendMessage(String message, final String topicName)
    {

        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, message);

        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>()
        {

            @Override
            public void onSuccess(SendResult<String, String> result)
            {
                System.out.println("Sent message=[" + message
                        + "] with offset=[" + result.getRecordMetadata().offset() + "]");
            }

            @Override
            public void onFailure(Throwable ex)
            {
                System.out.println("Unable to send message=["
                        + message + "] due to : " + ex.getMessage());
            }
        });
    }
}

KafkaConsumer

@Component
public class KafkaConsumer
{

    @KafkaListener(id = "basic", topics = "test-1", clientIdPrefix = "test-prefix-id", autoStartup = "true", concurrency = "3")
    public void multipleTransactionNotification(@Payload final String message)
    {
        try
        {
            Thread.sleep(1000);
            System.out.println(message);
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }
}

TestApplication

@SpringBootApplication
public class TestApplication
{    
    public static void main(String[] args)
    {
        SpringApplication.run(TestApplication.class, args);
    }

   @Bean
    public CommandLineRunner commandLineRunner(ApplicationContext ctx, KafkaProducer producer, NIPKafkaConsumer consumer) {
        return args -> {

            for (int i = 0; i < 100_000; i++)
            {
                producer.sendMessage("New Message-" + i, "test-1");
            }

        };
    }    

}

1 Ответ

0 голосов
/ 28 мая 2019

выпуск 1

Spring Kafka делает KafkaConsumer.poll() для извлечения записей, а затем вызывает вашего слушателя для каждой записи.

Таким образом, если пакет содержит 100 записей, потребуется 100 секунд, чтобы обработать весь пакет с вашим фактическим KafkaListener.

Выпуск 2

Потребитель фиксирует смещения, чтобы знать, с чего продолжить.

Поведение по умолчанию - автоматическая фиксация смещения каждые 5 секунд. Я предполагаю, что вы убиваете своего потребителя до того, как он совершит свое смещение, поэтому он перезапускается с самого начала.

См. enable.auto.commit и auto.commit.interval.ms Конфигурация потребителя Kafka.

Вы также можете зафиксировать смещения вручную.

См. Также auto.offset.reset, чтобы определить, что должен делать ваш потребитель, когда у него нет начального смещения.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...