This is my demo; the application should be able to do the actual work.
package org.apache.livy.examples.ReadMongo;
import net.butfly.albacore.utils.logger.Logger;
import org.apache.livy.LivyClient;
import org.apache.livy.LivyClientBuilder;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.ExecutionException;
import org.apache.livy.examples.*;
public class ReadMongoApp {
    private static final Logger logger = Logger.getLogger(ReadMongoApp.class);
    private static final URI uri;

    static {
        try {
            uri = new URI("http://myIp:8998/");
        } catch (URISyntaxException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        LivyClient client = new LivyClientBuilder().setConf("kind", "spark").setURI(uri).build();
        try {
            init(client);
            ReadMongoJob job = new ReadMongoJob();
            PiJob piJob = new PiJob("alla");
            // JobHandle<String> jobHandle = client.submit(job)
            // logger.info("------------------" + jobHandle.getState());
            String json;
            try {
                json = client.submit(job).get();
            } catch (InterruptedException | ExecutionException e) {
                throw new IOException(e);
            }
            logger.info("job result: " + json);
        } finally {
            client.stop(true);
        }
    }

    private static void init(LivyClient client) throws IOException {
        final String dir = System.getProperty("user.dir");
        Path jar = Paths.get(dir).resolve("target" + File.separator + "livy-examples-0.6.0-incubating-SNAPSHOT.jar");
        File jarFile = jar.toFile();
        if (!jarFile.isFile()) throw new RuntimeException("File not found: " + jar);
        try {
            // uploadJar is asynchronous; wait for the upload to finish before submitting jobs
            client.uploadJar(jarFile).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IOException(e);
        }
    }
}
------------------------------ ReadMongoJob.java ------------------------------
I think the problem may be here:
package org.apache.livy.examples.ReadMongo;
import com.mongodb.spark.MongoSpark;
import com.mongodb.spark.config.ReadConfig;
import com.mongodb.spark.config.WriteConfig;
import com.mongodb.spark.rdd.api.java.JavaMongoRDD;
import net.butfly.albacore.utils.logger.Logger;
import org.apache.livy.Job;
import org.apache.livy.JobContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.bson.Document;
import java.util.HashMap;
import java.util.Map;
public class ReadMongoJob implements Job<String> {
    private static final long serialVersionUID = -3644479740280555176L;
    private static final Logger logger = Logger.getLogger(ReadMongoJob.class);

    @Override
    public String call(JobContext jobContext) throws Exception {
        JavaSparkContext jsc = jobContext.sc();
        // Manual attempt to enable Kryo; setting spark.serializer after the
        // SparkContext already exists has no effect.
        jsc.getConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        return readMongo(jsc);
    }

    public String readMongo(JavaSparkContext jsc) {
        logger.info("start reading from MongoDB");
        // Create a custom ReadConfig for the source collection
        Map<String, String> readOptions = new HashMap<>();
        readOptions.put("collection", "sub");
        readOptions.put("uri", "mongodb://devdb:lalala@lalalala:40012/devdb");
        readOptions.put("database", "devdb");
        readOptions.put("partitioner", "MongoSamplePartitioner");
        ReadConfig readConfig = ReadConfig.create(readOptions);
        JavaMongoRDD<Document> customRdd = MongoSpark.load(jsc, readConfig);

        // Write the same RDD back to another collection
        Map<String, String> writeOptions = new HashMap<>();
        writeOptions.put("collection", "mainTest");
        writeOptions.put("uri", "mongodb://devdb:Devdb1234@172.30.10.31:40012/devdb");
        writeOptions.put("database", "devdb");
        writeOptions.put("writeConcern.w", "majority");
        WriteConfig writeConfig = WriteConfig.create(writeOptions);
        MongoSpark.save(customRdd, writeConfig);

        // Analyze the data from MongoDB: count the documents and return the first one as JSON
        logger.info("document count: " + customRdd.count());
        String firstJson = customRdd.first().toJson();
        logger.warn("first document: " + firstJson);
        return firstJson;
    }
}
------ I am using the latest Spark and Apache Livy. Here is my pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!--<parent>-->
<!--<groupId>org.apache.livy</groupId>-->
<!--<artifactId>livy-main</artifactId>-->
<!--<version>0.6.0-incubating-SNAPSHOT</version>-->
<!--<relativePath>../pom.xml</relativePath>-->
<!--</parent>-->
<groupId>org.apache.livy</groupId>
<artifactId>livy-examples</artifactId>
<version>0.6.0-incubating-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
<skipTests>true</skipTests>
<skip>true</skip>
<skipDeploy>true</skipDeploy>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.livy</groupId>
<artifactId>livy-api</artifactId>
<version>0.5.0-incubating</version>
</dependency>
<dependency>
<groupId>org.apache.livy</groupId>
<artifactId>livy-scala-api_2.11</artifactId>
<version>0.5.0-incubating</version>
</dependency>
<dependency>
<groupId>org.apache.livy</groupId>
<artifactId>livy-client-http</artifactId>
<version>0.5.0-incubating</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.0</version>
<exclusions>
<exclusion>
<artifactId>kryo-shaded</artifactId>
<groupId>com.esotericsoftware</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.4.0</version>
</dependency>
<!--kryo-->
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>5.0.0-RC2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.xbean/xbean-asm6-shaded -->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-asm6-shaded</artifactId>
<version>4.10</version>
</dependency>
<!--mongo-spark-->
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver</artifactId>
<version>3.9.1</version>
</dependency>
<dependency>
<groupId>org.mongodb.spark</groupId>
<artifactId>mongo-spark-connector_2.11</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>net.butfly.albacore</groupId>
<artifactId>albacore-logging</artifactId>
<version>3.1.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>
After some Googling I tried using the latest Kryo and excluding asm from the POM, and I also tried configuring Kryo manually in my application, with no effect! Why does this happen, and how can I fix it?
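
Update: a minimal sketch of what I want to try next, since spark.serializer is only read when the SparkContext is created, so setting it inside call() comes too late. This assumes Livy's HTTP client forwards spark.* entries set via setConf to the session's Spark configuration, which I have not confirmed:

import java.net.URI;
import org.apache.livy.LivyClient;
import org.apache.livy.LivyClientBuilder;

public class KryoSessionSketch {
    public static void main(String[] args) throws Exception {
        // Sketch only: pass the serializer at session creation, because
        // spark.serializer is ignored once the SparkContext exists.
        // Assumption: Livy forwards "spark.*" entries to the Spark session.
        LivyClient client = new LivyClientBuilder()
                .setURI(new URI("http://myIp:8998/")) // same placeholder host as above
                .setConf("kind", "spark")
                .setConf("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .build();
        try {
            // submit jobs here, as in ReadMongoApp
        } finally {
            client.stop(true);
        }
    }
}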