Мне нужно прочитать файл AVRO из облачного хранилища, а затем записать запись в большую таблицу с ключом строки и AVRO в виде байтов в ячейке столбца. Я использую AVROIO.read для чтения данных как GenericRecord.Как применить функцию pardo для преобразования данных в нечто, что может быть записано в bigtable
// Read AVRO from GCS
pipeline
.apply("Read from Avro",
AvroIO
.readGenericRecords(schema)
.from(options.getInputFilePattern()))
//.apply - pardo transformation
.apply("Write to Bigtable", write);
Любая помощь по второму этапу в конвейере будет очень признательна
* 1006.* Обновление:
Спасибо Антон за быструю помощь, теперь я понимаю, что мне нужно сделать, и придумал следующее для pardo
pipeline
.apply("Read from Avro",
AvroIO
.readGenericRecords(schema)
.from(options.getInputFilePattern()))
.apply(ParDo.of(new DoFn<GenericRecord, Iterable<Mutation> >() {
@ProcessElement
public void processElement(ProcessContext c) {
GenericRecord gen = c.element();
byte[] fieldNameByte = null;
byte[] fieldValueByte = null;
// ImmutableList.Builder<Mutation> mutations = ImmutableList.builder();
for (Schema.Field field : fields) {
try {
String fieldName = field.name();
fieldNameByte = fieldName.getBytes("UTF-8");
String value = String.valueOf(gen.get(fieldName));
fieldValueByte = value.getBytes("UTF-8");
} catch (Exception e) {
e.printStackTrace();
}
Iterable<Mutation> mutations =
ImmutableList.of(
Mutation.newBuilder()
.setSetCell(
Mutation.SetCell.newBuilder()
.setValue(
ByteString.copyFrom(fieldValueByte))
.setFamilyName(COLUMN_FAMILY_NAME))
.build());
c.output(,mutations));
}
}
}))
.apply("Write to Bigtable", write);
return pipeline.run();
Это просто псевдокоди я только учусь и пробую .. Мне нужна помощь по добавлению мутаций в ProcessContext и записи ... Пожалуйста, посмотрите и дайте мне знать, если я нахожусь в правильном направлении и как мне добавить мутацию кконтекст