У меня есть две проблемы с Apache Kafka.
Выпуск 1
- Отправка 100 000 сообщений в Кафку
- Завершите работу служб zookeeper и kafka с помощью Ctrl-C перед потребителем.
потребляет все 100 000 сообщений (это было смоделировано с помощью
Thread.sleep(1000)
в методе потребления).
Discovery
Потребитель продолжал записывать сообщения на консоли после завершения работы служб zookeeper и kafka.
Expectation
Потребитель должен прекратить использование сообщений и перезапускаться с индекса последнего сообщения + 1 после того, как zookeeper и kafka были вызваны.
Вопрос
Как заставить потребителя продолжить работу с индекса + 1 последнего использованного сообщения.
Выпуск 2
- Отправить 100 000 сообщений в Кафку
- Завершите работу служб zookeeper и kafka с помощью Ctrl-C перед потребителем.
потребляет все 100 000 сообщений (это было смоделировано с помощью
Thread.sleep(1000)
в методе потребления).
- Убивает приложение весенней загрузки, потребляя сообщения
- Воспользуйтесь услугами зоопарка и кафки.
- Откройте приложение весенней загрузки, отвечающее за потребление сообщений.
Discovery
Потребитель принимает все сообщения от начала, игнорируя последнее использованное сообщение.
Expectation
Потребитель должен начать использовать индекс + 1 последнего использованного сообщения до того, как приложение весенней загрузки было закрыто.
Вопрос
Как заставить потребителя продолжить с индекса + 1 последнего использованного сообщения.
Фрагменты кода
KafkaConsumerConfig
@Configuration
@EnableKafka
public class KafkaConsumerConfig
{
@Bean
public Map<String, Object> consumerConfigs()
{
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "basic-group");
return props;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory()
{
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory()
{
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(100);
return factory;
}
}
KafkaProducerConfig
@Configuration
public class KafkaProducerConfig
{
@Bean
public ProducerFactory<Integer, String> producerFactory()
{
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map<String, Object> producerConfigs()
{
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate()
{
return new KafkaTemplate<>(producerFactory());
}
}
KafkaProducer
@Component
public class KafkaProducer
{
@Autowired
private KafkaTemplate kafkaTemplate;
public void sendMessage(String message, final String topicName)
{
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, message);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>()
{
@Override
public void onSuccess(SendResult<String, String> result)
{
System.out.println("Sent message=[" + message
+ "] with offset=[" + result.getRecordMetadata().offset() + "]");
}
@Override
public void onFailure(Throwable ex)
{
System.out.println("Unable to send message=["
+ message + "] due to : " + ex.getMessage());
}
});
}
}
KafkaConsumer
@Component
public class KafkaConsumer
{
@KafkaListener(id = "basic", topics = "test-1", clientIdPrefix = "test-prefix-id", autoStartup = "true", concurrency = "3")
public void multipleTransactionNotification(@Payload final String message)
{
try
{
Thread.sleep(1000);
System.out.println(message);
}
catch (Exception e)
{
e.printStackTrace();
}
}
}
TestApplication
@SpringBootApplication
public class TestApplication
{
public static void main(String[] args)
{
SpringApplication.run(TestApplication.class, args);
}
@Bean
public CommandLineRunner commandLineRunner(ApplicationContext ctx, KafkaProducer producer, NIPKafkaConsumer consumer) {
return args -> {
for (int i = 0; i < 100_000; i++)
{
producer.sendMessage("New Message-" + i, "test-1");
}
};
}
}