Как прочитать файл JSON с помощью функции Apache Beam ParDo в Java - PullRequest
0 голосов
/ 24 декабря 2018

Я новичок в Apache Beam.Согласно нашему требованию, мне нужно передать в качестве входных данных файл JSON, содержащий от 5 до 10 записей JSON, построчно прочитать эти данные JSON из файла и сохранить их в BigQuery.Может кто-нибудь, пожалуйста, помогите мне с моим примером кода ниже, который пытается прочитать данные JSON, используя Apache Beam:

PCollection<String> lines = 
    pipeline
      .apply("ReadMyFile", 
             TextIO.read()
                   .from("C:\\Users\\Desktop\\test.json")); 
if(null!=lines) { 
  PCollection<String> words =
     lines.apply(ParDo.of(new DoFn<String, String>() { 
        @ProcessElement
        public void processElement(ProcessContext c) { 
          String line = c.element();
        }
      })); 
  pipeline.run(); 
}

Ответы [ 2 ]

0 голосов
/ 04 января 2019

Предположим, что в файле есть строки json, как показано ниже:

{"col1":"sample-val-1", "col2":1.0}
{"col1":"sample-val-2", "col2":2.0}
{"col1":"sample-val-3", "col2":3.0}
{"col1":"sample-val-4", "col2":4.0}
{"col1":"sample-val-5", "col2":5.0}

Чтобы сохранить эти значения из файла в BigQuery через DataFlow / Beam, вам, возможно, придется выполнить следующие шаги:

  • Определение TableReference для ссылки на таблицу BigQuery.

  • Определение TableFieldSchema для каждого столбца, который предполагается сохранить.

  • Чтение файла с использованием TextIO.read ().

  • Создание DoFn для анализа строки Json в формате TableRow.

  • Подтверждениеобъекты TableRow с использованием BigQueryIO.

Вы можете обратиться к приведенному ниже фрагменту кода, касающемуся описанных выше шагов,

  • Для создания TableReference и TableFieldSchema,

    TableReference tableRef = new TableReference();
    tableRef.setProjectId("project-id");
    tableRef.setDatasetId("dataset-name");
    tableRef.setTableId("table-name");
    
    List<TableFieldSchema> fieldDefs = new ArrayList<>();
    fieldDefs.add(new TableFieldSchema().setName("column1").setType("STRING"));
    fieldDefs.add(new TableFieldSchema().setName("column2").setType("FLOAT"));  
    
  • Для шагов конвейера

    Pipeline pipeLine = Pipeline.create(options);
    pipeLine
    .apply("ReadMyFile", 
            TextIO.read().from("path-to-json-file")) 
    
    .apply("MapToTableRow", ParDo.of(new DoFn<String, TableRow>() {
        @ProcessElement
        public void processElement(ProcessContext c) { 
            Gson gson = new GsonBuilder().create();
            HashMap<String, Object> parsedMap = gson.fromJson(c.element().toString(), HashMap.class);
    
            TableRow row = new TableRow();
            row.set("column1", parsedMap.get("col1").toString());
            row.set("column2", Double.parseDouble(parsedMap.get("col2").toString()));
            c.output(row);
        }
    }))
    
    .apply("CommitToBQTable", BigQueryIO.writeTableRows()
            .to(tableRef)
            .withSchema(new TableSchema().setFields(fieldDefs))
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND));
    
    pipeLine.run(); 
    

Таблица BigQuery может выглядеть следующим образом:

enter image description here

0 голосов
/ 27 декабря 2018

Ответ зависит от этого.

TextIO читает файлы построчно.Таким образом, в вашей test.json каждая строка должна содержать отдельный объект Json.

Имеющиеся у вас ParDo будут затем получать эти строки одну за другой, то есть каждый вызов @ProcessElement получает одну строку.

Тогда в вашем ParDo вы можете использовать что-то вроде Джексона ObjectMapper для анализа Json с линии (или любого другого парсера Json, с которым вы знакомы, но Джексон широко используется, в том числе и в нескольких местах).Сам луч.

В целом подход к написанию ParDo таков:

  • получить c.element();
  • сделать что-то со значением c.element(), например, парсинг его из json в объект java;
  • отправьте результат того, что вы сделали, на c.element() на c.output();

Я бы порекомендовал начать с просмотраРасширение Jackson для Beam SDK добавляет PTransforms, чтобы сделать именно это, см. это и это .

Пожалуйста, взгляните также на this post, там есть несколько ссылок.

Также есть JsonToRow transform , в котором вы можете искать похожие логиc, разница в том, что он не анализирует Json в определенный пользователем объект Java, а вместо этого в класс Beam Row.

Перед записью в BQ необходимо преобразовать объекты, которые вы проанализировали из Json, в BQстроки, которые будут еще ParDo после вашей логики синтаксического анализа, а затем фактически применят BQIO в качестве еще одного шага.Вы можете увидеть несколько примеров в BQ test .

...