Как исправить "несовместимые типы: org.apache.beam.sdk.options.ValueProviderне может быть преобразован в java.lang.String " - PullRequest
0 голосов
/ 04 февраля 2019

Я следовал по этой ссылке , чтобы создать шаблон, который создает лучевой конвейер для чтения из KafkaIO.Но я всегда сталкивался с «несовместимыми типами: org.apache.beam.sdk.options.ValueProvider не может быть преобразован в java.lang.String».Это строка ".withBootstrapServers (options.getKafkaServer ())", которая вызвала ошибку.Версия Beam 2.9.0, и вот часть моего кода.

public interface Options extends PipelineOptions {
    @Description("Kafka server")
    @Required
    ValueProvider<String> getKafkaServer();

    void setKafkaServer(ValueProvider<String> value);

    @Description("Topic to read from")
    @Required
    ValueProvider<String> getInputTopic();

    void setInputTopic(ValueProvider<String> value);

    @Description("Topic to write to")
    @Required
    ValueProvider<String> getOutputTopic();

    void setOutputTopic(ValueProvider<String> value);

    @Description("File path to write to")
    @Required
    ValueProvider<String> getOutput();

    void setOutput(ValueProvider<String> value);
}

public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline p = Pipeline.create(options);

    PCollection<String> processedData = p.apply(KafkaIO.<Long, String>read()
            .withBootstrapServers(options.getKafkaServer())
            .withTopic(options.getInputTopic())
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata() 
    )

И вот как я запускаю код:

mvn compile exec:java \
-Dexec.mainClass=${MyClass} \
-Pdataflow-runner -Dexec.args=" \
--project=${MyClass} \
--stagingLocation=gs://${MyBucket}/staging \
--tempLocation=gs://${MyBucket}/temp \
--templateLocation=gs://${MyBucket}/templates/${MyClass} \
--runner=DataflowRunner"

Ответы [ 2 ]

0 голосов
/ 19 февраля 2019

Чтобы получить доступ к значению через ValueProvider, вам нужно использовать метод get, а затем вы получите значение с его конкретным типом.

Например: при наличии опции:

ValueProvider<String> getKafkaServer();

вы можете получить к нему доступ с помощью:

getKafkaServer().get() это вернет вам объект String.

Похоже, что KafkaIo Api требуется для получения строкипараметр, а не ValueProvider, вы должны извлечь значение из оболочки ValueProvider.

0 голосов
/ 05 февраля 2019

Я могу найти проблему, которая заключается в том, что kafkaIO не поддерживается.Ниже приведены шаблон создания Google .

"Некоторые соединители ввода-вывода содержат методы, которые принимают объекты ValueProvider. Чтобы определить поддержку определенного соединителя и метода, см. Справочную документацию API для IСоединитель ввода / вывода. Поддерживаемые методы имеют перегрузку с помощью ValueProvider. Если метод не перегружен, метод не поддерживает параметры времени выполнения. Следующие соединители ввода / вывода имеют как минимум частичную поддержку ValueProvider:

Файлоснованные на IO: TextIO, AvroIO, FileIO, TFRecordIO, XmlIO BigQueryIO * BigtableIO (требуется SDK 2.3.0 или более поздняя версия) PubSubIO SpannerIO "

...