Как читать bigQuery из PCollection в потоке данных - PullRequest
0 голосов
/ 08 ноября 2018

У меня есть коллекция объектов, которую я получаю от pubsub, скажем:

 PCollection<Student> pStudent ;

и в атрибутах студента есть атрибут let say studentID; и я хочу прочитать атрибуты (class_code) из BigQuery с этим идентификатором студента и установить код_класса, который я получаю из BQ для объекта Student в PCollcetion

Кто-нибудь знает, как это реализовать? Я знаю, что в луче есть BigQueryIO, но как я могу это сделать, если критерии строки запроса, которые я хочу выполнить в BQ, взяты из объекта Student (studentID) в PCollection и Как я могу установить значение для PCollection из результата BigQuery?

1 Ответ

0 голосов
/ 08 ноября 2018

Я рассмотрел два варианта сделать это. Можно использовать BigQueryIO, чтобы прочитать всю таблицу и материализовать ее как боковой ввод, или использовать CoGroupByKey, чтобы объединить все данные. Еще одна возможность, которую я реализовал в данном документе, заключается в прямом использовании клиентской библиотеки Java.

Я создал несколько фиктивных данных:

$ bq mk test.students name:STRING,grade:STRING
$ bq query --use_legacy_sql=false 'insert into test.students (name, grade) values ("Yoda", "A+"), ("Leia", "B+"), ("Luke", "C-"), ("Chewbacca", "F")'

, который выглядит так:

enter image description here

и затем в конвейере я генерирую некоторые входные фиктивные данные:

Create.of("Luke", "Leia", "Yoda", "Chewbacca")

Для каждого из этих «учеников» я выбираю соответствующую оценку в таблице BigQuery, следуя подходу из этого примера . Примите во внимание, в зависимости от вашего объема данных, скорости (квоты) и соображений стоимости, как в предыдущем комментарии. Полный пример:

public class DynamicQueries {

    private static final Logger LOG = LoggerFactory.getLogger(DynamicQueries.class);

    @SuppressWarnings("serial")
    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline p = Pipeline.create(options);

        // create input dummy data     
        PCollection<String> students = p.apply("Read students data", Create.of("Luke", "Leia", "Yoda", "Chewbacca").withCoder(StringUtf8Coder.of()));

        // ParDo to map each student with the grade in BigQuery
        PCollection<KV<String, String>> marks = students.apply("Read marks from BigQuery", ParDo.of(new DoFn<String, KV<String, String>>() {
            @ProcessElement
            public void processElement(ProcessContext c) throws Exception {
                BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

                QueryJobConfiguration queryConfig =
                    QueryJobConfiguration.newBuilder(
                      "SELECT name, grade "
                          + "FROM `PROJECT_ID.test.students` "
                          + "WHERE name = "
                          + "\"" + c.element() + "\" "  // fetch the appropriate student
                          + "LIMIT 1")
                        .setUseLegacySql(false) // Use standard SQL syntax for queries.
                        .build();

                // Create a job ID so that we can safely retry.
                JobId jobId = JobId.of(UUID.randomUUID().toString());
                Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

                // Wait for the query to complete.
                queryJob = queryJob.waitFor();

                // Check for errors
                if (queryJob == null) {
                  throw new RuntimeException("Job no longer exists");
                } else if (queryJob.getStatus().getError() != null) {
                  throw new RuntimeException(queryJob.getStatus().getError().toString());
                }

                // Get the results.
                QueryResponse response = bigquery.getQueryResults(jobId)
                TableResult result = queryJob.getQueryResults();

                String mark = new String();

                for (FieldValueList row : result.iterateAll()) {
                    mark = row.get("grade").getStringValue();
                }

                c.output(KV.of(c.element(), mark));
            }
        }));

        // log to check everything is right
        marks.apply("Log results", ParDo.of(new DoFn<KV<String, String>, KV<String, String>>() {
            @ProcessElement
            public void processElement(ProcessContext c) throws Exception {
                LOG.info("Element: " + c.element().getKey() + " " + c.element().getValue());
                c.output(c.element());
            }
        }));

        p.run();
    }
}

И вывод:

Nov 08, 2018 2:17:16 PM com.dataflow.samples.DynamicQueries$2 processElement
INFO: Element: Yoda A+
Nov 08, 2018 2:17:16 PM com.dataflow.samples.DynamicQueries$2 processElement
INFO: Element: Luke C-
Nov 08, 2018 2:17:16 PM com.dataflow.samples.DynamicQueries$2 processElement
INFO: Element: Chewbacca F
Nov 08, 2018 2:17:16 PM com.dataflow.samples.DynamicQueries$2 processElement
INFO: Element: Leia B+

(протестировано с BigQuery 1.22.0 и 2.5.0 Java SDK для потока данных)

...