Flink - Создать BroadcastStream из свойств базы данных - PullRequest
1 голос
/ 25 февраля 2020

Мне нужно создать BroadcastStream, чтобы иметь возможность изменять свойства в базе данных и видеть применение этих свойств в реальном времени в приложении. У меня проблемы 2:

1) Когда я читаю базу данных, мне нужно иметь все строки одновременно, через resultSet, HashMap или что-нибудь, что может содержать структуру типа ключ-значение, поскольку некоторые свойства зависят от других свойств, поэтому я не могу обрабатывать их по отдельности. Структура моего MapStateDescriptor будет такой:

//String = topic name
//TopicProperties = object containing all the topic properties
MapStateDescriptor<String, TopicProperties> propertiesStateDescriptor = new MapStateDescriptor<String, TopicProperties>("properties", 
                                                                                                                        BasicTypeInfo.STRING_TYPE_INFO,
                                                                                                                        BasicTypeInfo.of(new TypeHint<TopicProperties>() {}));

BroadcastStream<Row> propertiesBroadcastStream = env.createInput(JDBCInputFormat)
                                                    .map(new TopicPropertiesDbMapper()
                                                    .broadcast(propertiesStateDescriptor);

TopicPropertiesDbMapper преобразует то, что JDBCInputFormat возвращает в структуру String, TopicProperties. Проблема в том, что он обрабатывается по одной строке за раз, но мне нужно обрабатывать их все вместе, как упомянуто выше.

2) Повторить чтение свойств и обновлять BroadcastStream один раз в час.


Я уточняю, что я уже сделал версию вышеупомянутой, но с чтением свойств из файла через:

readFile (FileInputFormat, path file, FileProcessingMode, milliseconds of interval for re-reading)

она работает, и я решил два проблемы, перечисленные выше для случая базы данных с:

1) Установите флаг unsplittable класса FileInputFormat в значение true;

2) FileProcessingMode.PROCESS_CONTINUOUSLY.

...