Создание пользовательской оконной функции в Apache Beam - PullRequest
0 голосов
/ 12 сентября 2018

У меня есть конвейер Beam, который начинается с чтения нескольких текстовых файлов, где каждая строка в файле представляет строку, которая вставляется в Bigtable позже в конвейере. Сценарий требует подтверждения того, что количество строк, извлеченных из каждого файла, и количество строк, позже вставленных в соответствие Bigtable. Для этого я планирую разработать собственную стратегию управления окнами, чтобы строки из одного файла были назначены одному окну на основе имени файла в качестве ключа, который будет передан в функцию управления окнами.

Есть ли пример кода для создания пользовательских оконных функций?

1 Ответ

0 голосов
/ 16 ноября 2018

Хотя я изменил свою стратегию для подтверждения введенного количества строк, для всех, кто интересуется элементами управления окнами, считываемыми из пакетного источника, например FileIO в пакетном задании, вот код для создания пользовательской стратегии управления окнами:

public class FileWindows extends PartitioningWindowFn<Object, IntervalWindow>{

private static final long serialVersionUID = -476922142925927415L;
private static final Logger LOG = LoggerFactory.getLogger(FileWindows.class);

@Override
public IntervalWindow assignWindow(Instant timestamp) {
    Instant end = new Instant(timestamp.getMillis() + 1);
    IntervalWindow interval = new IntervalWindow(timestamp, end);
    LOG.info("FileWindows >> assignWindow(): Window assigned with Start: {}, End: {}", timestamp, end);
    return interval;
}

@Override
public boolean isCompatible(WindowFn<?, ?> other) {
    return this.equals(other);
}

@Override
public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {
    if (!this.isCompatible(other)) {
        throw new IncompatibleWindowException(other, String.format("Only %s objects are compatible.", FileWindows.class.getSimpleName()));
    }
  }

@Override
public Coder<IntervalWindow> windowCoder() {
    return IntervalWindow.getCoder();
}   

}

и затем его можно использовать в конвейере, как показано ниже:

p
 .apply("Assign_Timestamp_to_Each_Message", ParDo.of(new AssignTimestampFn()))
 .apply("Assign_Window_to_Each_Message", Window.<KV<String,String>>into(new FileWindows())
  .withAllowedLateness(Duration.standardMinutes(1))
  .discardingFiredPanes());

Пожалуйста, имейте в виду, что вам нужно будет написать AssignTimestampFn() такчто каждое сообщение содержит метку времени.

...