Я пишу конвейер для репликации данных из одного источника в другой. Информация об источниках данных хранится в дБ (BQ). Как я могу использовать эти данные для динамического построения конечных точек чтения / записи?
Я пытался передать объект Pipeline в свой пользовательский DoFn, но его нельзя сериализовать. Позже я попытался вызвать метод getPipeline () для пропущенного представления, но это не сработало. - что на самом деле ожидается
Я не могу знать все таблицы, которые мне нужно сериализовать заранее, поэтому мне нужно прочитать все данные из БД (или из любого другого источника).
// builds some random view
PCollectionView<IdWrapper> idView = ...;
// reads tables meta and replicates data per each table
pipeline.apply(getTableMetaEndpont().read())
.apply(ParDo.of(new MyCustomReplicator(idView)).withSideInputs(idView))
private static class MyCustomReplicator extends DoFn<TableMeta, TableMeta> {
private final PCollectionView<IdWrapper> idView;
private DataReplicator(PCollectionView<IdWrapper> idView) {
this.idView = idView;
}
// TableMeta {string: sourceTable, string: destTable}
@ProcessElement
public void processElement(@Element TableMeta tableMeta, ProcessContext ctx) {
long id = ctx.sideInput(idView).getValue();
// builds read endpoint which depends on table meta
// updates entities
// stores entities using another endpoint
idView
.getPipeline()
.apply(createReadEndpoint(tableMeta).read())
.apply(ParDo.of(new SomeFunction(tableMeta, id)))
.apply(createWriteEndpoint(tableMeta).insert());
ctx.output(tableMetadata);
}
}
Я ожидаю, что он будет реплицировать данные, указанные в TableMeta, но я не могу использовать конвейер внутри объекта DoFn, потому что он не может быть сериализован / десериализован.
Есть ли способ реализовать задуманное поведение?