После https://stackoverflow.com/a/36009859/9911256, может показаться, что фиксация / автокоммитирование Kafka может завершиться неудачей, если потребитель внезапно умрет. На самом деле, мое приложение Kafka отлично работает на производстве, но во время тестов ИНОГДА у меня возникает эта повторяющаяся проблема (до перезагрузки kafka): смещение одинаковое.
Мой модульный тест (один производитель java отправляет 10 пакетов одному потребителю java, одному брокеру, одному разделу, одному разделу, одной группе) запускает 10 пакетов, и я проверяю их, начиная с первого:
SENT: (0) NAME:person-001; UUID:352c1f8e-c141-4446-8ac7-18eb044a6b92
SENT: (1) NAME:person-001; UUID:81681a30-83e1-4f85-b07f-da140cfdb874
SENT: (2) NAME:person-001; UUID:3b9db497-460a-4a1c-86b9-f724af1a0449
SENT: (3) NAME:person-001; UUID:63c0edf9-ec00-4ef7-b81a-4b1b8919a42d
SENT: (4) NAME:person-001; UUID:346f265c-1964-4460-97de-1a7b43285c06
SENT: (5) NAME:person-001; UUID:2d1bb49c-03ce-4762-abb3-2bbb963e87d1
SENT: (6) NAME:person-001; UUID:3c8ddda0-6cb8-45b4-b1d2-3a99ba57a48a
SENT: (7) NAME:person-001; UUID:3f819408-41d5-4cad-ad39-322616a86b99
SENT: (8) NAME:person-001; UUID:1db09bc1-4c90-4a0d-8efc-d6ea8a791985
SENT: (9) NAME:person-001; UUID:705a3a3c-fd15-45a9-a96c-556350f1f79a
Exception in thread "Thread-2" org.opentest4j.AssertionFailedError: expected: <352c1f8e-c141-4446-8ac7-18eb044a6b92> but was: <6785fa5d-ef63-4fe6-85c5-c525bfc4ee12>
И если я запусту тест снова:
SENT: (0) NAME:person-001; UUID:d171e7ee-fa73-4cb4-826e-f7bffdef9e92
SENT: (1) NAME:person-001; UUID:25da6b6e-57e9-4f8a-a3ff-1099f94fcaf5
SENT: (2) NAME:person-001; UUID:d05b4693-ba60-4db2-a5ae-30dcd44ce5b7
SENT: (3) NAME:person-001; UUID:fbd75ee7-6f34-4ab1-abda-d31ee91d0ff8
SENT: (4) NAME:person-001; UUID:798fe246-f10e-4fc3-90c9-df3e181bb641
SENT: (5) NAME:person-001; UUID:26b33a19-7e65-49ec-b54d-3379ef76b797
SENT: (6) NAME:person-001; UUID:45ecef46-69f5-4bff-99b5-c7c2dce67ec8
SENT: (7) NAME:person-001; UUID:464df926-cd66-4cfa-b282-36047522dfe8
SENT: (8) NAME:person-001; UUID:982c82c0-c669-400c-a70f-62c57e3552a4
SENT: (9) NAME:person-001; UUID:ecdbfce6-d378-496d-9e0b-30f16b7cf484
Exception in thread "Thread-2" org.opentest4j.AssertionFailedError: expected: <d171e7ee-fa73-4cb4-826e-f7bffdef9e92> but was: <6785fa5d-ef63-4fe6-85c5-c525bfc4ee12>
- Обратите внимание, что сообщение <6785fa5d-ef63-4fe6-85c5-c525bfc4ee12> неоднократно отправляется при каждой попытке, несмотря на то, что это разные линии, которые я делаю вручную.
Я использую:
properties.put ("auto.offset.reset", "latest");
Я уже попробовал вариант autocommit
, с эквивалентными результатами.
- Хуже всего, что это происходит (или нет) каждый раз, когда я перезагружаю сервер kafka:
- иногда я перезагружаю его, и тесты могут пройти нормально, даже если я повторяю любое количество раз.
- иногда кажется, что он входит в это состояние сбоя и ВСЕГДА терпит неудачу.
- Как уже говорилось, когда потребители не умирают, а массовые потоки сообщений управляются, эта проблема не появляется.
- Я также заметил, что если сервер был недавно перезагружен и все журналы и каталоги данных kafka были удалены, первые тесты могут задержаться и завершиться неудачей.
Мои журналы показывают это:
2019-01-25T12:20:02.874119+01:00 TLS dockcompose: #033[32mkafka_1 |#033[0m [2019-01-25 11:20:02,850] INFO [GroupCoordinator 1001]: Preparing to rebalance group 0 in state PreparingRebalance with old generation 26 (__consumer_offsets-48) (reason: Adding new member consumer-1-a0b94a2a-0cae-4ba8-85f0-9a84030f4beb) (kafka.coordinator.group.GroupCoordinator)
2019-01-25T12:20:02.874566+01:00 TLS dockcompose: #033[32mkafka_1 |#033[0m [2019-01-25 11:20:02,851] INFO [GroupCoordinator 1001]: Stabilized group 0 generation 27 (__consumer_offsets-48) (kafka.coordinator.group.GroupCoordinator)
2019-01-25T12:20:02.874810+01:00 TLS dockcompose: #033[32mkafka_1 |#033[0m [2019-01-25 11:20:02,858] INFO [GroupCoordinator 1001]: Assignment received from leader for group 0 for generation 27 (kafka.coordinator.group.GroupCoordinator)
13 секунд после теста:
2019-01-25T12:20:15.894185+01:00 TLS dockcompose: #033[32mkafka_1 |#033[0m [2019-01-25 11:20:15,871] INFO [GroupCoordinator 1001]: Member consumer-2-79f97c80-294c-438b-8a8a-3745f4a57010 in group 0 has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
2019-01-25T12:20:15.894522+01:00 TLS dockcompose: #033[32mkafka_1 |#033[0m [2019-01-25 11:20:15,871] INFO [GroupCoordinator 1001]: Preparing to rebalance group 0 in state PreparingRebalance with old generation 27 (__consumer_offsets-48) (reason: removing member consumer-2-79f97c80-294c-438b-8a8a-3745f4a57010 on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
2019-01-25T12:20:17.897272+01:00 TLS dockcompose: #033[32mkafka_1 |#033[0m [2019-01-25 11:20:17,865] INFO [GroupCoordinator 1001]: Member consumer-1-a0b94a2a-0cae-4ba8-85f0-9a84030f4beb in group 0 has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
2019-01-25T12:20:17.897579+01:00 TLS dockcompose: #033[32mkafka_1 |#033[0m [2019-01-25 11:20:17,866] INFO [GroupCoordinator 1001]: Group 0 with generation 28 is now empty (__consumer_offsets-48) (kafka.coordinator.group.GroupCoordinator)
Что я могу сделать вывод, прочитав предыдущую тему, так это то, что kafka будет работать нормально, если потребитель постоянно подключен (производство). Но это невозможно во время тестов! В чем здесь проблема ???
ОБНОВЛЕНИЕ: Я обнаружил, что CURRENT-OFFSET эффективно не меняется в некоторых случаях (что странно, потому что я продолжаю получать сообщение <6785fa5d-ef63-4fe6-85c5-c525bfc4ee12> ), и, конечно, LOG-END-OFFSET растет, но это совершенно ошибочно ...
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
t0001 0 160 1085 925 consumer-2-1a2cce59-c449-471e-bad0-3c3335f44e26 /10.42.0.105 consumer-2
после запуска теста, снова:
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
t0001 0 160 1153 993 consumer-2-dff28a54-b4e8-464a-a5e7-67c8cbad749f /10.42.0.105 consumer-2