Есть ли способ создать пустой файл, созданный, наконец, после того, как выходные файлы были успешно созданы для каждого окна в Apache Beam? - PullRequest
0 голосов
/ 27 апреля 2020

У меня есть потоковый конвейер, который потребляет события, помеченные метками времени. Все, что я хочу сделать, это поместить их в Fixed Windows по 5 минут каждый, а затем записать все события в окне в один / несколько файлов на основе шардов, с пустым файлом в конце (этот файл должен быть создан только после все события в этом окне успешно передаются в файлы).

В принципе, я ожидал бы вывод для такого типа,

| --- window_1_output_file

| --- window_1_empty_file (этот файл должен быть создан только после window_1_output_file был создан).

Стратегия окна с использованием триггера выглядит следующим образом:

timestampedLines = timestampedLines.apply("FixedWindows", Window.<String>into(
            FixedWindows.of(Utilities.resolveDuration(options.getWindowDuration())))
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(options.getWindowElementCount())))
            .withAllowedLateness(Utilities.resolveDuration(options.getWindowLateness()))
            .accumulatingFiredPanes()

Есть ли способ создать этот пустой файл, созданный, наконец, после успешного вывода выходных файлов для каждого окна в Apache Beam? И где применить этот лог c для создания этого дополнительного пустого файла?

Заранее спасибо.

1 Ответ

0 голосов
/ 07 мая 2020

Не могли бы вы сделать что-то вроде этого:

// Generate one element per window
PCollection<String> empties = p.apply(GenerateSequence.from(0)
         .withRate(1, Utilities.resolveDuration(options.getWindowDuration())))
 .apply(MapElements.into(TypeDescriptors.strings()).via(elm -> "");

// Your actual PCollection data
PCollection<String> myActualData = p.apply...........

PCollection<String> myActualDataWithDataOnEveryWindow = 
              PCollectionTuple.of(empties).and(myActualData).apply(Flatten.create());

Как только у вас есть элемент в каждом окне, вы можете делать то, что вы делали:

myActualDataWithDataOnEveryWindow.apply("FixedWindows", Window.<String>into(
            FixedWindows.of(Utilities.resolveDuration(options.getWindowDuration())))
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(options.getWindowElementCount())))
            .withAllowedLateness(Utilities.resolveDuration(options.getWindowLateness()))
            .accumulatingFiredPanes())
          .apply(FileIO........);

Указывает ли это куда-то на вас полезно? Или я слишком потерян? :)

...