Как я могу динамически добавить поле в MapElements в apache_beam? - PullRequest
0 голосов
/ 23 июня 2019

Я был бы признателен, если бы кто-нибудь помог мне написать код java для apache_beam (2.13.0) .

В python , вы могли бы динамически добавить поле, используя отображение 1 к 1 из Карта функция.

Код

#!/usr/bin/env

import apache_beam as beam
from apache_beam.io.textio import WriteToText

def addoutput(line):
    return [line, "Its weekend!"]

with beam.Pipeline() as p:
    ( p
      | beam.Create(["blah"])
      | beam.Map(addoutput)
      | WriteToText(file_path_prefix="/tmp/sample")
    )

Результат

['blah', 'Its weekend!']

Однако, когда я пытаюсь сделать то же самое с Java, я получаю ошибку компиляции в maven .

Код

public class SampleTextIO
{
    static class AddFieldFn extends DoFn<String, String> {

        @ProcessElement
        public void processElement(@Element String word, OutputReceiver<String> receiver) {

            receiver.output(word);
            receiver.output("Its weekend!");
        }
    }

    public static void main ( String[] args ) {
        System.out.println( "Main class for DirectRunner" );

        // Pipeline create using default runner (DirectRunnter)
        // Interface: PipelineOptions
        PipelineOptions options = PipelineOptionsFactory.create();

        Pipeline p = Pipeline.create(options);

        // Example pcollection
        final List<String> LINES = Arrays.asList(
            "blah"
        );

        // Read lines from file
        p.apply(Create.of(LINES))
         .apply(MapElements.via(new AddFieldFn()))
         .apply(TextIO.write().to("/tmp/test-out"));

        p.run().waitUntilFinish();
    }
}

Результат

[ERROR] /home/ywatanabe/git/google-data-engineer/Data_Science_on_the_Google_Cloud_Platform/Ch04/java/directrunner/src/main/java/com/example/SampleTextIO.java:[43,28] no suitable method found for via(com.example.SampleTextIO.AddFieldFn)
[ERROR]     method org.apache.beam.sdk.transforms.MapElements.<InputT,OutputT>via(org.apache.beam.sdk.transforms.InferableFunction<InputT,OutputT>) is not applicable
[ERROR]       (cannot infer type-variable(s) InputT,OutputT
[ERROR]         (argument mismatch; com.example.SampleTextIO.AddFieldFn cannot be converted to org.apache.beam.sdk.transforms.InferableFunction<InputT,OutputT>))
[ERROR]     method org.apache.beam.sdk.transforms.MapElements.<InputT,OutputT>via(org.apache.beam.sdk.transforms.SimpleFunction<InputT,OutputT>) is not applicable
[ERROR]       (cannot infer type-variable(s) InputT,OutputT
[ERROR]         (argument mismatch; com.example.SampleTextIO.AddFieldFn cannot be converted to org.apache.beam.sdk.transforms.SimpleFunction<InputT,OutputT>))
[ERROR]     method org.apache.beam.sdk.transforms.MapElements.via(org.apache.beam.sdk.transforms.ProcessFunction) is not applicable
[ERROR]       (argument mismatch; com.example.SampleTextIO.AddFieldFn cannot be converted to org.apache.beam.sdk.transforms.ProcessFunction)
[ERROR]     method org.apache.beam.sdk.transforms.MapElements.via(org.apache.beam.sdk.transforms.SerializableFunction) is not applicable
[ERROR]       (argument mismatch; com.example.SampleTextIO.AddFieldFn cannot be converted to org.apache.beam.sdk.transforms.SerializableFunction)
[ERROR]     method org.apache.beam.sdk.transforms.MapElements.via(org.apache.beam.sdk.transforms.Contextful) is not applicable
[ERROR]       (argument mismatch; com.example.SampleTextIO.AddFieldFn cannot be converted to org.apache.beam.sdk.transforms.Contextful)

Чтение javadoc , MapElements поддерживает Processfunction , но не работает в моем случае.

Как я могу динамически добавить такие поля, как python in java ?

1 Ответ

1 голос
/ 23 июня 2019

Это связано с тем, что метод via для mapElements предполагает одно из следующих действий: InferableFunction, SimpleFunction, ProcessFunction, SerializableFunction, Contextful. В вашем примере AddFieldFn расширяет DoFn вместо. Кроме того, по сравнению с примером Python кажется, что вы хотите вывести список из двух элементов, а не получить две разные строки.

Три примера, как это сделать:

// via ProcessFunction
PCollection p1 = p.apply(Create.of(LINES))
  .apply(MapElements.into(TypeDescriptors.lists(TypeDescriptors.strings()))
                    .via((String word) -> (Arrays.asList(word, "Its weekend!"))))
  .apply(ParDo.of(new PrintResultsFn()));

// via in-line SimpleFunction
PCollection p2 = p.apply(Create.of(LINES))
  .apply(MapElements.via(new SimpleFunction<String, List<String>>() {
    public List<String> apply(String word) {
      return Arrays.asList(word, "Its weekend!");
    }}))
  .apply(ParDo.of(new PrintResultsFn()));

// via AddFieldFn class 
PCollection p3 = p.apply(Create.of(LINES))
  .apply(MapElements.via(new AddFieldFn()))
  .apply(ParDo.of(new PrintResultsFn()));

, где AddFieldFn:

// define AddFieldFn extending from SimpleFunction and overriding apply method
static class AddFieldFn extends SimpleFunction<String, List<String>> {
    @Override
    public List<String> apply(String word) {
        return Arrays.asList(word, "Its weekend!");
    }
}

и PrintResultsFn проверяют строки:

// just print the results
static class PrintResultsFn extends DoFn<List<String>, Void> {
    @ProcessElement
    public void processElement(@Element List<String> words) {
        Log.info(Arrays.toString(words.toArray()));
    }
}

Который должен печатать желаемый вывод:

Jun 23, 2019 8:00:03 PM com.dataflow.samples.SampleTextIO$PrintResultsFn processElement
INFO: [blah, Its weekend!]
Jun 23, 2019 8:00:03 PM com.dataflow.samples.SampleTextIO$PrintResultsFn processElement
INFO: [blah, Its weekend!]
Jun 23, 2019 8:00:03 PM com.dataflow.samples.SampleTextIO$PrintResultsFn processElement
INFO: [blah, Its weekend!]

Полный код здесь . Протестировано с DirectRunner и Java SDK 2.13.0

...