I am trying to use the local Spark Runner to run the following simple Apache Beam pipeline, but I always get this exception:
Exception in thread "main" java.net.URISyntaxException: Illegal character in path at index 32: /Users/user/Library/Application Support/JetBrains/Toolbox/apps/IDEA-U/ch-0/191.6183.62/IntelliJ IDEA 2019.1 EAP.app/Contents/lib/idea_rt.jar
at java.net.URI$Parser.fail(URI.java:2848)
at java.net.URI$Parser.checkChars(URI.java:3021)
at java.net.URI$Parser.parseHierarchical(URI.java:3105)
at java.net.URI$Parser.parse(URI.java:3063)
at java.net.URI.<init>(URI.java:588)
at org.apache.spark.SparkContext.addJar(SparkContext.scala:1835)
at org.apache.spark.SparkContext$$anonfun$12.apply(SparkContext.scala:457)
at org.apache.spark.SparkContext$$anonfun$12.apply(SparkContext.scala:457)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:457)
at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
at org.apache.beam.runners.spark.translation.SparkContextFactory.createSparkContext(SparkContextFactory.java:98)
at org.apache.beam.runners.spark.translation.SparkContextFactory.getSparkContext(SparkContextFactory.java:64)
at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:213)
at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:89)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
at Prototype_Spark.runExample(Prototype_Spark.java:32)
at Prototype_Spark.main(Prototype_Spark.java:36)
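The path Spark is choking on contains a space ("Application Support"), and java.net.URI rejects an unencoded space as an illegal path character. A minimal sketch (the path is just an illustration mirroring the one from the stack trace) reproduces the same exception outside of Spark:

import java.net.URI;

public class UriSpaceRepro {
    public static void main(String[] args) throws Exception {
        // Fails with URISyntaxException: Illegal character in path,
        // because a raw space is not a legal URI character.
        new URI("/Users/user/Library/Application Support/idea_rt.jar");
    }
}

So the exception appears to be triggered when SparkContext.addJar tries to parse a classpath entry under that space-containing directory as a URI.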
The following Java code triggers the problem:
import java.util.List;

import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Mean;
import org.apache.beam.sdk.values.TypeDescriptor;

public class Prototype_Spark {

    private static final List<Integer> prices = List.of(1, 2, 3);

    private static void runExample() {
        PipelineOptions options = PipelineOptionsFactory.create();
        options.setRunner(SparkRunner.class);
        // The Direct runner runs as expected:
        // options.setRunner(DirectRunner.class);

        Pipeline pipeline = Pipeline.create(options);
        pipeline
            .apply("Read", Create.of(prices))
            .apply("Calculate Mean", Mean.globally())
            .apply("Map to string", MapElements
                .into(TypeDescriptor.of(String.class))
                .via(Object::toString))
            .apply("Write", TextIO.write().to("/tmp/output"));
        pipeline.run().waitUntilFinish();
    }

    public static void main(String[] args) {
        runExample();
    }
}
If I replace the Spark Runner with the Direct Runner, the pipeline works as expected. Am I missing something here?
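My current suspicion is that the Spark runner stages every detected classpath entry as a jar, including the idea_rt.jar that IntelliJ injects, whose path contains spaces and therefore cannot be parsed as a URI by SparkContext.addJar. A workaround I am considering is to stage an explicit jar list instead of the auto-detected classpath; this is only a sketch, assuming SparkPipelineOptions.setFilesToStage works the way I understand it, and build/libs/prototype.jar is a placeholder for my actual application jar:

import java.util.Collections;

import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Inside runExample(), instead of the generic PipelineOptions:
SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
// Stage only the application jar so that space-containing IDE entries
// (e.g. idea_rt.jar) never reach SparkContext.addJar.
options.setFilesToStage(Collections.singletonList("build/libs/prototype.jar"));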
Below are my Gradle dependencies:
repositories {
    mavenCentral()
}

ext {
    beamVersion = '2.10.0'
    sparkVersion = '2.3.3'
}

dependencies {
    compile "org.apache.beam:beam-sdks-java-core:$beamVersion"
    compile "org.apache.beam:beam-sdks-java-extensions-join-library:$beamVersion"
    compile "org.apache.beam:beam-sdks-java-extensions-google-cloud-platform-core:$beamVersion"
    compile "org.apache.beam:beam-sdks-java-io-google-cloud-platform:$beamVersion"
    compile "org.apache.beam:beam-runners-core-java:$beamVersion"
    compile "org.apache.beam:beam-runners-direct-java:$beamVersion"
    compile "org.apache.beam:beam-runners-spark:$beamVersion"
    compile "org.apache.spark:spark-core_2.11:$sparkVersion"
    compile "org.apache.spark:spark-streaming_2.11:$sparkVersion"
    testCompile "junit:junit:4.12"
}