Spark зависает на неопределенный срок при попытке доступа к широковещательной переменной внутри UDF - PullRequest
0 голосов
/ 11 февраля 2020
 public Dataset<Row> myfunc(SparkSession spark, Dataset<Row> dfa, Dataset<Row> dfb){
    JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());

    Broadcast<Dataset<Row>> excRateBrdCast = jsc.broadcast(dfa); // very small local test DS . 5 rows 4 cols
    log.info(" ##### " + excRateBrdCast.value().count()); //works
    spark.udf().register("someudf", new UDF4<Date, String, String, Double, Double>(){
        @Override
        public Double call(Date cola, String colb, String colc, Double original){
            Dataset<Row> excBrdcastRecv = excRateBrdCast.value();
            Double newRate = original; // If I set a debugger here and do excBrdcastRecv.count() . It freezes no error. But no result
            if(!colc.equals("SOME")){
                Dataset<Row> ds6 = excBrdcastRecv.filter(row -> {
                    boolean cond1 = row.getAs("cola").toString().equals(cola.toString());
                    boolean cond2 = row.getAs("colb").toString().equals(colb);
                    return cond1 && cond2;
                });
                Double val9 = ds6.first().getAs("colc"); //Spark in local mode freezes here . No error. Just dont proceed
                newRate = newRate*val9;
            }
            return newRate;
        }
    }, DataTypes.DoubleType);

    Dataset<Row> newDs = dfb.withColumn
            ("addedColumn", callUDF("someudf", col("cola"), col("colb"), col("colc"), col("cold")));

    return newDs;
}

Несколько указателей -

  1. Если я удаляю доступ к excRateBrdCast.value () и отправляю обратно жестко закодированные значения, он отлично работает.
  2. Использование spark 2.11 с java
  3. Все наборы данных являются очень маленькими наборами локальных тестовых данных, поэтому размер не является проблемой.
  4. Не возникает ошибка муравья, просто происходит зависание при попытке получить доступ к широковещательной переменной Double newRate = original; // If I set a debugger here and do excBrdcastRecv.count() . It freezes no error. But no result. так же, как когда вызывается действие
  5. журнал застрял на INFO DAGScheduler - Submitting 1 missing tasks from ResultStage 46 (MapPartitionsRDD[267] at first at PositionsControls.java:178) (first 15 tasks are for partitions Vector(0)) INFO TaskSchedulerImpl - Adding task set 46.0 with 1 tasks
  6. работает в локальном режиме

1 Ответ

0 голосов
/ 06 марта 2020

Ну, в приведенном выше коде произошла классическая ошибка. Таким образом, переменная excRateBrdCast была передана в эфир. Тогда new UDF4 регистрируется. Spark фактически выполнит этот UDF на нескольких машинах-исполнителях. На этих машинах искра не сможет видеть excRateBrdCast, поэтому она остановится навсегда, пока не появится excRateBrdCast.value(). Так что, как нам нужно передать excRateBrdCast в UDF. Их последовательное появление в кодовом блоке является обманом.

Итак, вам нужно взять UDF в другом классе. И не выполняйте встроенную инициализацию. Определите параметризованный конструктор в UDF, который принимает широковещательную переменную excRateBrdCast и передает ее во время инициализации.

Тогда он сможет видеть широковещательную переменную.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...