Я использую функцию IntervalJoin для объединения двух потоков в течение 10 минут.Как показано ниже:
labelStream.intervalJoin(adLogStream)
.between(Time.milliseconds(0), Time.milliseconds(600000))
.process(new processFunction())
.sink(kafkaProducer)
labelStream и adLogStream являются классом proto-buf, для которых используется Long id.
Наши два входных потока огромны.Примерно после 30 минут работы вывод kafka медленно снижается, например:
Когда вывод данных начинает снижаться, я использую jstack и pstack sevaral времена, чтобы получить эти:
Кажется, программа застряла в поиске rockdb.И я нахожу, что к некоторому файлу srt в rockdb обращаются медленно итерацией.
Я пробовал несколько способов:
1)Reduce the input amount to half. This works well.
2)Replace labelStream and adLogStream with simple Strings. This way, data amount will not change. This works well.
3)Use PredefinedOptions like SPINNING_DISK_OPTIMIZED and SPINNING_DISK_OPTIMIZED_HIGH_MEM. This still fails.
4)Use new versions of rocksdbjni. This still fails.
Может кто-нибудь дать мне несколько советов?Большое спасибо.