Я создавал пример Kafka Producer, используя Java. я был
отправив обычные данные, которые просто "Test" + Integer в качестве значения для Kafka. я
использовали следующие свойства и после того, как я начал продюсера
Клиент и сообщения в пути, во время этого я убиваю брокера
и неожиданно получает сообщение об ошибке ниже, а не повторяет попытку.
Использование 3-х брокеров и темы с 3-мя разделами и коэффициентом репликации как 3
и без min-insync-реплик
Ниже приведены настроенные свойства config.put (ProducerConfig.ACKS_CONFIG, "all");
config.put (ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
config.put (CommonClientConfigs.RETRIES_CONFIG, 60);
config.put (ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
config.put (ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 10000);
config.put (ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
config.put (ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
config.put (ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1048576);
config.put (ProducerConfig.BATCH_SIZE_CONFIG, 16384);
config.put (ProducerConfig.LINGER_MS_CONFIG, 0);
config.put (ProducerConfig.BUFFER_MEMORY_CONFIG, 1073741824); // 1 ГБ
и результат, когда я убил всех своих брокеров или иногда одного из
брокер, как показано ниже
**Error:**
WARN org.apache.kafka.clients.producer.internals.Sender - [Producer
clientId=producer-1] Got error produce response with correlation id 124
on topic-partition testing001-0, retrying (59 attempts left). Error:
NETWORK_EXCEPTION
27791 [kafka-producer-network-thread | producer-1] WARN
org.apache.kafka.clients.producer.internals.Sender - [Producer
clientId=producer-1] Received invalid metadata error in produce request
on partition testing001-0 due to
org.apache.kafka.common.errors.NetworkException: The server disconnected
before a response was received.. Going to request metadata update now
28748 [kafka-producer-network-thread | producer-1] ERROR
org.apache.kafka.common.utils.KafkaThread - Uncaught exception in thread
'kafka-producer-network-thread | producer-1':
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(Unknown Source)
at java.nio.ByteBuffer.allocate(Unknown Source)
at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate
(MemoryPool.java:30)
at org.apache.kafka.common.network.NetworkReceive.readFrom
(NetworkReceive.java:112)
at org.apache.kafka.common.network.KafkaChannel.receive
(KafkaChannel.java:335)
at org.apache.kafka.common.network.KafkaChannel.read
(KafkaChannel.java:296)
at org.apache.kafka.common.network.Selector.attemptRead
(Selector.java:560)
at org.apache.kafka.common.network.Selector.pollSelectionKeys
(Selector.java:496)
at org.apache.kafka.common.network.Selector.poll(Selector.java:425)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510)
at org.apache.kafka.clients.producer.internals.Sender.run
(Sender.java:239)
at org.apache.kafka.clients.producer.internals.Sender.run
(Sender.java:163)
at java.lang.Thread.run(Unknown Source)