Apache Флинк-контрольная точка зависла - PullRequest
2 голосов
/ 29 мая 2020

мы выполняем задание, для которого ListState составляет от 300 до 400 ГБ, а иногда список может увеличиваться до нескольких тысяч. В нашем случае каждый элемент должен иметь свой собственный TTL, поэтому мы создаем новый таймер для каждого нового элемента этого ListState с серверной частью RocksDB на S3.

В настоящее время это около 140+ миллионов таймеров ( который сработает при event.timestamp + 40days ).

Наша проблема в том, что внезапно контрольная точка задания застревает или ОЧЕНЬ медленно (например, 1% за несколько часов), пока в конечном итоге таймауты. Обычно он останавливается (на панели управления миганием отображается 0/12 (0%), а в предыдущих строках отображается 12/12 (100%)) на фрагменте кода, который довольно прост:

[...]
    val myStream = env.addSource(someKafkaConsumer)
      .rebalance
      .map(new CounterMapFunction[ControlGroup]("source.kafkaconsumer"))
      .uid("src_kafka_stream")
      .name("some_name")

      myStream.process(new MonitoringProcessFunction()).uid("monitoring_uuid").name(monitoring_name)
        .getSideOutput(outputTag)
        .keyBy(_.name)
        .addSink(sink)
[...]

Дополнительная информация:

  • Режим контрольной точки AT_LEAST_ONCE, кажется, застрять легче, чем EXACTLY_ONCE
  • Несколько месяцев go состояние увеличилось до 1,5 ТБ данных, и я думаю, миллиарды таймеров без проблем.
  • Оперативная память, ЦП и сеть на машинах, на которых запущены оба диспетчера задач, выглядят нормально
  • state.backend.rocksdb.thread.num = 4
  • Первый инцидент произошел прямо тогда, когда мы получили поток событий (около миллионов минут), но не на предыдущий.
  • Все события происходят из тем Kafka.
  • В режиме контрольной точки AT_LEAST_ONCE задание по-прежнему выполняется и потребляет нормально.

Это второе раз уж с нами случается, что топология работает очень хорошо с несколькими миллионами событий в день и внезапно перестает устанавливать контрольные точки. Мы понятия не имеем, что могло вызвать это.

Кто-нибудь может подумать, что могло внезапно привести к зависанию контрольной точки?

1 Ответ

2 голосов
/ 29 мая 2020

Несколько мыслей:

Если у вас есть много таймеров, которые срабатывают более или менее одновременно, эта буря таймеров предотвратит что-либо еще - задачи будут oop вызывать onTimer, пока не будет больше не будут запускаться таймеры, в течение которого их входные очереди будут игнорироваться, а барьеры контрольных точек не будут развиваться.

Если это причина ваших проблем, вы можете добавить случайный джиттер к своим таймерам, чтобы эти штормы событий не превращаются позже в штормы таймера. Другим вариантом может быть реорганизация вещей для использования State TTL .

Если у вас много таймеров в куче, это может привести к очень высоким накладным расходам G C. Это не обязательно приведет к сбою, но может сделать контрольную точку нестабильной. В этом случае может помочь перемещение таймеров в RocksDB.

Также: поскольку вы используете RocksDB, переключение с ListState на MapState с указанием времени в качестве ключа позволит вам удалить отдельные записи без необходимости повторной сериализации всей список после каждого обновления. (В RocksDB каждая пара ключ / значение в MapState представляет собой отдельный объект RocksDB.) Повышение эффективности очистки таким образом может быть лучшим решением.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...