Streams часто воссоздает магазины - PullRequest
0 голосов
/ 01 марта 2019

В потоковом приложении я использую интерактивные запросы и хранилища состояний, чтобы масштабировать и быстрее получать данные из тем.Однако довольно часто я вижу предупреждения в журналах:

anomaly-timeline-3                    | 2019-03-01 08:43:58,177 INFO 
anomaly-timeline-a3b6b7d6-3bd8-40a6-b070-874964bed3ee-StreamThread-2 org.apache.kafka.streams.processor.internals.StreamThread stream-thread [anomaly-timeline-a3b6b7d6-3bd8-40a6-b070-874964bed3ee-StreamThread-2] Reinitializing StandbyTask TaskId: 1_0
anomaly-timeline-3                    |         ProcessorTopology:
anomaly-timeline-3                    |                 KSTREAM-SOURCE-0000000012:
anomaly-timeline-3                    |                         topics:         [anomaly-timeline-two-minutes-error-score-repartition]
anomaly-timeline-3                    |                         children:       [KSTREAM-REDUCE-0000000009]
anomaly-timeline-3                    |                 KSTREAM-REDUCE-0000000009:
anomaly-timeline-3                    |                         states:         [two-minutes-error-score]
anomaly-timeline-3                    | Partitions [anomaly-timeline-two-minutes-error-score-repartition-0]
anomaly-timeline-3                    |  from changelogs [anomaly-timeline-two-minutes-error-score-changelog-0]
anomaly-timeline-3                    | 2019-03-01 08:43:58,474 INFO anomaly-timeline-a3b6b7d6-3bd8-40a6-b070-874964bed3ee-StreamThread-2 org.apache.kafka.clients.consumer.internals.Fetcher [Consumer clientId=anomaly-timeline-a3b6b7d6-3bd8-40a6-b070-874964bed3ee-StreamThread-2-restore-consumer, groupId=] Resetting offset for partition anomaly-timeline-two-minutes-error-score-changelog-0 to offset 14787709.
anomaly-timeline-3                    | 2019-03-01 08:48:57,991 WARN anomaly-timeline-a3b6b7d6-3bd8-40a6-b070-874964bed3ee-StreamThread-2 org.apache.kafka.streams.processor.internals.StreamThread stream-thread [anomaly-timeline-a3b6b7d6-3bd8-40a6-b070-874964bed3ee-StreamThread-2] Updating StandbyTasks failed. Deleting StandbyTasks stores to recreate from scratch. org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {anomaly-timeline-one-hour-error-score-changelog-0=14818811}
anomaly-timeline-3                    |         at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:1002)
anomaly-timeline-3                    |         at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:508)
anomaly-timeline-3                    |         at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1259)
anomaly-timeline-3                    |         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1187)
anomaly-timeline-3                    |         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
anomaly-timeline-3                    |         at org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1099)
anomaly-timeline-3                    |         at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:846)
anomaly-timeline-3                    |         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
anomaly-timeline-3                    |         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
anomaly-timeline-3                    |
anomaly-timeline-3                    | org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {anomaly-timeline-one-hour-error-score-changelog-0=14818811}
anomaly-timeline-3                    |         at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:1002)
anomaly-timeline-3                    |         at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:508)
anomaly-timeline-3                    |         at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1259)
anomaly-timeline-3                    |         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1187)
anomaly-timeline-3                    |         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
anomaly-timeline-3                    |         at org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1099)
anomaly-timeline-3                    |         at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:846)
anomaly-timeline-3                    |         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
anomaly-timeline-3                    |         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
anomaly-timeline-3                    | 2019-03-01 08:48:57,995 INFO anomaly-timeline-a3b6b7d6-3bd8-40a6-b070-874964bed3ee-StreamThread-2 org.apache.kafka.streams.processor.internals.StreamThread stream-thread [anomaly-timeline-a3b6b7d6-3bd8-40a6-b070-874964bed3ee-StreamThread-2] Reinitializing StandbyTask TaskId: 3_0
anomaly-timeline-3                    |         ProcessorTopology:
anomaly-timeline-3                    |                 KSTREAM-SOURCE-0000000022:
anomaly-timeline-3                    |                         topics:         [anomaly-timeline-one-hour-error-score-repartition]
anomaly-timeline-3                    |                         children:       [KSTREAM-REDUCE-0000000019]
anomaly-timeline-3                    |                 KSTREAM-REDUCE-0000000019:
anomaly-timeline-3                    |                         states:         [one-hour-error-score]
anomaly-timeline-3                    | Partitions [anomaly-timeline-one-hour-error-score-repartition-0]
anomaly-timeline-3                    |  from changelogs [anomaly-timeline-one-hour-error-score-changelog-0]
anomaly-timeline-3                    | 2019-03-01 08:48:58,303 INFO anomaly-timeline-a3b6b7d6-3bd8-40a6-b070-874964bed3ee-StreamThread-2 org.apache.kafka.clients.consumer.internals.Fetcher [Consumer clientId=anomaly-timeline-a3b6b7d6-3bd8-40a6-b070-874964bed3ee-StreamThread-2-restore-consumer, groupId=] Resetting offset for partition anomaly-timeline-one-hour-error-score-changelog-0 to offset 14818854.

Так что, похоже, по какой-то причине Kafka повторно инициализирует резервную задачу, а затем не может обновить ее.Это, вероятно, приводит к тому, что магазины воссоздаются с нуля, если я понимаю логирование.

Итак, мои вопросы:

  • Несмотря на то, что это предупреждения, кажется, что Кафка работает не так, как должно,Является ли это предположение правильным?
  • Почему происходит сбой этой StandbyTask?
  • Удаляет ли это мои фактические хранилища состояний журнала изменений?
  • Должен ли я и как настроить политику сброса для этого потоканить?
  • Почему сбрасывается смещение для этого журнала изменений?

1 Ответ

0 голосов
/ 23 марта 2019

Несмотря на то, что это предупреждения, кажется, что Кафка работает не так, как должно.Является ли это предположение правильным?

Да.

Почему происходит сбой этого StandbyTask?

Кажется, что StandbyTask извлечен из недопустимого смещения,Это на самом деле не дает сбоя.

Удаляет ли это мои действительные хранилища состояний журналов изменений?

В этом случае удаляется только локальное хранилище, тема журнала изменений не затрагивается,Локальное хранилище удалено, потому что оно не синхронизировано с темой журнала изменений.Это позволяет начать повторное создание хранилища с нуля.

Должен ли я и как настроить политику сброса для этого потока потока?

Нельзя настроить политику сбросадля восстановления потребителя.Если вышеперечисленное происходит, Kafka Streams удаляет локальное хранилище и seeksToBeginning() в разделе журнала изменений, чтобы воссоздать магазин с нуля.

Почему сбрасывается смещение для этого журнала изменений?

Может быть, StandbyTask отстает?

Вы можете попробовать включить регистрацию TRACE для org.apache.kafka.streams.processor.internals.ProcessorStateManager.Смещения StandyTasks отслеживаются в локальном файле контрольных точек, который записывается при фиксации.Смещения регистрируются при фиксации:

log.trace("Writing checkpoint: {}", this.checkpointableOffsets);

Это должно помочь выяснить, отстает ли StandbyTask.В этом случае вам может понадобиться больше потоков или больше экземпляров, чтобы избежать этого.

...