Гарантия заказа события в FlinkKafkaConsumer - PullRequest
1 голос
/ 24 октября 2019

TL; DR: Какое на данный момент лучшее решение для гарантии порядка событий во времени во Flink?

Я использую Flink 1.8.0 с Kafka 2.2.1. Мне нужно гарантировать правильный порядок событий по метке времени события. Я генерирую периодические водяные знаки каждые 1 с. Я использую FlinkKafkaConsumer с AscendingTimestampExtractor:

val rawConsumer = new FlinkKafkaConsumer[T](topicName, deserializationSchema, kafkaConsumerConfig)
  .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[T] {
      override def extractAscendingTimestamp(element: T): Long =
        timestampExtractor(element)
      })
 .addSource(consumer)(deserializationSchema.getProducedType).uid(sourceId).name(sourceId)

и затем обрабатываю:

myStream
   .keyBy(ev => (ev.name, ev.group))
   .mapWithState[ResultEvent, ResultEvent](DefaultCalculator.calculateResultEventState)

Я понял, что для неупорядоченных событий, которые произошли в той же или нескольких мсек позже, порядокFlink не исправляется. То, что я нашел в документации:

водяной знак запускает вычисление всех окон, где максимальная временная метка (которая является конечной временной меткой - 1) меньше, чем новый водяной знак

https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#interaction-of-watermarks-and-windows

Итак, я подготовил дополнительный шаг обработки, чтобы гарантировать порядок событий:

myStream
      .timeWindowAll(Time.milliseconds(100))
      .apply((window, input, out: Collector[MyEvent]) => input
        .toList.sortBy(_.getTimestamp)
        .foreach(out.collect) // this windowing guarantee correct order by event time
      )(TypeInformation.of(classOf[MyEvent]))
      .keyBy(ev => (ev.name, ev.group))
      .mapWithState[ResultEvent, ResultEvent](DefaultScoring.calculateResultEventState)

Однако я считаю это решение уродливым, и оно выглядит как обходной путь. Я также обеспокоен водяными знаками на раздел KafkaSource

В идеале я хотел бы поместить гарантию порядка в KafkaSource и сохранить ее для каждого раздела kafka, например водяные знаки на разделы. Возможно ли это сделать? Какое на данный момент лучшее решение для гарантии порядка событий во времени во Флинке?

Ответы [ 2 ]

3 голосов
/ 25 октября 2019

Flink не гарантирует обработку записей в порядке событий. Записи внутри раздела будут обрабатываться в их первоначальном порядке, но когда два или более разделов объединяются в новый раздел (из-за перераспределения или объединения потоков), Flink случайным образом объединяет записи этих разделов в новый раздел. Все остальное будет неэффективным и приведет к более высоким задержкам.

Например, если у вашей работы есть исходная задача, которая читает из двух разделов Kafka, записи обоих разделов объединяются в несколько случайный зигзагообразный шаблон.

Однако Flink гарантирует, что все события правильно обрабатываются в отношении сгенерированных водяных знаков. Это означает, что водяной знак никогда не опережает рекорд. Например, если ваш источник Kafka генерирует водяные знаки на разделы, водяные знаки остаются в силе даже после объединения записей нескольких разделов. Водяной знак используется для сбора и обработки всех записей, отметка времени которых меньше водяного знака. Следовательно, он обеспечивает полноту входных данных.

Это обязательное условие для упорядочения записей по их отметке времени. Вы можете сделать это с падающим окном все. Тем не менее, вы должны знать, что

  1. окно все будет выполняться в одной задаче (то есть, оно не распараллелено). Если порядка для каждого ключа достаточно, вы должны использовать обычное переворачивающееся окно или даже лучше реализовать KeyedProcessFunction, что более эффективно.
  2. порядок будет уничтожен при реорганизации потока из-за перераспределения или измененияпараллелизм.
1 голос
/ 24 октября 2019

Это замечательный момент. Наличие гарантии заказа в KafkaSource фактически состоит из двух частей.

  1. Гарантия порядка между разделами в одной подзадаче.
  2. Гарантия порядка среди подзадач.

Первая часть уже выполняется в https://issues.apache.org/jira/browse/FLINK-12675. А вторая часть нуждается в поддержке разделения состояния между подзадачами, что может потребовать дополнительного обсуждения и подробного плана в сообществе.

Назадна ваш вопрос, я думаю, что поддержание порядка событий путем установки окна для буферизации данных является лучшим решением в настоящее время.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...