Я пытаюсь реализовать конвейер данных, который объединяет несколько неограниченных источников из тем Кафки. Я могу подключиться к теме и получить данные как PCollection<String>
, и мне нужно преобразовать их в PCollection<Row>
. Я разделяю строку с запятой на массив и использую схему для преобразования ее в строку. Но как реализовать / построить схему и привязать к ней значения динамически?
Даже если я создаю отдельный класс для построения схемы, есть ли способ привязать массив строк непосредственно к схеме?
Ниже мой текущий рабочий код, который является статическим и должен переписываться каждый раз, когда я строю конвейер, и он также удлиняется в зависимости от количества полей.
final Schema sch1 =
Schema.builder().addStringField("name").addInt32Field("age").build();
PCollection<KafkaRecord<Long, String>> kafkaDataIn1 = pipeline
.apply(
KafkaIO.<Long, String>read()
.withBootstrapServers("localhost:9092")
.withTopic("testin1")
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.updateConsumerProperties(
ImmutableMap.of("group.id", (Object)"test1")));
PCollection<Row> Input1 = kafkaDataIn1.apply(
ParDo.of(new DoFn<KafkaRecord<Long, String>, Row>() {
@ProcessElement
public void processElement(
ProcessContext processContext,
final OutputReceiver<Row> emitter) {
KafkaRecord<Long, String> record = processContext.element();
final String input = record.getKV().getValue();
final String[] parts = input.split(",");
emitter.output(
Row.withSchema(sch1)
.addValues(
parts[0],
Integer.parseInt(parts[1])).build());
}}))
.apply("window",
Window.<Row>into(FixedWindows.of(Duration.standardSeconds(50)))
.triggering(AfterWatermark.pastEndOfWindow())
.withAllowedLateness(Duration.ZERO)
.accumulatingFiredPanes());
Input1.setRowSchema(sch1);
Я ожидаю, что достигну того же самого, что и выше, динамически / многократно используемым кодом.