почему getSideOutput ничего не испускает? - PullRequest
0 голосов
/ 04 августа 2020

Я использую getSideOutput для создания побочного потока вывода. Наличие элемента в потоке предварительной обработки перед обработкой с помощью getSideOutput, но при вызове метода getSideOutput ничего не выдается.

код следовать

DataStream<String> asyncTable =
        join3
            .flatMap(new ExtractList())
            .process( // detect code using for test
                new ProcessFunction<String, String>() {
                  @Override
                  public void processElement(String value, Context ctx, Collector<String> out)
                      throws Exception {
                    System.out.println(value); // can detect elements
                  }
                })
            .getSideOutput(new OutputTag<>("asyTab", TypeInformation.of(String.class)));

, но при вызове getSideOutput метода после

DataStream<String> asyncTable =
    join3
        .flatMap(new ExtractList())
        .getSideOutput(new OutputTag<>("asyTab", TypeInformation.of(String.class)))
        .process(
            new ProcessFunction<String, String>() {
              @Override
              public void processElement(String value, Context ctx, Collector<String> out)
                  throws Exception {
                System.out.println(value); // nothing  detect elements
              }
            });

ExtractList следующим образом

import org.apache.flink.util.Collector;


public class ExtractList extends RichFlatMapFunction<NewTableA, String> {

  @Override
  public void flatMap(NewTableA value, Collector<String> out) throws Exception {

    String tableName = "NewTableA";
    String primaryKeyName = "PA1";
    String primaryValue = value.getPA1().toString();

    String result = tableName+":"+primaryKeyName+":"+primaryValue;
    //System.out.println(result);  // right result output
    out.collect(result);
  }
}

почему getSideOutput для создания побочного вывода поток без элементов.

Ответы [ 2 ]

1 голос
/ 06 августа 2020

Следует использовать один и тот же идентификатор выходного тега - в вашем случае это asyncTableValue в ExtractList и asyTab в .getSideOutput(new OutputTag<>("asyTab", TypeInformation.of(String.class)));, которые определенно различаются, и, следовательно, побочный выход asyTab ничего не испускает.

0 голосов
/ 04 августа 2020

извините, это моя ошибка. Я не кодирую ExtractList

public class ExtractList extends ProcessFunction<NewTableA, NewTableA> {
  private OutputTag<String> asyncTableValue =
      new OutputTag<String>("asyncTableValue", TypeInformation.of(String.class));

  @Override
  public void processElement(NewTableA value, Context ctx, Collector<NewTableA> out)
      throws Exception {
    String tableName = "NewTableA";
    String primaryKeyName = "PA1";
    String primaryValue = value.getPA1().toString();

    String result = tableName + ":" + primaryKeyName + ":" + primaryValue;

    ctx.output(asyncTableValue, result);

    out.collect(value);
  }
}
...