Вот случай, у нас есть 3 темы кафки (у каждого по 50 разделов), у них разные сообщения, в то время как у всех этих сообщений есть поле 'username',
topic_1 --> Message01 {String username; ...}, about 50,000 messages per minute
topic_2 --> Message02 {String username; ...}, about 3,000,000 messages per minute
topic_3 --> Message03 {String username; ...}, about 70,000 messages per minute
И мы определили класс-оболочку,
MessageWrapper{
List<Message01> list01;
List<Message02> list02;
List<Message03> list03;
}
У нас есть flatMap, который «конвертирует» исходное сообщение в tuple3,
String field --> username
Integer field --> type
MessageWrapper field --> the wrapper object
Все 3 потока обрабатываются аналогичной функцией flatMap (),
public void flatMap(Message01 value, Collector<Tuple3<String, Integer, MessageWrapper>> out)
throws Exception {
String name = value.getUsername();
if (!StringUtils.isBlank(name)) {
MessageWrapper wrapper = new MessageWrapper();
List<Message01> list = new ArrayList<>();
list.add(value);
wrapper.setList01(list);
out.collect(new Tuple3<>(name, 1, wrapper));
}
}
После flatMap () мы объединяем эти 3 потока,
stream1.union(stream2, stream3).keyBy(0).timeWindow(Time.seconds(300))
.process(
new ProcessWindowFunction<Tuple3<String, Integer, MessageWrapper>, MessageWrapper, Tuple, TimeWindow>() {
@Override
public void process(Tuple key,
ProcessWindowFunction<Tuple3<String, Integer, MessageWrapper>, MessageWrapper, Tuple, TimeWindow>.Context ctx,
Iterable<Tuple3<String, Integer, MessageWrapper>> elements,
Collector<MessageWrapper> out) throws Exception {
// merge all entities which have same username, to get a big fat wrapper object
MessageWrapper w = new MessageWrapper();
for (Tuple3<String, Integer, MessageWrapper> t3 : elements) {
MessageWrapper ret = t3.f2;
Integer type = t3.f1;
if (type == 1) {
// add to list01
} else if (type == 2) {
// add to list02
} else if (type == 3) {
// add to list03
}
}
if (all 3 lists are not empty) {
out.collect(ret);
}
}
});
В настоящее время мы используем 20 TaskManager, каждое из 4 ядер + 16G, всего 80 слотов, мы используем 50 параллелизма через.
Мы всегда сталкиваемся с проблемой, что менеджер задач не отвечает из-за слишком большого полного gc,
Connecting to remote task manager + 'xxxxxxxxxxxxx' has failed. This might indicate that the remote task manager has been lost".
Если мы сократим временное окно с 5 минут до 1 минуты, все будет хорошо. В соответствии с этим, похоже, что кластеру flink не хватает ресурсов, но 80 ядер + 320 Гб для нескольких миллионов сообщений (размер каждого сообщения составляет около 5 КБ), должно быть достаточно, верно?
Кто-нибудь мог бы пролить свет здесь? Или, возможно, в коде есть проблемы?