Kafka Consumer Group Rebalance and Group Coordinator Dead
May 25, 2018

For a couple of months I have been experimenting with Kafka (1.0.0) and trying to understand how a consumer group works. I have a single Kafka broker, and I use Kafka-Connect-Cassandra to ingest messages from topics into database tables. I have 10 topics, each with only one partition, and a single consumer group with 10 consumer instances (one per topic).
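How the partitions end up spread over the group depends on the assignor. Below is a minimal sketch of the per-topic range rule used by RangeAssignor, the default partition.assignment.strategy for the Java consumer in Kafka 1.0. It assumes that every member subscribes to all ten topics, and the class RangeAssignmentSketch and the member names are hypothetical. Under those assumptions, a topic with a single partition always goes to the first member in sort order. One member therefore ends up owning all ten partitions, which would be consistent with a single consumer holding [topic1-0, topic2-0, ....] in the logs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified per-topic range assignment, modeled on the rule RangeAssignor applies.
// Assumption: every member subscribes to every topic. Names are hypothetical.
class RangeAssignmentSketch {

    // Returns memberId -> assigned "topic-partition" names.
    static Map<String, List<String>> assign(List<String> members,
                                            Map<String, Integer> partitionsPerTopic) {
        List<String> sorted = new ArrayList<>(members);
        Collections.sort(sorted); // members are ordered by id before assigning
        Map<String, List<String>> result = new TreeMap<>();
        for (String member : sorted) {
            result.put(member, new ArrayList<>());
        }
        if (sorted.isEmpty()) {
            return result;
        }
        for (Map.Entry<String, Integer> topic : partitionsPerTopic.entrySet()) {
            int perMember = topic.getValue() / sorted.size();
            int extra = topic.getValue() % sorted.size();
            for (int i = 0; i < sorted.size(); i++) {
                // The first `extra` members each take one additional partition.
                int start = perMember * i + Math.min(i, extra);
                int length = perMember + (i < extra ? 1 : 0);
                for (int p = start; p < start + length; p++) {
                    result.get(sorted.get(i)).add(topic.getKey() + "-" + p);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> members = new ArrayList<>();
        Map<String, Integer> topics = new TreeMap<>();
        for (int i = 0; i < 10; i++) {
            members.add("member-" + i);
            topics.put("topic" + (i + 1), 1); // ten topics, one partition each
        }
        Map<String, List<String>> assignment = assign(members, topics);
        // With one partition per topic, only the first member gets anything.
        System.out.println("member-0: " + assignment.get("member-0").size()); // member-0: 10
        System.out.println("member-9: " + assignment.get("member-9").size()); // member-9: 0
    }
}
```

If each instance instead subscribed only to its own topic, the same rule would give every member exactly one partition.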

With this setup, I sometimes see the following logs in the kafka-connect console:

1:

[Worker clientId=connect-1, groupId=connect-cluster] Marking the coordinator qa-server:9092 (id: 2147483647 rack: null) dead (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:341)
[Worker clientId=connect-1, groupId=connect-cluster] Discovered group coordinator qa-server:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:341)
[Consumer clientId=consumer-7, groupId=connect-cassandra-sink-casb] Discovered group coordinator qa-server:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:341)


[Worker clientId=connect-1, groupId=connect-cluster] Marking the coordinator qa-server:9092 (id: 2147483647 rack: null) dead (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:341)
[Consumer clientId=consumer-7, groupId=connect-cassandra-sink-casb] Marking the coordinator qa-server:9092 (id: 2147483647 rack: null) dead (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:341)
[Worker clientId=connect-1, groupId=connect-cluster] Discovered group coordinator qa-server:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:341)
[Consumer clientId=consumer-7, groupId=connect-cassandra-sink-casb] Discovered group coordinator qa-server:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:341)
[Consumer clientId=consumer-7, groupId=connect-cassandra-sink-casb] Revoking previously assigned partitions [topic1-0, topic2-0, ....] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:341)
[Consumer clientId=consumer-7, groupId=connect-cassandra-sink-casb] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:336)
[Consumer clientId=consumer-7, groupId=connect-cassandra-sink-casb] Successfully joined group with generation 349 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:341)
[Consumer clientId=consumer-7, groupId=connect-cassandra-sink-casb] Setting newly assigned partitions [topic1-0, topic2-0, ....] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:341)

After this, it starts consuming messages and writing them to the Cassandra tables. This happens frequently, at irregular intervals.
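The "generation 349" in this log is the group's generation id. The coordinator increments it every time the group completes a rebalance, so a number this high suggests the group has already been through many rebalances. A toy model shows why the number matters: the coordinator only accepts offset commits from a current member that quotes the current generation. The GenerationSketch class is hypothetical and leaves out the real protocol messages.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Toy group coordinator that only tracks the generation and current members.
// Hypothetical model: JoinGroup/SyncGroup rounds, heartbeats and offset
// storage are all left out.
class GenerationSketch {
    private int generation = 0;
    private final Set<String> members = new HashSet<>();

    // Completing a rebalance installs the new membership and bumps the generation.
    int rebalance(String... newMembers) {
        members.clear();
        members.addAll(Arrays.asList(newMembers));
        return ++generation;
    }

    // Offsets are accepted only from a current member quoting the current generation.
    boolean acceptsCommit(String memberId, int memberGeneration) {
        return members.contains(memberId) && memberGeneration == generation;
    }

    public static void main(String[] args) {
        GenerationSketch coordinator = new GenerationSketch();
        int taskGeneration = coordinator.rebalance("consumer-5"); // generation 1
        coordinator.rebalance();                                    // member dropped: generation 2
        int rejoined = coordinator.rebalance("consumer-5");         // member rejoined: generation 3
        System.out.println(coordinator.acceptsCommit("consumer-5", taskGeneration)); // false
        System.out.println(coordinator.acceptsCommit("consumer-5", rejoined));       // true
    }
}
```

A member that missed a rebalance still holds an old generation, so its commit is refused. In the Java client, this kind of rejection is reported as CommitFailedException.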

However, sometimes the connector stops and shuts down. It then starts again and resumes consuming messages. This is the log:

INFO [Worker clientId=connect-1, groupId=connect-cluster] Marking the coordinator qa-server:9092 (id: 2147483647 rack: null) dead (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:341)
INFO [Worker clientId=connect-1, groupId=connect-cluster] Discovered group coordinator qa-server:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:341)
INFO [Worker clientId=connect-1, groupId=connect-cluster] Marking the coordinator qa-server:9092 (id: 2147483647 rack: null) dead (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:341)
INFO [Worker clientId=connect-1, groupId=connect-cluster] Discovered group coordinator qa-server:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:341)

INFO WorkerSinkTask{id=cassandra-sink-casb-0} Committing offsets asynchronously using sequence number 42: {topic1-0=OffsetAndMetadata{offset=1074, metadata=''}, topic2-0=OffsetAndMetadata{offset=112, metadata=''}, ...}} (org.apache.kafka.connect.runtime.WorkerSinkTask:311)
INFO Rebalance started (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1214)
INFO Stopping connector cassandra-sink-casb (org.apache.kafka.connect.runtime.Worker:304)
INFO Stopping task cassandra-sink-casb-0 (org.apache.kafka.connect.runtime.Worker:464)
INFO Stopping Cassandra sink. (com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkTask:79)
INFO Shutting down Cassandra driver session and cluster. (com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter:253)
INFO Stopped connector cassandra-sink-casb (org.apache.kafka.connect.runtime.Worker:320)
INFO Finished stopping tasks in preparation for rebalance (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1244)
INFO [Worker clientId=connect-1, groupId=connect-cluster] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:336)
INFO [Worker clientId=connect-1, groupId=connect-cluster] Successfully joined group with generation 7 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:341)
INFO Joined group and got assignment: Assignment{error=0, leader='connect-1-1dc56cda-ed54-4181-a5f9-d11022d8e8c3', leaderUrl='http://127.0.1.1:8083/', offset=8, connectorIds=[cassandra-sink-casb], taskIds=[cassandra-sink-casb-0]} (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1192)
INFO Starting connectors and tasks using config offset 8 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:837)
INFO Starting connector cassandra-sink-casb (org.apache.kafka.connect.runtime.distributed.DistributedHerder:890)
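This second log comes from the Connect worker group (groupId=connect-cluster), not from the sink task's consumer group. In this version, the worker stops every connector and task it runs before rejoining ("Finished stopping tasks in preparation for rebalance"). It then starts whatever it is assigned, even when the assignment is unchanged. Incremental cooperative rebalancing for Connect only arrived later (Kafka 2.3, KIP-415). Here is a minimal model of that ordering. It uses a hypothetical HerderSketch class rather than the Connect API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of an eager ("stop-the-world") worker rebalance: everything
// the worker runs is stopped before it rejoins, then the new assignment is started.
class HerderSketch {
    private final Set<String> running = new LinkedHashSet<>();
    final List<String> events = new ArrayList<>();

    void start(String id) {
        running.add(id);
        events.add("start " + id);
    }

    void rebalance(List<String> newAssignment) {
        // 1. Stop every connector and task, even those that will come straight back.
        for (String id : running) {
            events.add("stop " + id);
        }
        running.clear();
        // 2. Rejoin the worker group (here the assignment is simply passed in).
        events.add("rejoin");
        // 3. Start whatever this worker was assigned.
        for (String id : newAssignment) {
            start(id);
        }
    }

    boolean isRunning(String id) {
        return running.contains(id);
    }

    public static void main(String[] args) {
        HerderSketch herder = new HerderSketch();
        herder.start("cassandra-sink-casb");
        herder.start("cassandra-sink-casb-0");
        // Same assignment as before, yet both are stopped and restarted.
        herder.rebalance(Arrays.asList("cassandra-sink-casb", "cassandra-sink-casb-0"));
        // [start cassandra-sink-casb, start cassandra-sink-casb-0, stop cassandra-sink-casb,
        //  stop cassandra-sink-casb-0, rejoin, start cassandra-sink-casb, start cassandra-sink-casb-0]
        System.out.println(herder.events);
    }
}
```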

2:

org.apache.kafka.clients.consumer.CommitFailedException: 
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. 
This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, 
which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:722)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:600)
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1250)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommitSync(WorkerSinkTask.java:299)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit(WorkerSinkTask.java:327)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:398)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:547)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1300(WorkerSinkTask.java:62)
        at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsRevoked(WorkerSinkTask.java:618)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:419)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:359)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:410)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:283)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
INFO [Consumer clientId=consumer-5, groupId=connect-cassandra-sink-casb] Marking the coordinator qa-server:9092 (id: 2147483647 rack: null) dead (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:341)
INFO [Consumer clientId=consumer-5, groupId=connect-cassandra-sink-casb] Discovered group coordinator qa-server:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:341)
INFO [Consumer clientId=consumer-5, groupId=connect-cassandra-sink-casb] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:336)
INFO [Consumer clientId=consumer-5, groupId=connect-cassandra-sink-casb] Successfully joined group with generation 343 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:341)
INFO [Consumer clientId=consumer-5, groupId=connect-cassandra-sink-casb] Setting newly assigned partitions [topic1-0, topic2-0,...] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:341)
INFO WorkerSinkTask{id=cassandra-sink-casb-0} Committing offsets asynchronously using sequence number 155: {topic1-0=OffsetAndMetadata{offset=836, metadata=''}, topic2-0=OffsetAndMetadata{offset=86, metadata=''}, ...}} (org.apache.kafka.connect.runtime.WorkerSinkTask:311)
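The exception ties the failure to the gap between poll() calls exceeding max.poll.interval.ms. In Kafka 1.0, max.poll.interval.ms defaults to 300000 ms and max.poll.records defaults to 500. The following back-of-the-envelope sketch assumes that processing time grows linearly with batch size. The class name and the 1000 ms per-record write time are hypothetical, not measured.

```java
// Back-of-the-envelope check of the poll-loop budget from the exception message.
// Assumption: processing time grows linearly with the number of records per poll.
class PollBudgetSketch {

    // Worst-case gap between two poll() calls, in milliseconds.
    static long worstCaseGapMs(int maxPollRecords, long perRecordMs, long overheadMs) {
        return (long) maxPollRecords * perRecordMs + overheadMs;
    }

    // Largest max.poll.records whose worst-case gap stays within max.poll.interval.ms.
    static int safeMaxPollRecords(long maxPollIntervalMs, long perRecordMs, long overheadMs) {
        if (perRecordMs <= 0) {
            throw new IllegalArgumentException("perRecordMs must be positive");
        }
        long budgetMs = maxPollIntervalMs - overheadMs;
        if (budgetMs <= 0) {
            return 0; // the fixed overhead alone uses up the whole budget
        }
        return (int) Math.min(Integer.MAX_VALUE, budgetMs / perRecordMs);
    }

    public static void main(String[] args) {
        long maxPollIntervalMs = 300_000; // max.poll.interval.ms default in Kafka 1.0
        int maxPollRecords = 500;         // max.poll.records default in Kafka 1.0
        long perRecordMs = 1_000;         // hypothetical slow Cassandra write
        long gap = worstCaseGapMs(maxPollRecords, perRecordMs, 0);
        System.out.println("gap " + gap + " ms, limit " + maxPollIntervalMs + " ms"); // gap 500000 ms, limit 300000 ms
        System.out.println("safe max.poll.records: "
                + safeMaxPollRecords(maxPollIntervalMs, perRecordMs, 0)); // safe max.poll.records: 300
    }
}
```

In a Connect worker of this version, sink-task consumer settings such as max.poll.records can be set in the worker configuration with the consumer. prefix (for example consumer.max.poll.records).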

Again, sometimes Kafka Connect resumes consuming messages after the rebalance, and sometimes it shuts down.

I have the following questions:

1) Why does the group coordinator (the Kafka broker) die?

I am looking at several Kafka configs that could address these issues, such as connections.max.idle.ms, max.poll.records, session.timeout.ms, group.min.session.timeout.ms and group.max.session.timeout.ms.

I am not sure what the best configuration would be to keep everything running smoothly.
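Some of these settings constrain each other. group.min.session.timeout.ms and group.max.session.timeout.ms are broker settings that bound the session.timeout.ms a consumer may request, and the broker rejects a join outside that range. heartbeat.interval.ms, which is not in the list above, must be lower than session.timeout.ms, and the consumer documentation recommends setting it no higher than a third of that value. Below is a small checker for those two rules. The class name is hypothetical and the values are illustrative only, not recommendations for this cluster.

```java
import java.util.ArrayList;
import java.util.List;

// Checks two relationships between group-membership timeouts.
// Hypothetical helper; example values only, not tuning advice.
class TimeoutConfigCheck {

    static List<String> check(long sessionTimeoutMs, long heartbeatIntervalMs,
                              long groupMinSessionTimeoutMs, long groupMaxSessionTimeoutMs) {
        List<String> problems = new ArrayList<>();
        // The broker rejects a join whose session timeout is outside its allowed range.
        if (sessionTimeoutMs < groupMinSessionTimeoutMs || sessionTimeoutMs > groupMaxSessionTimeoutMs) {
            problems.add("session.timeout.ms outside [group.min.session.timeout.ms, group.max.session.timeout.ms]");
        }
        // Heartbeats should be frequent enough that a few can be missed within one session.
        if (heartbeatIntervalMs * 3 > sessionTimeoutMs) {
            problems.add("heartbeat.interval.ms above 1/3 of session.timeout.ms");
        }
        return problems;
    }

    public static void main(String[] args) {
        System.out.println(check(10_000, 3_000, 6_000, 300_000));  // []
        System.out.println(check(400_000, 3_000, 6_000, 300_000)); // one problem: outside the broker range
    }
}
```

The sketch does not cover connections.max.idle.ms or max.poll.records, which are independent of these two rules.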

2) Why does the rebalance occur?

I know that a group rebalance can happen when a new task is added, a task is changed, etc. But I have not changed anything. Sometimes the Kafka Connect framework seems to handle the error too aggressively and kills the connector tasks instead of continuing to work.
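Apart from membership and subscription changes, a consumer also leaves its group, forcing a rebalance, when one of two timers expires. Since Kafka 0.10.1, heartbeats are sent from a background thread. As a result, session.timeout.ms catches a process that is dead or unreachable, while max.poll.interval.ms catches a live process whose poll loop is stuck in processing. In the second case, the client itself leaves the group. Below is a simplified classifier of those two cases. The names and timestamps are hypothetical, and the real client tracks these timers internally.

```java
// Hypothetical helper that classifies why a member would drop out of its group.
// Simplified: real clients also rebalance on joins, leaves and changes to the
// partitions of subscribed topics, which this sketch ignores.
class RebalanceCauseSketch {

    enum Cause { NONE, SESSION_TIMEOUT, POLL_INTERVAL_EXCEEDED }

    static Cause check(long nowMs, long lastHeartbeatMs, long lastPollMs,
                       long sessionTimeoutMs, long maxPollIntervalMs) {
        if (nowMs - lastHeartbeatMs > sessionTimeoutMs) {
            // No heartbeats: the process is dead or cannot reach the coordinator.
            return Cause.SESSION_TIMEOUT;
        }
        if (nowMs - lastPollMs > maxPollIntervalMs) {
            // Heartbeats still flow, but poll() has not been called for too long.
            return Cause.POLL_INTERVAL_EXCEEDED;
        }
        return Cause.NONE;
    }

    public static void main(String[] args) {
        long session = 10_000;
        long pollInterval = 300_000;
        System.out.println(check(400_000, 399_000, 50_000, session, pollInterval));  // POLL_INTERVAL_EXCEEDED
        System.out.println(check(400_000, 385_000, 399_000, session, pollInterval)); // SESSION_TIMEOUT
    }
}
```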

...