Как создать PCollection <Row>из PCollection <String>для выполнения балок SQL Trasforms - PullRequest
1 голос
/ 05 июля 2019

Я пытаюсь реализовать конвейер данных, который объединяет несколько неограниченных источников из тем Кафки. Я могу подключиться к теме и получить данные как PCollection<String>, и мне нужно преобразовать их в PCollection<Row>. Я разделяю строку с запятой на массив и использую схему для преобразования ее в строку. Но как реализовать / построить схему и привязать к ней значения динамически?

Даже если я создаю отдельный класс для построения схемы, есть ли способ привязать массив строк непосредственно к схеме?

Ниже мой текущий рабочий код, который является статическим и должен переписываться каждый раз, когда я строю конвейер, и он также удлиняется в зависимости от количества полей.

final Schema sch1 =
                Schema.builder().addStringField("name").addInt32Field("age").build();

PCollection<KafkaRecord<Long, String>> kafkaDataIn1 = pipeline
  .apply(
    KafkaIO.<Long, String>read()
      .withBootstrapServers("localhost:9092")
      .withTopic("testin1")
      .withKeyDeserializer(LongDeserializer.class)
      .withValueDeserializer(StringDeserializer.class)
      .updateConsumerProperties(
         ImmutableMap.of("group.id", (Object)"test1")));

PCollection<Row> Input1 = kafkaDataIn1.apply(
  ParDo.of(new DoFn<KafkaRecord<Long, String>, Row>() {
    @ProcessElement
    public void processElement(
        ProcessContext processContext,
        final OutputReceiver<Row> emitter) {

          KafkaRecord<Long, String> record = processContext.element();
          final String input = record.getKV().getValue();

          final String[] parts = input.split(",");

          emitter.output(
            Row.withSchema(sch1)
               .addValues(
                   parts[0],
                   Integer.parseInt(parts[1])).build());
        }}))
  .apply("window",
     Window.<Row>into(FixedWindows.of(Duration.standardSeconds(50)))
       .triggering(AfterWatermark.pastEndOfWindow())
       .withAllowedLateness(Duration.ZERO)
       .accumulatingFiredPanes());

Input1.setRowSchema(sch1);

Я ожидаю, что достигну того же самого, что и выше, динамически / многократно используемым кодом.

1 Ответ

1 голос
/ 07 июля 2019

Схема настроена на pcollection, поэтому она не является динамической. Если вы хотите создать ее лениво, вам нужно использовать формат / кодировщик, поддерживающий ее.В качестве примеров можно привести сериализацию Java или json.

При использовании функции sql вы также можете использовать статическую схему с запросом полей и других полей. Таким образом, статическая часть позволяет создавать sql и не терять дополнительные данные..

Romain

...