мы выполняем задание, для которого ListState составляет от 300 до 400 ГБ, а иногда список может увеличиваться до нескольких тысяч. В нашем случае каждый элемент должен иметь свой собственный TTL, поэтому мы создаем новый таймер для каждого нового элемента этого ListState с серверной частью RocksDB на S3.
В настоящее время это около 140+ миллионов таймеров ( который сработает при event.timestamp + 40days ).
Наша проблема в том, что внезапно контрольная точка задания застревает или ОЧЕНЬ медленно (например, 1% за несколько часов), пока в конечном итоге таймауты. Обычно он останавливается (на панели управления миганием отображается 0/12 (0%)
, а в предыдущих строках отображается 12/12 (100%)
) на фрагменте кода, который довольно прост:
[...]
val myStream = env.addSource(someKafkaConsumer)
.rebalance
.map(new CounterMapFunction[ControlGroup]("source.kafkaconsumer"))
.uid("src_kafka_stream")
.name("some_name")
myStream.process(new MonitoringProcessFunction()).uid("monitoring_uuid").name(monitoring_name)
.getSideOutput(outputTag)
.keyBy(_.name)
.addSink(sink)
[...]
Дополнительная информация:
- Режим контрольной точки AT_LEAST_ONCE, кажется, застрять легче, чем EXACTLY_ONCE
- Несколько месяцев go состояние увеличилось до 1,5 ТБ данных, и я думаю, миллиарды таймеров без проблем.
- Оперативная память, ЦП и сеть на машинах, на которых запущены оба диспетчера задач, выглядят нормально
state.backend.rocksdb.thread.num = 4
- Первый инцидент произошел прямо тогда, когда мы получили поток событий (около миллионов минут), но не на предыдущий.
- Все события происходят из тем Kafka.
- В режиме контрольной точки AT_LEAST_ONCE задание по-прежнему выполняется и потребляет нормально.
Это второе раз уж с нами случается, что топология работает очень хорошо с несколькими миллионами событий в день и внезапно перестает устанавливать контрольные точки. Мы понятия не имеем, что могло вызвать это.
Кто-нибудь может подумать, что могло внезапно привести к зависанию контрольной точки?