Spring Kafka rebalancing - PullRequest
September 23, 2018

I just want to understand how Kafka rebalancing works. Below is my listener method. I configured a RetryTemplate with the consumer factory to retry 20 times with a 20-second backoff delay. I'm using spring-kafka 1.2.2 (we plan to upgrade the client) with manual acknowledgments.
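To make the retry configuration concrete, here is a plain-Java sketch of a fixed back-off retry loop, roughly what a RetryTemplate with a SimpleRetryPolicy of 20 attempts and a FixedBackOffPolicy of 20 seconds does. It is not the spring-retry code; the class FixedBackoffRetry and its nested exception are hypothetical. It also shows why an interrupt during the back-off sleep ends the retries at once, which is what the BackOffInterruptedException in worker1's log reflects.

```java
import java.util.function.Supplier;

/**
 * Hypothetical sketch of a fixed-back-off retry loop, similar in spirit to a
 * RetryTemplate with SimpleRetryPolicy(maxAttempts) and FixedBackOffPolicy.
 * Not the spring-retry implementation.
 */
public class FixedBackoffRetry {

    /** Plays the role of spring-retry's BackOffInterruptedException in this sketch. */
    public static class BackoffInterruptedException extends RuntimeException {
        public BackoffInterruptedException(InterruptedException cause) {
            super("Thread interrupted while sleeping", cause);
        }
    }

    /**
     * Calls the task up to maxAttempts times in total, sleeping delayMillis after
     * each failure except the last one. Returns the first successful result or
     * rethrows the last failure.
     */
    public static <T> T execute(Supplier<T> task, int maxAttempts, long delayMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.get();
            } catch (RuntimeException e) {
                last = e;
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(delayMillis); // back-off between attempts
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt(); // keep the interrupt flag set
                        throw new BackoffInterruptedException(ie); // stop retrying immediately
                    }
                }
            }
        }
        throw last; // every attempt failed
    }

    public static void main(String[] args) {
        int[] calls = {0};
        try {
            // A listener that always fails, like the one in the question (1 ms back-off here).
            execute(() -> { calls[0]++; throw new RuntimeException("throwing exception"); }, 20, 1);
        } catch (RuntimeException e) {
            System.out.println("attempts: " + calls[0]); // attempts: 20
        }
    }
}
```

With the question's values this would be execute(task, 20, 20_000): at most 20 calls and 19 sleeps of 20 seconds for a record that never succeeds.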

```java
@KafkaListener(id = "${kafka.listener-id}", topics = "${kafka.topic}")
public void listen(final ConsumerRecord<String, String> consumerRecord,
                   final Acknowledgment acknowledgment) throws ServiceResponseException {

    // Test code: always fail, so the RetryTemplate retries and the record is never acknowledged
    if (true) {
        System.out.println("throwing exception");
        throw new RuntimeException();
    }

    try {
        // Manual acknowledgment: commit the offset only after the record was processed
        acknowledgment.acknowledge();
        LOGGER.info("Kafka acknowledgment sent for offset {}", consumerRecord.offset());
    } catch (Exception e) {
        LOGGER.error("Exception encountered when acking record at offset {}", consumerRecord.offset(), e);
    }
}
```

I have 2 workers, each with a concurrency of 2, and the topic has 3 partitions. I started one worker, and all 3 partitions were assigned to worker1. Then I sent a message. The listener throws a RuntimeException, which is retried up to 20 times with a 20-second delay. While that was happening, I started worker2: a rebalance was triggered, but the partitions were not reassigned yet. worker1 fails with "Error while processing: ConsumerRecord" (after getContainerProperties().getShutdownTimeout()), and only then do all the consumers join the group. The same message is then delivered to worker2.
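The redelivery to worker2 follows from manual acknowledgment: the committed offset only advances when a record is acknowledged, so whichever consumer gets the partition next starts again from the unacknowledged record. The hypothetical in-memory model below (not a Kafka or spring-kafka API) illustrates this for partition messages-1 holding a single record at offset 0.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/**
 * Hypothetical in-memory model of manual acknowledgment: a partition keeps a
 * committed offset (the next offset the group will read), which only moves
 * forward when a record is acknowledged. Not the Kafka or spring-kafka API.
 */
public class ManualAckRedelivery {

    public static class Partition {
        public final List<String> records = new ArrayList<>();
        public long committed = 0; // next offset to read, like a Kafka offset commit
    }

    /**
     * A consumer newly assigned the partition starts at the committed offset and
     * stops at the first record its handler fails on (that record is not acked).
     * Returns the offsets that were delivered to the handler.
     */
    public static List<Long> consume(Partition p, Predicate<String> handler) {
        List<Long> delivered = new ArrayList<>();
        for (long offset = p.committed; offset < p.records.size(); offset++) {
            delivered.add(offset);
            if (!handler.test(p.records.get((int) offset))) {
                break; // no acknowledge(): the committed offset stays where it was
            }
            p.committed = offset + 1; // acknowledging commits offset + 1
        }
        return delivered;
    }

    public static void main(String[] args) {
        Partition messages1 = new Partition();
        messages1.records.add("test_hotfix1@test.com"); // offset 0

        List<Long> worker1 = consume(messages1, value -> false); // retries fail, no ack
        List<Long> worker2 = consume(messages1, value -> false); // after the rebalance
        System.out.println(worker1 + " " + worker2 + " committed=" + messages1.committed);
        // [0] [0] committed=0
    }
}
```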

1) This works the way I need it to. My question is: when a rebalance is triggered, why isn't the partition assignment done immediately? Instead, it waits until worker1 has fully stopped (waiting for getContainerProperties().getShutdownTimeout()), and only then do all the consumers from worker1 and worker2 join the group.
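One way to picture question 1: with Kafka's classic (eager) rebalance protocol, the group coordinator completes a rebalance only after every current member has rejoined, or has been dropped after the rebalance timeout. A member that is still busy in its revocation callback therefore holds up the assignment for the whole group. The sketch below is a hypothetical model of that barrier using threads and a CountDownLatch; the class and method names are assumptions, and it is not the coordinator's implementation.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical model of an eager rebalance: each member finishes its revocation
 * callback, sends JoinGroup, and gets an assignment only once all members have
 * joined. Not the Kafka group coordinator code.
 */
public class EagerRebalanceModel {

    /**
     * Each argument is how long one member spends in onPartitionsRevoked before
     * rejoining. Returns the time (ms since the rebalance started) at which each
     * member received its new assignment.
     */
    public static List<Long> rebalance(long... revokeMillis) {
        CountDownLatch allJoined = new CountDownLatch(revokeMillis.length);
        List<Long> assignedAt = new CopyOnWriteArrayList<>();
        // One thread per member so that no member waits for a free thread
        ExecutorService members = Executors.newFixedThreadPool(revokeMillis.length);
        long start = System.nanoTime();
        for (long delay : revokeMillis) {
            members.submit(() -> {
                Thread.sleep(delay);   // still busy in the revocation callback
                allJoined.countDown(); // JoinGroup sent
                allJoined.await();     // coordinator waits for every member
                assignedAt.add((System.nanoTime() - start) / 1_000_000);
                return null;           // Callable, so sleep/await may throw
            });
        }
        members.shutdown();
        try {
            members.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return assignedAt;
    }

    public static void main(String[] args) {
        // Five members rejoin at once; one (like worker1's listener-1-C-1) takes 500 ms.
        System.out.println(rebalance(0, 0, 0, 0, 0, 500));
    }
}
```

Every value printed is roughly 500 ms or more: no member gets an assignment before the slowest one has rejoined, which matches all six consumers logging "Successfully joined group mris-group with generation 10" within a few milliseconds of each other.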

2) I also noticed (see the logs below) that during the rebalance the consumers stop calling poll. Is that correct?
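Question 2 can be sketched the same way. Kafka runs ConsumerRebalanceListener callbacks inside poll() on the consumer thread, so while that thread waits for a separate listener thread to finish, it cannot poll. The simplified model below is loosely based on the log messages "Stopping invoker", "Interrupting invoker" and "Invoker timed out while waiting for shutdown"; the class, method, and timing values are assumptions, not spring-kafka 1.2.2 internals.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Hypothetical, simplified model of a two-thread container: the consumer thread
 * (C-1) handles the revocation inside poll(), waits up to a shutdown timeout for
 * the listener invoker thread (L-1), then cancels it. Not the spring-kafka source.
 */
public class InvokerShutdownModel {

    public static final class Outcome {
        public final boolean timedOut;            // invoker did not stop in time
        public final boolean listenerInterrupted; // listener saw "sleep interrupted"
        public final long consumerBlockedMillis;  // time the consumer thread could not poll

        Outcome(boolean timedOut, boolean listenerInterrupted, long consumerBlockedMillis) {
            this.timedOut = timedOut;
            this.listenerInterrupted = listenerInterrupted;
            this.consumerBlockedMillis = consumerBlockedMillis;
        }
    }

    /** listenerBusyMillis: how much back-off sleep the listener still has left. */
    public static Outcome onPartitionsRevoked(long listenerBusyMillis, long shutdownTimeoutMillis) {
        ExecutorService invokerThread = Executors.newSingleThreadExecutor();
        AtomicBoolean interrupted = new AtomicBoolean(false);
        Future<?> invoker = invokerThread.submit(() -> {
            try {
                Thread.sleep(listenerBusyMillis); // e.g. a retry back-off
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
        });

        long start = System.nanoTime();
        boolean timedOut = false;
        try {
            invoker.get(shutdownTimeoutMillis, TimeUnit.MILLISECONDS); // wait for the invoker
        } catch (TimeoutException e) {
            timedOut = true;
            invoker.cancel(true); // interrupts the sleeping listener thread
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
        long blocked = (System.nanoTime() - start) / 1_000_000;

        invokerThread.shutdown();
        try {
            invokerThread.awaitTermination(10, TimeUnit.SECONDS); // let the listener finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new Outcome(timedOut, interrupted.get(), blocked);
    }

    public static void main(String[] args) {
        // Listener has ~5 s of back-off left; the consumer waits at most 300 ms.
        Outcome o = onPartitionsRevoked(5_000, 300);
        System.out.println(o.timedOut + " " + o.listenerInterrupted + " " + o.consumerBlockedMillis);
    }
}
```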

Trace logs from worker1:

2018-09-23 13:52:53.259 TRACE 6384 --- [ listener-2-L-1] essageListenerContainer$ListenerConsumer : No records to process
2018-09-23 13:52:53.259 TRACE 6384 --- [ listener-0-L-1] essageListenerContainer$ListenerConsumer : No records to process
2018-09-23 13:52:53.384 DEBUG 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:52:53.384 TRACE 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:52:53.977 DEBUG 6384 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:52:53.977 TRACE 6384 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:52:54.008 DEBUG 6384 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:52:54.008 TRACE 6384 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:52:54.023  INFO 6384 --- [ listener-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [messages-0] for group mris-group
2018-09-23 13:52:54.023 TRACE 6384 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Received partition revocation notification, and will stop the invoker.
2018-09-23 13:52:54.023 DEBUG 6384 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Stopping invoker
2018-09-23 13:52:54.081  INFO 6384 --- [ listener-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [messages-1] for group mris-group
2018-09-23 13:52:54.081 TRACE 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Received partition revocation notification, and will stop the invoker.
2018-09-23 13:52:54.081 DEBUG 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Stopping invoker
2018-09-23 13:52:54.241  INFO 6384 --- [ listener-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [messages-2] for group mris-group
2018-09-23 13:52:54.241 TRACE 6384 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Received partition revocation notification, and will stop the invoker.
2018-09-23 13:52:54.241 DEBUG 6384 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Stopping invoker
2018-09-23 13:52:54.264 DEBUG 6384 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Invoker stopped
2018-09-23 13:52:54.264 DEBUG 6384 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Invoker stopped
2018-09-23 13:52:54.264  INFO 6384 --- [ listener-2-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[messages-2]
2018-09-23 13:52:54.264  INFO 6384 --- [ listener-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[messages-0]
2018-09-23 13:52:54.264  INFO 6384 --- [ listener-2-C-1] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group mris-group
2018-09-23 13:52:54.265  INFO 6384 --- [ listener-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group mris-group
2018-09-23 13:53:09.355 DEBUG 6384 --- [ listener-1-L-1] .a.RecordMessagingMessageListenerAdapter : Processing [GenericMessage [payload=<removed>]
throwing exception
2018-09-23 13:53:24.083 DEBUG 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Interrupting invoker
2018-09-23 13:53:24.083 DEBUG 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Invoker stopped
2018-09-23 13:53:24.085  INFO 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Invoker timed out while waiting for shutdown and will be canceled.
2018-09-23 13:53:24.085  INFO 6384 --- [ listener-1-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[messages-1]
2018-09-23 13:53:24.085  INFO 6384 --- [ listener-1-C-1] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group mris-group
2018-09-23 13:53:24.101 ERROR 6384 --- [ listener-1-L-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: ConsumerRecord(topic = messages, partition = 1, offset = 0, CreateTime = 1537725149052, checksum = 3567644394, serialized key size = 27, serialized value size = 1952, key = test_hotfix1@test.com, value = <removed>])

org.springframework.retry.backoff.BackOffInterruptedException: Thread interrupted while sleeping; nested exception is java.lang.InterruptedException: sleep interrupted
    at org.springframework.retry.backoff.FixedBackOffPolicy.doBackOff(FixedBackOffPolicy.java:86) ~[spring-retry-1.2.0.RELEASE.jar:na]
    at org.springframework.retry.backoff.StatelessBackOffPolicy.backOff(StatelessBackOffPolicy.java:36) ~[spring-retry-1.2.0.RELEASE.jar:na]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:305) ~[spring-retry-1.2.0.RELEASE.jar:na]
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:179) ~[spring-retry-1.2.0.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter.onMessage(RetryingAcknowledgingMessageListenerAdapter.java:73) ~[spring-kafka-1.2.2.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter.onMessage(RetryingAcknowledgingMessageListenerAdapter.java:39) ~[spring-kafka-1.2.2.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.FilteringAcknowledgingMessageListenerAdapter.onMessage(FilteringAcknowledgingMessageListenerAdapter.java:55) ~[spring-kafka-1.2.2.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.FilteringAcknowledgingMessageListenerAdapter.onMessage(FilteringAcknowledgingMessageListenerAdapter.java:34) ~[spring-kafka-1.2.2.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:794) [spring-kafka-1.2.2.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:738) [spring-kafka-1.2.2.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$2200(KafkaMessageListenerContainer.java:245) [spring-kafka-1.2.2.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:1031) [spring-kafka-1.2.2.RELEASE.jar:na]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_162]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_162]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_162]
Caused by: java.lang.InterruptedException: sleep interrupted
    at java.lang.Thread.sleep(Native Method) [na:1.8.0_162]
    at org.springframework.retry.backoff.ThreadWaitSleeper.sleep(ThreadWaitSleeper.java:30) ~[spring-retry-1.2.0.RELEASE.jar:na]
    at org.springframework.retry.backoff.FixedBackOffPolicy.doBackOff(FixedBackOffPolicy.java:83) ~[spring-retry-1.2.0.RELEASE.jar:na]
    ... 14 common frames omitted

2018-09-23 13:53:24.101  INFO 6384 --- [ listener-1-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Successfully joined group mris-group with generation 10
2018-09-23 13:53:24.101  INFO 6384 --- [ listener-2-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Successfully joined group mris-group with generation 10
2018-09-23 13:53:24.102  INFO 6384 --- [ listener-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Successfully joined group mris-group with generation 10
2018-09-23 13:53:24.102  INFO 6384 --- [ listener-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [messages-0] for group mris-group
2018-09-23 13:53:24.102  INFO 6384 --- [ listener-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [messages-2] for group mris-group
2018-09-23 13:53:24.102  INFO 6384 --- [ listener-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [] for group mris-group
2018-09-23 13:53:24.103 DEBUG 6384 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Committing: {}
2018-09-23 13:53:24.103  INFO 6384 --- [ listener-2-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[]
2018-09-23 13:53:24.103 DEBUG 6384 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Committing: {messages-0=OffsetAndMetadata{offset=0, metadata=''}}
2018-09-23 13:53:24.104 DEBUG 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Committing: {messages-2=OffsetAndMetadata{offset=0, metadata=''}}
2018-09-23 13:53:24.106  INFO 6384 --- [ listener-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[messages-0]
2018-09-23 13:53:24.107 DEBUG 6384 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:53:24.107 TRACE 6384 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:53:24.108  INFO 6384 --- [ listener-1-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[messages-2]
2018-09-23 13:53:24.108 DEBUG 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:53:24.108 TRACE 6384 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:53:24.207 DEBUG 6384 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:53:24.207 TRACE 6384 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:53:25.111 TRACE 6384 --- [ listener-0-L-2] essageListenerContainer$ListenerConsumer : No records to process
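The timestamps in worker1's log can be used to measure how long the group was held up. This small sketch parses two pairs of times copied from the log above (same day, so java.time.LocalTime is enough). The roughly 30-second gaps are consistent with the container waiting for a shutdown timeout of about 30 seconds, though that value is an inference from the timestamps, not a setting shown in the question.

```java
import java.time.Duration;
import java.time.LocalTime;

/** Measures gaps between HH:mm:ss.SSS timestamps taken from the log above. */
public class LogTiming {

    public static long millisBetween(String from, String to) {
        return Duration.between(LocalTime.parse(from), LocalTime.parse(to)).toMillis();
    }

    public static void main(String[] args) {
        // listener-1-C-1: "Revoking previously assigned partitions [messages-1]" -> "Interrupting invoker"
        System.out.println(millisBetween("13:52:54.081", "13:53:24.083")); // 30002
        // listener-0-C-1: "(Re-)joining group" -> "Successfully joined group ... generation 10"
        System.out.println(millisBetween("13:52:54.265", "13:53:24.102")); // 29837
    }
}
```

So listener-0-C-1 had already asked to rejoin within a few hundred milliseconds of the revocation, yet it only received its assignment about 30 seconds later, when listener-1-C-1 finally rejoined.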

Trace logs from worker2:

2018-09-23 13:53:24.102  INFO 6401 --- [ listener-2-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Successfully joined group mris-group with generation 10
2018-09-23 13:53:24.104  INFO 6401 --- [ listener-1-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Successfully joined group mris-group with generation 10
2018-09-23 13:53:24.105  INFO 6401 --- [ listener-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [] for group mris-group
2018-09-23 13:53:24.105  INFO 6401 --- [ listener-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [] for group mris-group
2018-09-23 13:53:24.105 DEBUG 6401 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Committing: {}
2018-09-23 13:53:24.105 DEBUG 6401 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Committing: {}
2018-09-23 13:53:24.105  INFO 6401 --- [ listener-2-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[]
2018-09-23 13:53:24.105  INFO 6401 --- [ listener-1-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[]
2018-09-23 13:53:24.106  INFO 6401 --- [ listener-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [messages-1] for group mris-group
2018-09-23 13:53:24.111 DEBUG 6401 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Committing: {messages-1=OffsetAndMetadata{offset=0, metadata=''}}
2018-09-23 13:53:24.115  INFO 6401 --- [ listener-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[messages-1]
2018-09-23 13:53:24.118 DEBUG 6401 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:53:24.118 TRACE 6401 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:53:24.189 DEBUG 6401 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Received: 1 records
2018-09-23 13:53:24.189 TRACE 6401 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:53:24.202 TRACE 6401 --- [ listener-0-L-1] essageListenerContainer$ListenerConsumer : Processing ConsumerRecord(topic = messages, partition = 1, offset = 0, CreateTime = 1537725149052, checksum = 3567644394, serialized key size = 27, serialized value size = 1952, key = test_hotfix1@test.com, value = <removed>)
2018-09-23 13:53:24.209 DEBUG 6401 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:53:24.209 DEBUG 6401 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:53:24.209 TRACE 6401 --- [ listener-1-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:53:24.210 TRACE 6401 --- [ listener-2-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...
2018-09-23 13:53:24.216 DEBUG 6401 --- [ listener-0-L-1] .a.RecordMessagingMessageListenerAdapter : Processing [GenericMessage [payload=<removed>]
throwing exception 
2018-09-23 13:53:25.194 DEBUG 6401 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-23 13:53:25.194 TRACE 6401 --- [ listener-0-C-1] essageListenerContainer$ListenerConsumer : Polling (paused=false)...

1 Answer

September 24, 2018

Versions before 1.3 used a very complex threading model to avoid rebalances caused by a slow listener. KIP-62 allowed us to use a much simpler threading model in version 1.3 and later.

1.2.x is no longer supported, and I don't have the time (or the inclination) to go back and figure out what happened there. Please upgrade to 1.3.7 (or, even better, 2.1.10).
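One consequence of the simpler threading model is worth checking before upgrading: in 1.3 and later the listener runs on the consumer thread, so every in-memory retry and back-off for the records returned by one poll() happens before the next poll(), and that gap must stay below max.poll.interval.ms (300000 ms by default since KIP-62). The back-of-the-envelope sketch below uses the question's 20 attempts and 20-second back-off; the class and method names are hypothetical, and the time of each attempt is assumed to be zero.

```java
/**
 * Hypothetical back-of-the-envelope check for a single-threaded container, where
 * a retry back-off runs on the consumer thread between two poll() calls.
 */
public class PollIntervalBudget {

    /** Kafka consumer default for max.poll.interval.ms (5 minutes). */
    public static final long DEFAULT_MAX_POLL_INTERVAL_MS = 300_000L;

    /** One always-failing record: maxAttempts calls plus (maxAttempts - 1) back-offs. */
    public static long worstCaseMillisPerRecord(int maxAttempts, long backOffMillis, long attemptMillis) {
        return maxAttempts * attemptMillis + (long) (maxAttempts - 1) * backOffMillis;
    }

    /** Worst-case gap between polls if every record from one poll() exhausts its retries. */
    public static long worstCaseGapMillis(int recordsPerPoll, int maxAttempts,
                                          long backOffMillis, long attemptMillis) {
        return recordsPerPoll * worstCaseMillisPerRecord(maxAttempts, backOffMillis, attemptMillis);
    }

    public static void main(String[] args) {
        // Values from the question: 20 attempts, 20 s fixed back-off; attempt time assumed 0.
        long gap = worstCaseGapMillis(1, 20, 20_000, 0);
        System.out.println(gap + " ms > default? " + (gap > DEFAULT_MAX_POLL_INTERVAL_MS));
        // prints: 380000 ms > default? true
    }
}
```

With a single failing record the worst case is 19 × 20 s = 380 s, already above the 300 s default, so after the upgrade this retry configuration would probably need a shorter back-off, fewer attempts, or a larger max.poll.interval.ms.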
