Я изо всех сил пытаюсь понять жизненный цикл переменной в искровых замыканиях и как сериализировать искры и распространять код для исполнителя.
Я перечислил 3 различных сценария для предоставления объекта для преобразования карты. Что я понял из Spark Wiki Spark Closure , так это то, что работает только scenerio 3, а остальные выдают исключение, когда я запускаю работу в распределенном режиме (master=yarn)
. Хотя scenerio 1 не использует переменную Broadcast, но все равно дает те же результаты, что и scenerio 2.
Что я хочу понять, так это то, как scenerio 1 отличается от 2?
Класс тестовых бобов
public class TestBean implements Serializable {
String val;
public TestBean(String val) {
this.val = val;
}
public String message(Integer msg){
return String.format("%s==>%d",val,msg);
}
}
Функция сопоставления для добавления сообщения
public class Fn implements Function<Integer,String> {
TestBean obj;
public Fn(TestBean obj ){
this.obj=obj;
}
@Override
public String call(Integer param) throws Exception {
return "Fn:"+obj.message(param);
}
}
Создать СДР с 5 номерами:
TestBean bean = new TestBean("Hello");
JavaRDD<Integer> rdd = context.parallelize(IntStream.range(0, 5).boxed().collect(Collectors.toList()));
Возможные способы передачи ссылки на объект для преобразования карты:
SUpply Ссылка на объект TestBean с использованием функции-
rdd.map(new Fn(bean)).collect();
Поставка testBean с использованием переменной Broadcast:
Broadcast<TestBean> beanBroadCast = context.broadcast(bean);
rdd.map(obj -> beanBroadCast.getValue().message(obj)).collect();
Обратитесь напрямую, используя лямбда-выражение:
rdd.map(obj -> bean.message(obj)).collect();