Я пытаюсь прочитать данные из файла, загруженного по пути к серверу, выполнить некоторые манипуляции и затем сохранить эти данные в базе данных DB2. У нас есть около 300 тыс. Записей, которые могут увеличиться в будущем, поэтому мы пытаемся выполнить все манипуляции, а также писать в DB2 внутри foreachpartition. Ниже приведены шаги, которые необходимо выполнить для этого.
Создание контекста искры как глобального и статического c.
static SparkContext sparkContext = new SparkContext();
static JavaSparkContext jc = sparkContext.getJavaSparkContext();
static SparkSession sc = sparkContext.getSparkSession();
Создание набора данных файла, присутствующего на сервере
Dataset<Row> dataframe = sparkContext.getSparkSession().read().option("delimiter","|").option("header","false").option("inferSchema","false").schema(SchemaClass.generateSchema()).csv(filePath).withColumn("ID",monotonically_increasing_id())).withColumn("Partition_Id",spark_partition_id());
Вызов foreachpartition
dataframe.foreachpartition(new ForeachPartitionFunction<Row>()){
@Override
public void call(Iterator<Row> _row) throws Exception{
List<SparkPositionDto> li = new ArrayList<>();
while(_row.hasNext()){
PositionDto positionDto = AnotherClass.method(row,1);
SparkPositionDto spd = copyToSparkDto(positionDto);
if(spd != null){
li.add(spd);
}
}
System.out.println("Writing via Spark : List size : "+li.size());
JavaRDD<SparkPositionDto> finalRdd = jc.parallelize(li);
Dataset<Row> dfToWrite = sc.createDataFrame(finalRDD, SparkPositionDto.class);
System.out.println("Writing Data");<br>
if(dfToWrite != null){
dfToWrite.write()
.format("jdbc")
.option("url","jdbc:msdb2://"+"Database_Name"+";useKerberos=true")
.option("driver","DRIVER_NAME")
.option("dbtable","TABLE_NAME")
.mode(SaveMode.Append)
.save();
}
}
}
Странное наблюдение: когда я запускаю этот код за пределами foreachpartition для небольшого набора данных, он работает нормально, и в моем кластере spark работает только 1 драйвер и 1 приложение, но когда один и тот же код выполняется внутри foreachpartition, Я мог видеть 1 драйвер и 2 приложения, работающие с 1 приложением в рабочем состоянии и другим в ожидании. Если я добавлю numberOfPartitions как 5 в моей схеме, то будет видно, что 5 приложений работают. Он работает непрерывно, ничего в логах, кажется, он где-то застрял.
Может кто-нибудь, пожалуйста, помогите! !