Я использую Beam 2.9.0 для записи файлов Avro в несколько каталогов.У меня есть некоторый класс "Event", который имеет поле с именем "Id" a String.Я хочу сгруппировать их по «Id» и записать в их отдельный каталог.
Я не могу понять, как определить «DestinationT» в
Class FileIO.Write<DestinationT,UserT>
Ниже приведено то, что япопытка
FileIO.<String, Five9Event>writeDynamic()
.by((SerializableFunction<Event, String>) in -> in.getId())
.via(Contextful.fn(SerializableFunctions.<Event>identity()), //There is no conversion here
AvroIO.sink(Event.class))
.withNumShards(1)
.withNaming(id -> new CustomeFileNaming(type) //Is this where a directory is specified??
.withTempDirectory("")//Some Temp Directory
.withDestinationCoder(AvroCoder.of(Event.class, Event.SCHEMA$))//???
У всех получателей должен быть один и тот же кодер Avro Schema.
Линия
.withDestinationCoder(AvroCoder.of(Event.class, Event.SCHEMA$)
не работает, так как ожидает, что Coder для DestinationT (строка вмой случай) а не Coder для UserT (Event в моем случае).Если DestinationT просто используется для группировки, я не могу понять, зачем нам нужен кодер для него по сравнению с фактической полезной нагрузкой, которую мы записываем в файл.
Подпись в FileIO.write
FileIO.Write<DestinationT,UserT> withDestinationCoder(Coder<DestinationT> destinationCoder)
Specifies a Coder for the destination type, if it can not be inferred from by(org.apache.beam.sdk.transforms.SerializableFunction<UserT, DestinationT>).
Какова точная семантика DestinationT и если это просто какой-то определенный пользователем тип, зачем для него нужен кодер?