public Dataset<Row> myfunc(SparkSession spark, Dataset<Row> dfa, Dataset<Row> dfb){
JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
Broadcast<Dataset<Row>> excRateBrdCast = jsc.broadcast(dfa); // very small local test DS . 5 rows 4 cols
log.info(" ##### " + excRateBrdCast.value().count()); //works
spark.udf().register("someudf", new UDF4<Date, String, String, Double, Double>(){
@Override
public Double call(Date cola, String colb, String colc, Double original){
Dataset<Row> excBrdcastRecv = excRateBrdCast.value();
Double newRate = original; // If I set a debugger here and do excBrdcastRecv.count() . It freezes no error. But no result
if(!colc.equals("SOME")){
Dataset<Row> ds6 = excBrdcastRecv.filter(row -> {
boolean cond1 = row.getAs("cola").toString().equals(cola.toString());
boolean cond2 = row.getAs("colb").toString().equals(colb);
return cond1 && cond2;
});
Double val9 = ds6.first().getAs("colc"); //Spark in local mode freezes here . No error. Just dont proceed
newRate = newRate*val9;
}
return newRate;
}
}, DataTypes.DoubleType);
Dataset<Row> newDs = dfb.withColumn
("addedColumn", callUDF("someudf", col("cola"), col("colb"), col("colc"), col("cold")));
return newDs;
}
Несколько указателей -
- Если я удаляю доступ к excRateBrdCast.value () и отправляю обратно жестко закодированные значения, он отлично работает.
- Использование spark 2.11 с java
- Все наборы данных являются очень маленькими наборами локальных тестовых данных, поэтому размер не является проблемой.
- Не возникает ошибка муравья, просто происходит зависание при попытке получить доступ к широковещательной переменной
Double newRate = original; // If I set a debugger here and do excBrdcastRecv.count() . It freezes no error. But no result
. так же, как когда вызывается действие - журнал застрял на
INFO DAGScheduler - Submitting 1 missing tasks from ResultStage 46 (MapPartitionsRDD[267] at first at PositionsControls.java:178) (first 15 tasks are for partitions Vector(0))
INFO TaskSchedulerImpl - Adding task set 46.0 with 1 tasks
- работает в локальном режиме