Для одного из наших вариантов использования нам нужно повторить некоторые вычисления на основе изменений в файле, а затем передать результат этого файла, чтобы мы могли использовать его в другом потоке.
Жизненный цикл программы выглядит примерно так:
Datastream 1: Отслеживаемый файл -> обнаружение некоторых изменений -> повторная обработка всех элементов в файле -> вычисление одного результата -> широковещание
Datastream 2: Некоторая трансформация -> сделать что-то для каждого элемента в DS2, используя все существующие элементы широковещательной передачи (некоторая потеря данных допустима в вещательных элементах некоторое время)
Я приведу несколько примеров кода, чтобы лучше объяснить, в чем проблема:
Итак, это DS1:
Сопоставление каждого элемента, отправка их в редуктор, а затем вычисление общей суммы
env.readFile(format, clientPath, FileProcessingMode.PROCESS_CONTINUOUSLY, interval)
.map(new Adder())
.keyBy(Map::size)
.reduce(new Reducer());
Это фаза отображения, она просто создает хеш-карту из строки
public static class Adder extends RichMapFunction<String, Map<String, String>> {
private static final long serialVersionUID = 1L;
@Override
public Map<String, String> map(String string) throws Exception {
String[] strings = string.split("=");
HashMap<String, String> hashMap = new HashMap<>();
hashMap.put(strings[0], strings[1]);
return hashMap;
}
}
Это последний шаг, редуктор. Принимает все уменьшенные элементы, поступающие из картографов, а затем возвращает итоговое значение, одно хэш-карту
public static class Reducer extends RichReduceFunction<Map<String, String>> {
private static final long serialVersionUID = 1L;
@Override
public Map<String, String> reduce(Map<String, String> stringStringMap, Map<String, String> t1) throws Exception {
stringStringMap.putAll(t1);
return stringStringMap;
}
}
А затем DS1 транслируется как следующий фрагмент кода.
MapStateDescriptor<String, String> descriptor = new MapStateDescriptor<>("Brodcasted map state", Types.STRING, Types.STRING);
BroadcastStream<Map<String, String>> broadcastedProperties = clientProperties.broadcast(descriptor);
ds2.connect(broadcastedProperties).process(new EventListener(properties));
Использование следующих элементов в заданное время
Time Document
T1 K1=V1, K2=V2
T2 K2=V2
T3 K3=V3, K1=V4
Когда я запускаю нашу программу, я ожидаю, что это:
Time Broadcasted Elements
T1 K1=V1, K2=V2
T2 K2=V2
T3 K3=V3, K1=V4
Я вижу вот что:
Time Broadcasted Elements
T1 K1=V1, K2=V2
T2 K1=V1, K2=V2
T3 K1=V4, K2=V2, K3=V3
Что я сделал, чтобы преодолеть эту проблему, так это просто взять окно в потоке данных и использовать агрегатную функцию с аккумулятором вместо редуктора, но я бы предпочел использовать неоконный подход.
Я провел некоторую отладку, и я пришел к выводу, что, хотя на этапе отображения он отображает только доступные элементы за это время, на этапе сокращения он сокращается на основе предыдущего состояния (тем самым я средний результат времени - 1) + все элементы в этой точке. Я нахожу довольно странным иметь невидимое состояние в фазе редукции. С моей точки зрения, это должно основываться только на тех элементах, которые прямо исходят от картографов. Может быть, мое понимание сокращения в Flink неверно, но я бы хотел получить некоторые разъяснения по этому поводу.