Это связано с тем, что метод via
для mapElements
предполагает одно из следующих действий: InferableFunction
, SimpleFunction
, ProcessFunction
, SerializableFunction
, Contextful
. В вашем примере AddFieldFn
расширяет DoFn
вместо. Кроме того, по сравнению с примером Python кажется, что вы хотите вывести список из двух элементов, а не получить две разные строки.
Три примера, как это сделать:
// via ProcessFunction
PCollection p1 = p.apply(Create.of(LINES))
.apply(MapElements.into(TypeDescriptors.lists(TypeDescriptors.strings()))
.via((String word) -> (Arrays.asList(word, "Its weekend!"))))
.apply(ParDo.of(new PrintResultsFn()));
// via in-line SimpleFunction
PCollection p2 = p.apply(Create.of(LINES))
.apply(MapElements.via(new SimpleFunction<String, List<String>>() {
public List<String> apply(String word) {
return Arrays.asList(word, "Its weekend!");
}}))
.apply(ParDo.of(new PrintResultsFn()));
// via AddFieldFn class
PCollection p3 = p.apply(Create.of(LINES))
.apply(MapElements.via(new AddFieldFn()))
.apply(ParDo.of(new PrintResultsFn()));
, где AddFieldFn
:
// define AddFieldFn extending from SimpleFunction and overriding apply method
static class AddFieldFn extends SimpleFunction<String, List<String>> {
@Override
public List<String> apply(String word) {
return Arrays.asList(word, "Its weekend!");
}
}
и PrintResultsFn
проверяют строки:
// just print the results
static class PrintResultsFn extends DoFn<List<String>, Void> {
@ProcessElement
public void processElement(@Element List<String> words) {
Log.info(Arrays.toString(words.toArray()));
}
}
Который должен печатать желаемый вывод:
Jun 23, 2019 8:00:03 PM com.dataflow.samples.SampleTextIO$PrintResultsFn processElement
INFO: [blah, Its weekend!]
Jun 23, 2019 8:00:03 PM com.dataflow.samples.SampleTextIO$PrintResultsFn processElement
INFO: [blah, Its weekend!]
Jun 23, 2019 8:00:03 PM com.dataflow.samples.SampleTextIO$PrintResultsFn processElement
INFO: [blah, Its weekend!]
Полный код здесь . Протестировано с DirectRunner и Java SDK 2.13.0