Доступ к конвейеру в DoFn - PullRequest
0 голосов
/ 10 мая 2019

Я пишу конвейер для репликации данных из одного источника в другой. Информация об источниках данных хранится в дБ (BQ). Как я могу использовать эти данные для динамического построения конечных точек чтения / записи?

Я пытался передать объект Pipeline в свой пользовательский DoFn, но его нельзя сериализовать. Позже я попытался вызвать метод getPipeline () для пропущенного представления, но это не сработало. - что на самом деле ожидается

Я не могу знать все таблицы, которые мне нужно сериализовать заранее, поэтому мне нужно прочитать все данные из БД (или из любого другого источника).

// builds some random view
PCollectionView<IdWrapper> idView = ...;

// reads tables meta and replicates data per each table
pipeline.apply(getTableMetaEndpont().read())
    .apply(ParDo.of(new MyCustomReplicator(idView)).withSideInputs(idView))

private static class MyCustomReplicator extends DoFn<TableMeta, TableMeta> {
    private final PCollectionView<IdWrapper> idView;

    private DataReplicator(PCollectionView<IdWrapper> idView) {
      this.idView = idView;
    }

    // TableMeta {string: sourceTable, string: destTable}
    @ProcessElement
    public void processElement(@Element TableMeta tableMeta, ProcessContext ctx) {
      long id = ctx.sideInput(idView).getValue();

      // builds read endpoint which depends on table meta
      // updates entities
      // stores entities using another endpoint
      idView
          .getPipeline()
          .apply(createReadEndpoint(tableMeta).read())
          .apply(ParDo.of(new SomeFunction(tableMeta, id)))
          .apply(createWriteEndpoint(tableMeta).insert());

      ctx.output(tableMetadata);
    }
}


Я ожидаю, что он будет реплицировать данные, указанные в TableMeta, но я не могу использовать конвейер внутри объекта DoFn, потому что он не может быть сериализован / десериализован.

Есть ли способ реализовать задуманное поведение?

...