Уменьшить поведение функции в потоках с ключами - PullRequest
0 голосов
/ 03 апреля 2019

Для одного из наших вариантов использования нам нужно повторить некоторые вычисления на основе изменений в файле, а затем передать результат этого файла, чтобы мы могли использовать его в другом потоке.

Жизненный цикл программы выглядит примерно так:

Datastream 1: Отслеживаемый файл -> обнаружение некоторых изменений -> повторная обработка всех элементов в файле -> вычисление одного результата -> широковещание

Datastream 2: Некоторая трансформация -> сделать что-то для каждого элемента в DS2, используя все существующие элементы широковещательной передачи (некоторая потеря данных допустима в вещательных элементах некоторое время)

Я приведу несколько примеров кода, чтобы лучше объяснить, в чем проблема:

Итак, это DS1: Сопоставление каждого элемента, отправка их в редуктор, а затем вычисление общей суммы

env.readFile(format, clientPath, FileProcessingMode.PROCESS_CONTINUOUSLY, interval)
    .map(new Adder())
    .keyBy(Map::size)
    .reduce(new Reducer());

Это фаза отображения, она просто создает хеш-карту из строки

public static class Adder extends RichMapFunction<String, Map<String, String>> {
  private static final long serialVersionUID = 1L;

  @Override
  public Map<String, String> map(String string) throws Exception {
    String[] strings = string.split("=");
    HashMap<String, String> hashMap = new HashMap<>();
    hashMap.put(strings[0], strings[1]);
    return hashMap;
  }
}

Это последний шаг, редуктор. Принимает все уменьшенные элементы, поступающие из картографов, а затем возвращает итоговое значение, одно хэш-карту

public static class Reducer extends RichReduceFunction<Map<String, String>> {
  private static final long serialVersionUID = 1L;

  @Override
  public Map<String, String> reduce(Map<String, String> stringStringMap, Map<String, String> t1) throws Exception {
    stringStringMap.putAll(t1);
    return stringStringMap;
  }
}

А затем DS1 транслируется как следующий фрагмент кода.

MapStateDescriptor<String, String> descriptor = new MapStateDescriptor<>("Brodcasted map state", Types.STRING, Types.STRING);
BroadcastStream<Map<String, String>> broadcastedProperties =  clientProperties.broadcast(descriptor); 
ds2.connect(broadcastedProperties).process(new EventListener(properties));

Использование следующих элементов в заданное время

Time    Document
T1      K1=V1, K2=V2
T2      K2=V2
T3      K3=V3, K1=V4

Когда я запускаю нашу программу, я ожидаю, что это:

Time    Broadcasted Elements
T1      K1=V1, K2=V2
T2      K2=V2
T3      K3=V3, K1=V4

Я вижу вот что:

Time    Broadcasted Elements
T1      K1=V1, K2=V2
T2      K1=V1, K2=V2
T3      K1=V4, K2=V2, K3=V3

Что я сделал, чтобы преодолеть эту проблему, так это просто взять окно в потоке данных и использовать агрегатную функцию с аккумулятором вместо редуктора, но я бы предпочел использовать неоконный подход.

Я провел некоторую отладку, и я пришел к выводу, что, хотя на этапе отображения он отображает только доступные элементы за это время, на этапе сокращения он сокращается на основе предыдущего состояния (тем самым я средний результат времени - 1) + все элементы в этой точке. Я нахожу довольно странным иметь невидимое состояние в фазе редукции. С моей точки зрения, это должно основываться только на тех элементах, которые прямо исходят от картографов. Может быть, мое понимание сокращения в Flink неверно, но я бы хотел получить некоторые разъяснения по этому поводу.

1 Ответ

0 голосов
/ 03 апреля 2019

Да, когда к потоку применяется любой из встроенных агрегаторов Flink, например, сумма, максимум, уменьшение и т. Д., Он агрегирует весь поток в пошаговом режиме с сохранением состояния. Точнее, это делается в KeyedStreams, а агрегация выполняется по принципу «ключ за ключом», но постоянно, без ограничений. Например, если вы использовали sum () в потоке целых чисел 1, 2, 3, 4, 5, ..., то sum () создаст поток 1, 3, 6, 10, 15, .... В вашем случае, less () создаст постоянно обновляемый поток, который будет включать в себя все больше пар ключ / значение.

Если бы вам нужно было набирать поток по времени, вы должны получить результаты, которые вы ищете, но состояние ключа все равно будет сохраняться вечно, что, вероятно, будет проблематично. Я предлагаю вам либо использовать оконный API, либо что-то вроде RichFlatMap или ProcessFunction, где вы можете напрямую управлять состоянием.

...