Запись в базу данных DB2 застревает внутри foreachpartition () из моего кода Java в Spark Cluster с настроенными 20 рабочими узлами - PullRequest
1 голос
/ 01 апреля 2020

Я пытаюсь прочитать данные из файла, загруженного по пути к серверу, выполнить некоторые манипуляции и затем сохранить эти данные в базе данных DB2. У нас есть около 300 тыс. Записей, которые могут увеличиться в будущем, поэтому мы пытаемся выполнить все манипуляции, а также писать в DB2 внутри foreachpartition. Ниже приведены шаги, которые необходимо выполнить для этого.

  1. Создание контекста искры как глобального и статического c.

    static SparkContext sparkContext = new SparkContext(); static JavaSparkContext jc = sparkContext.getJavaSparkContext(); static SparkSession sc = sparkContext.getSparkSession();

  2. Создание набора данных файла, присутствующего на сервере

    Dataset<Row> dataframe = sparkContext.getSparkSession().read().option("delimiter","|").option("header","false").option("inferSchema","false").schema(SchemaClass.generateSchema()).csv(filePath).withColumn("ID",monotonically_increasing_id())).withColumn("Partition_Id",spark_partition_id());

  3. Вызов foreachpartition

    dataframe.foreachpartition(new ForeachPartitionFunction<Row>()){ @Override public void call(Iterator<Row> _row) throws Exception{ List<SparkPositionDto> li = new ArrayList<>(); while(_row.hasNext()){ PositionDto positionDto = AnotherClass.method(row,1); SparkPositionDto spd = copyToSparkDto(positionDto); if(spd != null){ li.add(spd); } } System.out.println("Writing via Spark : List size : "+li.size()); JavaRDD<SparkPositionDto> finalRdd = jc.parallelize(li); Dataset<Row> dfToWrite = sc.createDataFrame(finalRDD, SparkPositionDto.class); System.out.println("Writing Data");<br> if(dfToWrite != null){ dfToWrite.write() .format("jdbc") .option("url","jdbc:msdb2://"+"Database_Name"+";useKerberos=true") .option("driver","DRIVER_NAME") .option("dbtable","TABLE_NAME") .mode(SaveMode.Append) .save(); } } }

Странное наблюдение: когда я запускаю этот код за пределами foreachpartition для небольшого набора данных, он работает нормально, и в моем кластере spark работает только 1 драйвер и 1 приложение, но когда один и тот же код выполняется внутри foreachpartition, Я мог видеть 1 драйвер и 2 приложения, работающие с 1 приложением в рабочем состоянии и другим в ожидании. Если я добавлю numberOfPartitions как 5 в моей схеме, то будет видно, что 5 приложений работают. Он работает непрерывно, ничего в логах, кажется, он где-то застрял.

Может кто-нибудь, пожалуйста, помогите! !

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...