Я провел несколько тестов, в которых симулировал типичный шаблон автоопределения: сначала я пробежался по всем данным, чтобы построить Map
из всех возможных полей и типа (здесь я только что рассмотрел String
или Integer
дляпростота). Я использую конвейер с сохранением состояния , чтобы отслеживать поля, которые уже были видны, и сохранять его как PCollectionView
. Таким образом, я могу использовать .withSchemaFromView()
, поскольку схема неизвестна при строительстве трубопровода. Обратите внимание, что этот подход действителен только для пакетных заданий.
Сначала я создаю несколько фиктивных данных без строгой схемы, где каждая строка может содержать или не содержать любое из полей:
PCollection<KV<Integer, String>> input = p
.apply("Create data", Create.of(
KV.of(1, "{\"user\":\"Alice\",\"age\":\"22\",\"country\":\"Denmark\"}"),
KV.of(1, "{\"income\":\"1500\",\"blood\":\"A+\"}"),
KV.of(1, "{\"food\":\"pineapple pizza\",\"age\":\"44\"}"),
KV.of(1, "{\"user\":\"Bob\",\"movie\":\"Inception\",\"income\":\"1350\"}"))
);
Мы прочитаем входные данные и создадим Map
различных имен полей, которые мы видим в данных, и проведем базовую проверку типов, чтобы определить, содержит ли она INTEGER
или STRING
. Конечно, это может быть продлено при желании. Обратите внимание, что все ранее созданные данные были назначены одному и тому же ключу, так что они сгруппированы вместе, и мы можем создать полный список полей, но это может стать узким местом в производительности. Мы материализуем вывод, чтобы использовать его в качестве побочного ввода:
PCollectionView<Map<String, String>> schemaSideInput = input
.apply("Build schema", ParDo.of(new DoFn<KV<Integer, String>, KV<String, String>>() {
// A map containing field-type pairs
@StateId("schema")
private final StateSpec<ValueState<Map<String, String>>> schemaSpec =
StateSpecs.value(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
@ProcessElement
public void processElement(ProcessContext c,
@StateId("schema") ValueState<Map<String, String>> schemaSpec) {
JSONObject message = new JSONObject(c.element().getValue());
Map<String, String> current = firstNonNull(schemaSpec.read(), new HashMap<String, String>());
// iterate through fields
message.keySet().forEach(key ->
{
Object value = message.get(key);
if (!current.containsKey(key)) {
String type = "STRING";
try {
Integer.parseInt(value.toString());
type = "INTEGER";
}
catch(Exception e) {}
// uncomment if debugging is needed
// LOG.info("key: "+ key + " value: " + value + " type: " + type);
c.output(KV.of(key, type));
current.put(key, type);
schemaSpec.write(current);
}
});
}
})).apply("Save as Map", View.asMap());
Теперь мы можем использовать предыдущий Map
для построения PCollectionView
, содержащего схему таблицы BigQuery:
PCollectionView<Map<String, String>> schemaView = p
.apply("Start", Create.of("Start"))
.apply("Create Schema", ParDo.of(new DoFn<String, Map<String, String>>() {
@ProcessElement
public void processElement(ProcessContext c) {
Map<String, String> schemaFields = c.sideInput(schemaSideInput);
List<TableFieldSchema> fields = new ArrayList<>();
for (Map.Entry<String, String> field : schemaFields.entrySet())
{
fields.add(new TableFieldSchema().setName(field.getKey()).setType(field.getValue()));
// LOG.info("key: "+ field.getKey() + " type: " + field.getValue());
}
TableSchema schema = new TableSchema().setFields(fields);
String jsonSchema;
try {
jsonSchema = Transport.getJsonFactory().toString(schema);
} catch (IOException e) {
throw new RuntimeException(e);
}
c.output(ImmutableMap.of("PROJECT_ID:DATASET_NAME.dynamic_bq_schema", jsonSchema));
}}).withSideInputs(schemaSideInput))
.apply("Save as Singleton", View.asSingleton());
Измените полное имя таблицы PROJECT_ID:DATASET_NAME.dynamic_bq_schema
соответственно.
Наконец, в нашем конвейере мы читаем данные, преобразуем их в TableRow
и записываем их в BigQuery, используя .withSchemaFromView(schemaView)
:
input
.apply("Convert to TableRow", ParDo.of(new DoFn<KV<Integer, String>, TableRow>() {
@ProcessElement
public void processElement(ProcessContext c) {
JSONObject message = new JSONObject(c.element().getValue());
TableRow row = new TableRow();
message.keySet().forEach(key ->
{
Object value = message.get(key);
row.set(key, value);
});
c.output(row);
}}))
.apply( BigQueryIO.writeTableRows()
.to("PROJECT_ID:DATASET_NAME.dynamic_bq_schema")
.withSchemaFromView(schemaView)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
Полный код здесь .
Схема таблицы BigQuery, созданная конвейером:
и результирующие разреженные данные: