Получить отдельное поле из PCollection <TableRow> - PullRequest
3 голосов
/ 14 июля 2020

Я читаю поток данных с помощью Apache Beam и записываю в BigQuery в tableA. Мои строки хранятся в наборе данных типа <TableRow>.

В настоящее время я читаю строки как есть и записываю в таблицу. Однако я хотел бы отфильтровать строки на основе столбца timestamp и разделить данные в результате столбца Name в FirstName и LastName, прежде чем записывать данные с новой схемой в новую таблицу с именем tableB

Я не знаю, как получить отдельные поля из набора данных PCollection, и мне нужна помощь с правильным синтаксисом для получения поля строки Name из моего PCollection<TableRow>

Вот мой код:

PCollection<TableRow> rows =
                        transformedRows.apply("Get rows", BeamIO.getRows());
    
       /*Split a row here and name it rowsAfterColumnSplit
         --
         --
       */
        
        //Write the original set of rows
        WriteResult writeResult1 =
                        rows.apply("write rows",
                                BeamIO.getBigQueryIOWriter(schema, "tableA"));
    
        //Write rowsAfterColumnSplit with new schema
        WriteResult writeResult2 =
                        rowsAfterColumnSplit.apply("write rows after column split",
                                BeamIO.getBigQueryIOWriter(newSchema, "tableB"));

Вот пример данных:

| timestamp                    | Name           | City    |

|  2020-07-14 20:12:01.342 UTC | Karl Streisand | Berlin  | 
|  2020-07-14 22:10:10.234 UTC | Anna Karlstad  | Munich  | 

Я хочу разделить имя «Анна Карлстад», отфильтровав строки с отметкой времени после 22.00.00

1 Ответ

0 голосов
/ 14 июля 2020

Вот как это можно сделать:

PCollection<TableRow> transformedRows = rows.apply(ParDo.of(Transform.splitColumn()));

А вот класс Transform:

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.transforms.DoFn;

public class Transform extends DoFn<TableRow, TableRow> {

    private Transform() {
    }

    public static Transform splitColumn() {
        return new Transform();
    }

    @ProcessElement
    public void processElement(@Element TableRow input, OutputReceiver<TableRow> output) {


        if(input.get("Name").toString()!=null){
            input.set("FirstName", input.get("Name").toString().split(" ")[0]);
            input.set("LastName", input.get("Name").toString().split(" ")[1]);
        }
        output.output(input);

    }
}

Затем вы создаете новую схему с дополнительными полями, т.е. FirstName и LastName и напишите в BigQuery с помощью WriteResult

...