Получить все элементы в PCollection независимо от тегов - PullRequest
0 голосов
/ 01 февраля 2020

У меня есть PCollection элементов BigQuery TableRow, которые помечены в зависимости от того, был ли успешно проанализирован один столбец TableRow или нет.

final TupleTag<TableRow> OK = new TupleTag<TableRow>(){};
final TupleTag<TableRow> NOTOK = new TupleTag<TableRow>(){};

Функция My ParDo помечает эти TableRow на основе анализа столбца и возвращает PCollectionTuple с именем myPCollection.

Я хотел бы сделать следующее:

  1. Получить все элементы в PCollection (помеченные как OK и NOTOK) и вывести их в BigQuery.
  2. Получить только элементы, помеченные как NOTOK, и отправить их в Pub / Sub

Я знаю, что могу сделать # 2, позвонив

myPCollection.get(NOTOK)

Я не могу найти способ сделать № 1. Я видел, что есть метод myPCollection.getAll (), но вместо PCollection он возвращает Map, PCollection>

Любые идеи о том, как получить весь набор элементов независимо от того, как они помечены?

1 Ответ

1 голос
/ 02 февраля 2020

Вы можете использовать преобразование Flatten ( Beam guide ), чтобы объединить разные PCollections в один:

PCollection<String> okResults = myPCollection.get(OK);
PCollection<String> notOkResults = myPCollection.get(NOTOK);

PCollectionList<String> pcl = PCollectionList.empty(p);
pcl = pcl.and(okResults).and(notOkResults);
PCollection<String> allResults = pcl.apply(Flatten.pCollections());

В этом случае allResults будет содержать элементы OK и NOTOK. Я сделал пример (полный код здесь ) с двумя входными строками, где они классифицируются на выходы с хорошей или плохой стороны:

Feb 01, 2020 10:42:24 PM org.apache.beam.examples.AllSideOutputs$5 processElement
INFO: All elements: bad line
Feb 01, 2020 10:42:24 PM org.apache.beam.examples.AllSideOutputs$5 processElement
INFO: All elements: good line
Feb 01, 2020 10:42:24 PM org.apache.beam.examples.AllSideOutputs$3 processElement
INFO: Ok element: good line
Feb 01, 2020 10:42:24 PM org.apache.beam.examples.AllSideOutputs$4 processElement
INFO: Not Ok element: bad line

Протестировано с 2.17.0 SDK и DirectRunner .

...