Использование defaultNaming для динамических оконных записей в Apache Beam - PullRequest
0 голосов
/ 08 мая 2018

Я следую вместе с ответом на этот пост и документацию , чтобы выполнить динамическую оконную запись моих данных в конце конвейера. Вот что у меня есть:

static void applyWindowedWrite(PCollection<String> stream) {
    stream.apply(
        FileIO.<String, String>writeDynamic()
            .by(Event::getKey)
            .via(TextIO.sink())
            .to("gs://some_bucket/events/")
            .withNaming(key -> defaultNaming(key, ".json")));
}

Но NetBeans предупреждает меня о синтаксической ошибке в последней строке:

FileNaming is not public in Write; cannot be accessed outside package

Как мне сделать defaultNaming доступным для моего конвейера, чтобы я мог использовать его для динамической записи. Или, если это невозможно, что я должен делать вместо этого?

1 Ответ

0 голосов
/ 08 мая 2018

Размещение того, что я выяснил, на случай, если кто-то еще столкнется с этим.

Было три проблемы с тем, как я пытался использовать writeDynamic() раньше.

  1. Ранее я использовал Beam версии 2.3.0, которая действительно описывает FileNaming как внутренний класс для FileIO.Write. Луч 2.4.0 определяет FileNaming как public static interface, делающий его доступным извне.
  2. Полностью разрешено / импортировано defaultNaming. Вместо непосредственного вызова defaultNaming - как это вызывается в документации примера - он должен вызываться как FileIO.Write.defaultNaming, поскольку FileIO - это пакет, который я фактически импортировал.
  3. Добавление withDestinationCoder также требовалось для выполнения динамической записи.

Окончательное решение выглядело так:

static void applyWindowedWrite(PCollection<String> stream) {
    stream.apply(FileIO.<String, String>writeDynamic()
                .by(Event::getKey)
                .via(TextIO.sink())
                .to("gs://some_bucket/events/")
                .withDestinationCoder(StringUtf8Coder.of())
                .withNumShards(1)
                .withNaming(key -> FileIO.Write.defaultNaming(key, ".json")));
}

Где Event::getKey - статическая функция, определенная в том же пакете с подписью public static String getKey(String event).

Выполняет оконную запись, которая записывает один файл на окно (как определено методом .withNumShards(1)). Это предполагает, что окно было определено на предыдущем шаге. A GroupByKey не требуется перед записью, поскольку это делается в процессе записи всякий раз, когда число шардов определено явно. См. Документацию FileIO для получения дополнительной информации в разделе «Запись файлов -> Сколько сегментов генерируется на панели».

...