У меня есть локальный интеграционный тест, который запускает Spark (2.11_2.4.0
, Java API).Он устанавливает SparkSession с:
private static SparkSession SPARK_SESSION;
@BeforeClass
public static void beforeClass() {
SparkConf sparkConf = new SparkConf().registerKryoClasses(new Class<?>[]{LongWritable.class, Text.class});
SPARK_SESSION = SparkSession.builder().master("local[1]").config(sparkConf).getOrCreate();
}
и затем использует SPARK_SESSION
для обработки данных:
JavaRDD<Tuple2<LongWritable, Text>> rdd = SPARK_SESSION.sparkContext()
.newAPIHadoopFile(inputDir, CustomInputFormat.class, LongWritable.class,Text.class, conf)
.toJavaRDD();
Локально (в Windows) это успешно выполняется каждый раз.Однако при работе на Jenkins он работает только около 25% времени, в 75% случаев происходит сбой:
2019-05-09 10:07:49 WARN o.a.s.s.SparkSession$Builder [Test worker]: Using an existing SparkSession; some configuration may not take effect.
2019-05-09 09:45:36 ERROR o.a.s.e.Executor [Executor task launch worker for task 5]: Exception in task 1.0 in stage 4.0 (TID 5)
java.io.NotSerializableException: org.apache.hadoop.io.LongWritable
Serialization stack:
- object not serializable (class: org.apache.hadoop.io.LongWritable, value: 74888)
- field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
Очевидно, что другая работа в Jenkins уже создала SparkSession
, и поэтому мой тест использует это,Однако к существующему SparkSession
не подключена конфигурация Kryo.
Я пробовал getOrCreate().newSession()
, но он все еще использует существующий SparkContext
.Я попытался добавить конфигурацию Kryo в существующий SparkContext
, но это также ничего не меняет.
Как я могу убедиться, что мой тест получает свой собственный SparkSession
или, по крайней мере, может добавлять свойствак конфигурации?