Прочитать файл Avro и записать его в таблицу BigQuery - PullRequest
0 голосов
/ 05 февраля 2019

Моя цель - прочитать данные файла avro из облачного хранилища и записать их в таблицу BigQuery с помощью Java.Было бы хорошо, если бы кто-нибудь предоставил фрагмент кода / идеи для чтения данных в формате avro и записи их в таблицу BigQuery с помощью Cloud Dataflow.

Ответы [ 2 ]

0 голосов
/ 20 марта 2019

Я вижу два возможных подхода:

  1. Использование потока данных:
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
    Pipeline p = Pipeline.create(options);

    // Read an AVRO file.
    // Alternatively, read the schema from a file.
    // https://beam.apache.org/releases/javadoc/2.11.0/index.html?org/apache/beam/sdk/io/AvroIO.html
    Schema avroSchema = new Schema.Parser().parse(
        "{\"type\": \"record\", "
            + "\"name\": \"quote\", "
            + "\"fields\": ["
            + "{\"name\": \"source\", \"type\": \"string\"},"
            + "{\"name\": \"quote\", \"type\": \"string\"}"
            + "]}");
    PCollection<GenericRecord> avroRecords = p.apply(
        AvroIO.readGenericRecords(avroSchema).from("gs://bucket/quotes.avro"));

    // Convert Avro GenericRecords to BigQuery TableRows.
    // It's probably better to use Avro-generated classes instead of manually casting types.
    // https://beam.apache.org/documentation/io/built-in/google-bigquery/#writing-to-bigquery
    PCollection<TableRow> bigQueryRows = avroRecords.apply(
        MapElements.into(TypeDescriptor.of(TableRow.class))
            .via(
                (GenericRecord elem) ->
                    new TableRow()
                        .set("source", ((Utf8) elem.get("source")).toString())
                        .set("quote", ((Utf8) elem.get("quote")).toString())));

    // https://cloud.google.com/bigquery/docs/schemas
    TableSchema bigQuerySchema =
        new TableSchema()
            .setFields(
                ImmutableList.of(
                    new TableFieldSchema()
                        .setName("source")
                        .setType("STRING"),
                    new TableFieldSchema()
                        .setName("quote")
                        .setType("STRING")));

    bigQueryRows.apply(BigQueryIO.writeTableRows()
        .to(new TableReference()
            .setProjectId("project_id")
            .setDatasetId("dataset_id")
            .setTableId("avro_source"))
        .withSchema(bigQuerySchema)
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));

    p.run().waitUntilFinish();
Импорт данных в BigQuery напрямую без потока данных.Смотрите эту документацию: https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro
0 голосов
/ 05 февраля 2019

Для этого вы можете попробовать использовать следующий скрипт Python:

import apache_beam as beam
import sys

PROJECT='YOUR_PROJECT'
BUCKET='YOUR_BUCKET'

def run():
   argv = [
      '--project={0}'.format(PROJECT),
      '--staging_location=gs://{0}/staging/'.format(BUCKET),
      '--temp_location=gs://{0}/staging/'.format(BUCKET),
      '--runner=DataflowRunner'
   ]

   p = beam.Pipeline(argv=argv)

   (p
      | 'ReadAvroFromGCS' >> beam.io.avroio.ReadFromAvro('gs://{0}/file.avro'.format(BUCKET))
      | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('{0}:dataset.avrotable'.format(PROJECT))
   )

   p.run()

if __name__ == '__main__':
   run()

Надеюсь, это поможет.

...