Мне нужно создать BroadcastStream, чтобы иметь возможность изменять свойства в базе данных и видеть применение этих свойств в реальном времени в приложении. У меня проблемы 2:
1) Когда я читаю базу данных, мне нужно иметь все строки одновременно, через resultSet, HashMap или что-нибудь, что может содержать структуру типа ключ-значение, поскольку некоторые свойства зависят от других свойств, поэтому я не могу обрабатывать их по отдельности. Структура моего MapStateDescriptor будет такой:
//String = topic name
//TopicProperties = object containing all the topic properties
MapStateDescriptor<String, TopicProperties> propertiesStateDescriptor = new MapStateDescriptor<String, TopicProperties>("properties",
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.of(new TypeHint<TopicProperties>() {}));
BroadcastStream<Row> propertiesBroadcastStream = env.createInput(JDBCInputFormat)
.map(new TopicPropertiesDbMapper()
.broadcast(propertiesStateDescriptor);
TopicPropertiesDbMapper преобразует то, что JDBCInputFormat возвращает в структуру String, TopicProperties. Проблема в том, что он обрабатывается по одной строке за раз, но мне нужно обрабатывать их все вместе, как упомянуто выше.
2) Повторить чтение свойств и обновлять BroadcastStream один раз в час.
Я уточняю, что я уже сделал версию вышеупомянутой, но с чтением свойств из файла через:
readFile (FileInputFormat, path file, FileProcessingMode, milliseconds of interval for re-reading)
она работает, и я решил два проблемы, перечисленные выше для случая базы данных с:
1) Установите флаг unsplittable класса FileInputFormat в значение true;
2) FileProcessingMode.PROCESS_CONTINUOUSLY.