Using the Spark Runner in Apache Beam throws an exception
2 votes / 26 Mar 2019

I am trying to use the local Spark Runner to run the simple Apache Beam pipeline below, but I always get the following exception:

Exception in thread "main" java.net.URISyntaxException: Illegal character in path at index 32: /Users/user/Library/Application Support/JetBrains/Toolbox/apps/IDEA-U/ch-0/191.6183.62/IntelliJ IDEA 2019.1 EAP.app/Contents/lib/idea_rt.jar
    at java.net.URI$Parser.fail(URI.java:2848)
    at java.net.URI$Parser.checkChars(URI.java:3021)
    at java.net.URI$Parser.parseHierarchical(URI.java:3105)
    at java.net.URI$Parser.parse(URI.java:3063)
    at java.net.URI.<init>(URI.java:588)
    at org.apache.spark.SparkContext.addJar(SparkContext.scala:1835)
    at org.apache.spark.SparkContext$$anonfun$12.apply(SparkContext.scala:457)
    at org.apache.spark.SparkContext$$anonfun$12.apply(SparkContext.scala:457)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:457)
    at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
    at org.apache.beam.runners.spark.translation.SparkContextFactory.createSparkContext(SparkContextFactory.java:98)
    at org.apache.beam.runners.spark.translation.SparkContextFactory.getSparkContext(SparkContextFactory.java:64)
    at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:213)
    at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:89)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
    at Prototype_Spark.runExample(Prototype_Spark.java:32)
    at Prototype_Spark.main(Prototype_Spark.java:36)
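
By the looks of it, the URI parser is choking on the spaces in the IntelliJ installation path ("Application Support" and "IntelliJ IDEA 2019.1 EAP.app"), which org.apache.spark.SparkContext.addJar hands to java.net.URI unescaped. A minimal sketch of the same failure, using an illustrative path:

import java.net.URI;

public class UriSpaceDemo {
    public static void main(String[] args) throws Exception {
        // Throws URISyntaxException: a raw space is an illegal character
        // in a URI path and would have to be encoded as %20.
        new URI("/Users/user/Library/Application Support/some.jar");
    }
}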

The following Java code triggers the problem:

import java.util.List;

import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Mean;
import org.apache.beam.sdk.values.TypeDescriptor;

public class Prototype_Spark {

    private static final List<Integer> prices = List.of(1, 2, 3);

    private static void runExample() {
        PipelineOptions options = PipelineOptionsFactory.create();
        options.setRunner(SparkRunner.class);
        // The Direct runner runs as expected:
        // options.setRunner(DirectRunner.class);

        Pipeline pipeline = Pipeline.create(options);

        // Average the prices, format the result as a string,
        // and write it to /tmp/output.
        pipeline
                .apply("Read", Create.of(prices))
                .apply("Calculate Mean", Mean.globally())
                .apply("Map to string", MapElements
                        .into(TypeDescriptor.of(String.class))
                        .via(Object::toString))
                .apply("Write", TextIO.write().to("/tmp/output"));

        pipeline.run().waitUntilFinish();
    }

    public static void main(String[] args) {
        runExample();
    }

}

If I swap the Spark Runner for the Direct Runner, the pipeline runs as expected. Am I missing something here?
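
For reference, a sketch of selecting the runner from command-line arguments instead of hard-coding it, so the same build can run on either runner (the --sparkMaster value is just an assumption for a local run):

import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class RunnerFromArgs {
    public static void main(String[] args) {
        // e.g. --runner=SparkRunner --sparkMaster=local[2]
        //  or  --runner=DirectRunner
        PipelineOptions options =
                PipelineOptionsFactory.fromArgs(args).withValidation().create();
        // ... build and run the pipeline as in runExample() above
    }
}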

Below are my Gradle dependencies:

repositories {
    mavenCentral()
}

ext {
    beamVersion = '2.10.0'
    sparkVersion = '2.3.3'
}

dependencies {
    compile "org.apache.beam:beam-sdks-java-core:$beamVersion"
    compile "org.apache.beam:beam-sdks-java-extensions-join-library:$beamVersion"
    compile "org.apache.beam:beam-sdks-java-extensions-google-cloud-platform-core:$beamVersion"
    compile "org.apache.beam:beam-sdks-java-io-google-cloud-platform:$beamVersion"
    compile "org.apache.beam:beam-runners-core-java:$beamVersion"
    compile "org.apache.beam:beam-runners-direct-java:$beamVersion"
    compile "org.apache.beam:beam-runners-spark:$beamVersion"

    compile "org.apache.spark:spark-core_2.11:$sparkVersion"
    compile "org.apache.spark:spark-streaming_2.11:$sparkVersion"
    testCompile "junit:junit:4.12"
}
...