Flink, TaskManager не отвечает - PullRequest
       18

Flink, TaskManager не отвечает

1 голос
/ 14 марта 2019

Вот случай, у нас есть 3 темы кафки (у каждого по 50 разделов), у них разные сообщения, в то время как у всех этих сообщений есть поле 'username',

topic_1 --> Message01 {String username; ...}, about 50,000 messages per minute
topic_2 --> Message02 {String username; ...}, about 3,000,000 messages per minute
topic_3 --> Message03 {String username; ...}, about 70,000 messages per minute

И мы определили класс-оболочку,

MessageWrapper{
 List<Message01> list01;
 List<Message02> list02;
 List<Message03> list03;
}

У нас есть flatMap, который «конвертирует» исходное сообщение в tuple3,

String field --> username
Integer field --> type
MessageWrapper field --> the wrapper object

Все 3 потока обрабатываются аналогичной функцией flatMap (),

public void flatMap(Message01 value, Collector<Tuple3<String, Integer, MessageWrapper>> out)
        throws Exception {
    String name = value.getUsername();
    if (!StringUtils.isBlank(name)) {
        MessageWrapper wrapper = new MessageWrapper();
        List<Message01> list = new ArrayList<>();
        list.add(value);
        wrapper.setList01(list);
        out.collect(new Tuple3<>(name, 1, wrapper));
    }
}

После flatMap () мы объединяем эти 3 потока,

stream1.union(stream2, stream3).keyBy(0).timeWindow(Time.seconds(300))
        .process(
                new ProcessWindowFunction<Tuple3<String, Integer, MessageWrapper>, MessageWrapper, Tuple, TimeWindow>() {

                    @Override
                    public void process(Tuple key,
                            ProcessWindowFunction<Tuple3<String, Integer, MessageWrapper>, MessageWrapper, Tuple, TimeWindow>.Context ctx,
                            Iterable<Tuple3<String, Integer, MessageWrapper>> elements,
                            Collector<MessageWrapper> out) throws Exception {
                        // merge all entities which have same username, to get a big fat wrapper object
                        MessageWrapper w = new MessageWrapper();
                        for (Tuple3<String, Integer, MessageWrapper> t3 : elements) {
                            MessageWrapper ret = t3.f2;
                            Integer type = t3.f1;
                            if (type == 1) {
                                // add to list01
                            } else if (type == 2) {
                                // add to list02
                            } else if (type == 3) {
                                // add to list03
                            }
                        }

                        if (all 3 lists are not empty) {
                            out.collect(ret);
                        }
                    }
                });

В настоящее время мы используем 20 TaskManager, каждое из 4 ядер + 16G, всего 80 слотов, мы используем 50 параллелизма через.

Мы всегда сталкиваемся с проблемой, что менеджер задач не отвечает из-за слишком большого полного gc,

Connecting to remote task manager + 'xxxxxxxxxxxxx' has failed. This might indicate that the remote task manager has been lost".

Если мы сократим временное окно с 5 минут до 1 минуты, все будет хорошо. В соответствии с этим, похоже, что кластеру flink не хватает ресурсов, но 80 ядер + 320 Гб для нескольких миллионов сообщений (размер каждого сообщения составляет около 5 КБ), должно быть достаточно, верно?

Кто-нибудь мог бы пролить свет здесь? Или, возможно, в коде есть проблемы?

1 Ответ

0 голосов
/ 03 апреля 2019

Я решил эту проблему при настройке своего кластера, прокомментировав строку с 127.0.1.1 в файле /etc/hosts всех машин. И я увеличил параллелизм слотов в свойстве taskmanager.numberOfTaskSlots: файла conf/flink-conf.yaml.

...