Может быть, я просто скучаю по чему-то, но у меня просто нет больше идей, где искать.
Я читаю сообщения из 2 источников, делаю соединение на основе общего ключа и передаю все это Кафке.
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(3)
...
source1
.keyBy(_.searchId)
.connect(source2.keyBy(_.searchId))
.process(new SearchResultsJoinFunction)
.addSink(KafkaSink.sink)
, поэтому он отлично работает, когда я запускаю его локально, и также работает на кластере с Parallelism, установленным в 1, но с 3 больше нет.
Когда я разверну его на 1 диспетчере заданий и 3 менеджерах задач и переведу каждую задачу в состояние «РАБОТА», через 2 минуты (когда ничего не происходит, один из менеджеров задач) получает следующий журнал:
https://gist.github.com/zavalit/1b1bf6621bed2a3848a05c1ef84c689c#file-gistfile1-txt-L108
и все это просто отключается.
Буду признателен за любую подсказку.tnx, заранее.