Я новичок в искре, и есть кое-что, что мне совершенно непонятно. Но базовые знания требуют, чтобы только аккумуляторы являлись изменяемыми переменными, которые могут обновляться через исполнителей, и их значение может быть получено драйвером. Любые другие переменные, инициализированные в коде, которые обновляются через исполнители с обновленными значениями, не передаются обратно драйверу, поскольку они являются отдельными JVM.
Я работаю над частью проекта, которая сохраняет смещения из zookeeper в структуре данных для будущего использования. Поскольку смещения получены для исполнителей, было почти невозможно иметь общую структуру данных, которая будет также обновлять смещения на разделы обратно в драйвер. Пока я не наткнулся на этот код в https://spark.apache.org/docs/2.3.0/streaming-kafka-0-8-integration.html.
AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();
directKafkaStream.transformToPair(rdd -> {
OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
offsetRanges.set(offsets); return rdd;
}).map(
...
).foreachRDD(rdd -> { for (OffsetRange o : offsetRanges.get()) {
System.out.println(
o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()
);}
...
});
System.out.println(Arrays.toString(offsetRanges.get()));
Это противоречит основной теории: когда я получаю доступ к значению AtomicReference<OffsetRange[]> offsetRanges
в своем драйвере, я получаю правильное обновленное значение (как обновлено в методе transformToPair
в коде исполнителя), даже если оно возвращает мне ноль или пустой ответ. Пожалуйста, может кто-нибудь объяснить мне это поведение?