Я пытаюсь создать конвейер, который передает данные из Kafka topi c в BigQuery Google. Данные в топи c есть в Авро.
Я вызываю функцию apply 3 раза. Один раз прочитать из Kafka, один раз извлечь запись и один раз написать в BigQuery. Вот основная часть кода:
pipeline
.apply("Read from Kafka",
KafkaIO
.<byte[], GenericRecord>read()
.withBootstrapServers(options.getKafkaBrokers().get())
.withTopics(Utils.getListFromString(options.getKafkaTopics()))
.withKeyDeserializer(
ConfluentSchemaRegistryDeserializerProvider.of(
options.getSchemaRegistryUrl().get(),
options.getSubject().get())
)
.withValueDeserializer(
ConfluentSchemaRegistryDeserializerProvider.of(
options.getSchemaRegistryUrl().get(),
options.getSubject().get()))
.withoutMetadata()
)
.apply("Extract GenericRecord",
MapElements.into(TypeDescriptor.of(GenericRecord.class)).via(KV::getValue)
)
.apply(
"Write data to BQ",
BigQueryIO
.<GenericRecord>write()
.optimizedWrites()
.useBeamSchema()
.useAvroLogicalTypes()
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withSchemaUpdateOptions(ImmutableSet.of(BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION))
//Temporary location to save files in GCS before loading to BQ
.withCustomGcsTempLocation(options.getGcsTempLocation())
.withNumFileShards(options.getNumShards().get())
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
.withMethod(FILE_LOADS)
.withTriggeringFrequency(Utils.parseDuration(options.getWindowDuration().get()))
.to(new TableReference()
.setProjectId(options.getGcpProjectId().get())
.setDatasetId(options.getGcpDatasetId().get())
.setTableId(options.getGcpTableId().get()))
);
При запуске я получаю следующую ошибку:
Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for Extract GenericRecord/Map/ParMultiDo(Anonymous).output [PCollection]. Correct one of the following root causes: No Coder has been manually specified; you may do so using .setCoder().
Inferring a Coder from the CoderRegistry failed: Unable to provide a Coder for org.apache.avro.generic.GenericRecord.
Building a Coder using a registered CoderProvider failed.
Как настроить кодировщик на правильное чтение Avro?