Вставка синтаксической ошибки в код Apache Beam - PullRequest
0 голосов
/ 23 мая 2018

Я пишу код луча Apache для тестирования.Пожалуйста, ознакомьтесь с кодом ниже.Я создал образец SimpleFunction и применил код и пытался скомпилировать.

    public static void main(String[] args) throws IOException {
        System.out.println("Test Log");

        PipelineOptions options = PipelineOptionsFactory.create();
        //options.setRunner(SparkRunner.class);
        options.setRunner(SparkRunner.class);

        Pipeline p = Pipeline.create(options);

        p.apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
            // withCompression can be omitted - by default compression is detected from the filename.
            .apply(FileIO.readMatches())
            .apply(MapElements
                // uses imports from TypeDescriptors
                .via(
                    new SimpleFunction <ReadableFile, KV<String,String>>() {

                        private static final long serialVersionUID = -7867677L;

                        @SuppressWarnings("unused")
                        public KV<String,String> createKV(ReadableFile f) {
                            String temp = null;
                            try{
                            temp = f.readFullyAsUTF8String();
                            }catch(IOException e){

                            }
                            return KV.of(f.getMetadata().resourceId().toString(), temp);
                        }

                        @Override
                        public String apply(ReadableFile element, KV<String, String> input) {
                            StringBuilder str = new StringBuilder();
                            return str.toString();
                        }
                    }

            ))
            .apply(FileIO.write());
        p.run();
    }

, но компилятор выбрасывает syntax error at public String apply(ReadableFile

Я пытался, но не удалось это исправить, Может ли кто-нибудь помочь мне?

1 Ответ

0 голосов
/ 23 мая 2018

SimpleFunction<InputT, OutputT> принимает значение InputT и возвращает значение OutputT.Подпись apply в этом случае OutputT apply(InputT input);, см. здесь .

В вашем случае для ваших типов SimpleFunction будет выглядеть следующим образом:

new SimpleFunction <ReadableFile, KV<String,String>>() {
   ...
   @Override
   public KV<String,String> apply(ReadableFile input) {
      ...
   }
}

Например, посмотрите, как это используется здесь .

В вашем случае вам нужно больше логики вокруг readMatches(), см. здесь , например, как этоприменяется для разбора Avros, а this является подробностями реализации PTransform из этого кода.

...