Перенос данных из хранилища данных в Bigquery с потоком данных - PullRequest
0 голосов
/ 03 июня 2019

Я пытаюсь написать конвейер потока данных для переноса данных из Google Datastore в BigQuery с использованием Python.После некоторого поиска я решил, что мне нужно сделать три шага:

    1. ReadFromDatastore
    2. Convert to Python dicts or Tablerows
    3. WriteToBigQuery

Теперь, первый и последний шаг прост, поскольку они сами являются функциями.Но мне трудно найти хороший способ сделать второй шаг.

Я записал вывод ReadFromDatastore в текстовый файл, и json выглядит следующим образом:

key {
  partition_id {
    project_id: "ProjectID"
  }
  path {
    kind: "KindName"
    id:9999
  }
}
properties {
  key: "property1"
  value {
    string_value: "property_value"
  }
}
properties {
  key: "property2"
  value {
    string_value: ""
  }
}
properties {
  key: "property3"
  value {
    boolean_value: false
  }
}
properties {
  key: "created"
  value {
    timestamp_value {
      seconds: 4444
      nanos: 2222
    }
  }
}
properties {
  key: "created_by"
  value {
    string_value: "property_value"
  }
}
properties {
  key: "date_created"
  value {
    timestamp_value {
      seconds: 4444
    }
  }
}
properties {
  key: "property4"
  value {
    string_value: "property_value"
  }
}
properties {
  key: "property5"
  value {
    array_value {
      values {
        meaning: 00
        string_value: "link"
        exclude_from_indexes: true
      }
    }
  }
}
properties {
  key: "property6"
  value {
    null_value: NULL_VALUE
  }
}
properties {
  key: "property7"
  value {
    string_value: "property_value"
  }
}
properties {
  key: "property8"
  value {
    string_value: ""
  }
}
properties {
  key: "property9"
  value {
    timestamp_value {
      seconds: 3333
      nanos: 3333
    }
  }
}
properties {
  key: "property10"
  value {
    meaning: 00
    string_value: ""
    exclude_from_indexes: true
  }
}
properties {
  key: "property11"
  value {
    boolean_value: false
  }
}
properties {
  key: "property12"
  value {
    array_value {
      values {
        key_value {
          partition_id {
            project_id: "project_id"
          }
          path {
            kind: "Another_kind_name"
            id: 4444
          }
        }
      }
    }
  }
}
properties {
  key: "property13"
  value {
    string_value: "property_value"
  }
}
properties {
  key: "version"
  value {
    integer_value: 4444
  }
}

key {
  partition_id {
    project_id: "ProjectID"
  }
  path {
    kind: "KindName"
    id: 9999
  }
}
.
.
.
.next_entity/row

Должен ли я написать пользовательскую функцию для преобразования json в python, чтобы иметь возможностьнаписать в BigQuery или есть какие-либо функции / библиотеки из хранилища данных Google или Apache, которые я могу использовать?

Я нашел статью , описывающую то, что я пытаюсь сделать, но код, показанный на Java.

1 Ответ

0 голосов
/ 04 июня 2019

Выходные данные преобразования ReadFromDatastore имеют протоколные буферы Entity.

Чтобы преобразовать protobuff в JSON, вы можете проверить этот вопрос: Protobuf в json в python

Вы бы сделали:

p | ReadFromDatastore(...) | beam.Map(my_proto_to_json_fn) | beam.WriteToBigQuery(...)
...