Почему контрольные точки так сильно влияют на задержку? - PullRequest
0 голосов
/ 27 февраля 2019

Я наблюдаю, что наличие контрольных точек при использовании бэкэнда памяти вызывает неожиданное увеличение наблюдаемых задержек.

Рассмотрим следующую контрольную точку:

2019-02-27 15:35:46,322 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 2 @ 1551281746322 for job a80597b3312f0704beed75397c371bf5.
2019-02-27 15:35:46,326 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend     - Heap backend snapshot (In-Memory Stream Factory, synchronous part) in thread Thread[KeyedProcess -> Map -> Sink: Unnamed (1/1),5,Flink Task Threads] took 0 ms.
2019-02-27 15:35:46,342 INFO  org.apache.flink.runtime.state.DefaultOperatorStateBackend    - DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, synchronous part) in thread Thread[Async calls on Source: Custom Source -> Map -> Timestamps/Watermarks (1/1),5,Flink Task Threads] took 2 ms.
2019-02-27 15:35:46,346 INFO  org.apache.flink.runtime.state.DefaultOperatorStateBackend    - DefaultOperatorStateBackend snapshot (In-Memory Stream Factory, asynchronous part) in thread Thread[pool-14-thread-2,5,Flink Task Threads] took 3 ms.
2019-02-27 15:35:46,351 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend     - Heap backend snapshot (In-Memory Stream Factory, asynchronous part) in thread Thread[pool-11-thread-2,5,Flink Task Threads] took 14 ms.
2019-02-27 15:35:46,378 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 2 for job a80597b3312f0704beed75397c371bf5 (1157653 bytes in 54 ms).

Даже если конецдлительность до конца составляла всего 50 мсек, а ответ на событие, введенное в 15:35:46,385, пришел только к 15:35:46,905 ( 520 мс позже ).Между этими 2 временными метками события не обрабатывались.Без контрольной точки задержка в 99,99% составляет ~ 15 мс.

Настройка:

  • Параллельность = 1
  • Сетевой буфер = 0
  • Источник RMQ -> Окно -> приемник RMQ
  • Инжектор измеряет задержку, используя разницу System.nanoTime между введением и получением ответа

edit: это линейное задание, поэтому я думаю, что выравнивания нетбарьеров контрольно-пропускного пункта.

1 Ответ

0 голосов
/ 01 марта 2019

Время тратится на синхронный ACK'инг сообщений в RabbitMQ (MessageAcknowledgingSourceBase#notifyCheckpointComplete> MultipleIdsMessageAcknowledgingSourceBase#acknowledgeIDs> RMQSource#acknowledgeSessionIDs).Вероятно, это можно сделать асинхронно, как это делает коннектор Kafka.

Поскольку интервал между контрольными точками составляет 3 минуты, а я ввожу 200 ev / s, это означает, что каждая контрольная точка вызывает подтверждение 36k сообщений (200 *60 * 3), что занимает около 500 мс.

Использование меньшего интервала может помочь получить более предсказуемую задержку за счет более высокой средней задержки.

...