Размещение того, что я выяснил, на случай, если кто-то еще столкнется с этим.
Было три проблемы с тем, как я пытался использовать writeDynamic()
раньше.
- Ранее я использовал Beam версии 2.3.0, которая действительно описывает
FileNaming
как внутренний класс для FileIO.Write
. Луч 2.4.0 определяет FileNaming
как public static interface
, делающий его доступным извне.
- Полностью разрешено / импортировано
defaultNaming
. Вместо непосредственного вызова defaultNaming
- как это вызывается в документации примера - он должен вызываться как FileIO.Write.defaultNaming
, поскольку FileIO
- это пакет, который я фактически импортировал.
- Добавление
withDestinationCoder
также требовалось для выполнения динамической записи.
Окончательное решение выглядело так:
static void applyWindowedWrite(PCollection<String> stream) {
stream.apply(FileIO.<String, String>writeDynamic()
.by(Event::getKey)
.via(TextIO.sink())
.to("gs://some_bucket/events/")
.withDestinationCoder(StringUtf8Coder.of())
.withNumShards(1)
.withNaming(key -> FileIO.Write.defaultNaming(key, ".json")));
}
Где Event::getKey
- статическая функция, определенная в том же пакете с подписью public static String getKey(String event)
.
Выполняет оконную запись, которая записывает один файл на окно (как определено методом .withNumShards(1)
). Это предполагает, что окно было определено на предыдущем шаге. A GroupByKey
не требуется перед записью, поскольку это делается в процессе записи всякий раз, когда число шардов определено явно. См. Документацию FileIO для получения дополнительной информации в разделе «Запись файлов -> Сколько сегментов генерируется на панели».