Задание потока данных для записи в BigQuery с автоопределением схемы - PullRequest
0 голосов
/ 11 ноября 2019

В настоящее время мы ищем лучший способ преобразования необработанных данных в общую структуру для дальнейшего анализа. Наши данные - это файлы JSON, некоторые файлы имеют больше полей, некоторые меньше, некоторые могут иметь массивы, но в целом это довольно похожая структура.

Я пытаюсь построить конвейер Apache Beam на Java для этой цели. Все мои конвейеры основаны на этом шаблоне: TextIOToBigQuery.java

Первый подход состоит в том, чтобы загрузить весь JSON как строку в один столбец, а затем использовать Функции JSON в стандартном SQL превратить в общую структуру. Это хорошо описано здесь: Как управлять / обрабатывать изменения схемы при загрузке файла JSON в таблицу BigQuery

Второй подход заключается в загрузке данных в соответствующие столбцы. Так что теперь данные могут быть запрошены через стандартный SQL. Также необходимо знать схему. Его можно обнаружить через консоль, пользовательский интерфейс и другие: С помощью автоматического определения схемы , однако я не нашел ничего о том, как этого можно достичь с помощью конвейера Java и Apache Beam.

Я проанализировал BigQueryIO и похоже, что он не может работать без схемы (за одним исключением, если таблица уже создана)

Как я упоминал ранее, новые файлы могут приносить новые поля, поэтому схема должна быть обновлена ​​соответственно.

Допустим, у меня есть три файла JSON:

1. { "field1": "value1" }
2. { "field2": "value2" }
3. { "field1": "value3", "field10": "value10" }

Сначала создается новая таблица с одним полем "field1" типа string. Поэтому моя таблица должна выглядеть следующим образом:

|field1  |
----------
|"value1"|

Секунда делает то же самое, но добавляет новое поле "field2". И теперь моя таблица должна выглядеть так:

|field1  |field2  |
-------------------
|"value1"|null    |
-------------------
|null    |"value2"|

Третий JSON должен добавить еще одно поле "field10" в схему и так далее. Реальный файл JSON может иметь 200 или более полей. Насколько сложно будет справиться с таким сценарием?

Каким способом лучше выполнить это преобразование?

1 Ответ

0 голосов
/ 12 ноября 2019

Я провел несколько тестов, в которых симулировал типичный шаблон автоопределения: сначала я пробежался по всем данным, чтобы построить Map из всех возможных полей и типа (здесь я только что рассмотрел String или Integer дляпростота). Я использую конвейер с сохранением состояния , чтобы отслеживать поля, которые уже были видны, и сохранять его как PCollectionView. Таким образом, я могу использовать .withSchemaFromView(), поскольку схема неизвестна при строительстве трубопровода. Обратите внимание, что этот подход действителен только для пакетных заданий.

Сначала я создаю несколько фиктивных данных без строгой схемы, где каждая строка может содержать или не содержать любое из полей:

PCollection<KV<Integer, String>> input = p
  .apply("Create data", Create.of(
        KV.of(1, "{\"user\":\"Alice\",\"age\":\"22\",\"country\":\"Denmark\"}"),
        KV.of(1, "{\"income\":\"1500\",\"blood\":\"A+\"}"),
        KV.of(1, "{\"food\":\"pineapple pizza\",\"age\":\"44\"}"),
        KV.of(1, "{\"user\":\"Bob\",\"movie\":\"Inception\",\"income\":\"1350\"}"))
  );

Мы прочитаем входные данные и создадим Map различных имен полей, которые мы видим в данных, и проведем базовую проверку типов, чтобы определить, содержит ли она INTEGER или STRING. Конечно, это может быть продлено при желании. Обратите внимание, что все ранее созданные данные были назначены одному и тому же ключу, так что они сгруппированы вместе, и мы можем создать полный список полей, но это может стать узким местом в производительности. Мы материализуем вывод, чтобы использовать его в качестве побочного ввода:

PCollectionView<Map<String, String>> schemaSideInput = input  
  .apply("Build schema", ParDo.of(new DoFn<KV<Integer, String>, KV<String, String>>() {

    // A map containing field-type pairs
    @StateId("schema")
    private final StateSpec<ValueState<Map<String, String>>> schemaSpec =
        StateSpecs.value(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));

    @ProcessElement
    public void processElement(ProcessContext c,
                               @StateId("schema") ValueState<Map<String, String>> schemaSpec) {
      JSONObject message = new JSONObject(c.element().getValue());
      Map<String, String> current = firstNonNull(schemaSpec.read(), new HashMap<String, String>());

      // iterate through fields
      message.keySet().forEach(key ->
      {
          Object value = message.get(key);

          if (!current.containsKey(key)) {
              String type = "STRING";

              try {
                  Integer.parseInt(value.toString());
                  type = "INTEGER";
              }
              catch(Exception e) {}

              // uncomment if debugging is needed
              // LOG.info("key: "+ key + " value: " + value + " type: " + type);

              c.output(KV.of(key, type));
              current.put(key, type); 
              schemaSpec.write(current);
          }
      });
    }
  })).apply("Save as Map", View.asMap());

Теперь мы можем использовать предыдущий Map для построения PCollectionView, содержащего схему таблицы BigQuery:

PCollectionView<Map<String, String>> schemaView = p
  .apply("Start", Create.of("Start"))
  .apply("Create Schema", ParDo.of(new DoFn<String, Map<String, String>>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Map<String, String> schemaFields = c.sideInput(schemaSideInput);  
        List<TableFieldSchema> fields = new ArrayList<>();  

        for (Map.Entry<String, String> field : schemaFields.entrySet()) 
        { 
            fields.add(new TableFieldSchema().setName(field.getKey()).setType(field.getValue()));
            // LOG.info("key: "+ field.getKey() + " type: " + field.getValue());
        }

        TableSchema schema = new TableSchema().setFields(fields);

        String jsonSchema;
        try {
            jsonSchema = Transport.getJsonFactory().toString(schema);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        c.output(ImmutableMap.of("PROJECT_ID:DATASET_NAME.dynamic_bq_schema", jsonSchema));

      }}).withSideInputs(schemaSideInput))
  .apply("Save as Singleton", View.asSingleton());

Измените полное имя таблицы PROJECT_ID:DATASET_NAME.dynamic_bq_schema соответственно.

Наконец, в нашем конвейере мы читаем данные, преобразуем их в TableRow и записываем их в BigQuery, используя .withSchemaFromView(schemaView):

input
  .apply("Convert to TableRow", ParDo.of(new DoFn<KV<Integer, String>, TableRow>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
          JSONObject message = new JSONObject(c.element().getValue());
          TableRow row = new TableRow();

          message.keySet().forEach(key ->
          {
              Object value = message.get(key);
              row.set(key, value);
          });

        c.output(row);
      }}))
  .apply( BigQueryIO.writeTableRows()
      .to("PROJECT_ID:DATASET_NAME.dynamic_bq_schema")
      .withSchemaFromView(schemaView)
      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

Полный код здесь .

Схема таблицы BigQuery, созданная конвейером:

enter image description here

и результирующие разреженные данные:

enter image description here

...