Запись Apache Beam 2.9 в файлы Avro в несколько каталогов в GCS с помощью writeDynamic - PullRequest
0 голосов
/ 01 февраля 2019

Я использую Beam 2.9.0 для записи файлов Avro в несколько каталогов.У меня есть некоторый класс "Event", который имеет поле с именем "Id" a String.Я хочу сгруппировать их по «Id» и записать в их отдельный каталог.

Я не могу понять, как определить «DestinationT» в

Class FileIO.Write<DestinationT,UserT>

Ниже приведено то, что япопытка

FileIO.<String, Five9Event>writeDynamic()
                .by((SerializableFunction<Event, String>) in -> in.getId())
                .via(Contextful.fn(SerializableFunctions.<Event>identity()), //There is no conversion here
                        AvroIO.sink(Event.class))
                .withNumShards(1)
                .withNaming(id -> new CustomeFileNaming(type) //Is this where a directory is specified??
                .withTempDirectory("")//Some Temp Directory
                .withDestinationCoder(AvroCoder.of(Event.class, Event.SCHEMA$))//???

У всех получателей должен быть один и тот же кодер Avro Schema.

Линия

.withDestinationCoder(AvroCoder.of(Event.class, Event.SCHEMA$)

не работает, так как ожидает, что Coder для DestinationT (строка вмой случай) а не Coder для UserT (Event в моем случае).Если DestinationT просто используется для группировки, я не могу понять, зачем нам нужен кодер для него по сравнению с фактической полезной нагрузкой, которую мы записываем в файл.

Подпись в FileIO.write

FileIO.Write<DestinationT,UserT>    withDestinationCoder(Coder<DestinationT> destinationCoder)
Specifies a Coder for the destination type, if it can not be inferred from by(org.apache.beam.sdk.transforms.SerializableFunction<UserT, DestinationT>).

Какова точная семантика DestinationT и если это просто какой-то определенный пользователем тип, зачем для него нужен кодер?

1 Ответ

0 голосов
/ 02 февраля 2019

У меня это работает, но я до сих пор не могу ответить, зачем нам нужен DestinationCoder

FileIO.<String, Event>writeDynamic()
            .by((SerializableFunction<Event, String>) in -> in.getId())
            .via(Contextful.fn(
                        SerializableFunctions.<Event>identity()
                    ),
                    Contextful.fn(
                            (dest) -> AvroIO.sink(Event.class)
                            ))
            .withNumShards(1)
            .withTempDirectory(getTempDirectory())
            .withDestinationCoder(StringUtf8Coder.of())
            .withNaming((dest) -> new CustomeFileNaming(dest, config))

Я полагаю, что AroIO.sink установит код записи для полезной нагрузки. DestinationT - это строка Iтолько что использовал StringUtfCoder.of ()

...