Можно ли создать изменяемую структуру общих данных без использования аккумуляторов в спарк? - PullRequest
0 голосов
/ 09 января 2019

Я новичок в искре, и есть кое-что, что мне совершенно непонятно. Но базовые знания требуют, чтобы только аккумуляторы являлись изменяемыми переменными, которые могут обновляться через исполнителей, и их значение может быть получено драйвером. Любые другие переменные, инициализированные в коде, которые обновляются через исполнители с обновленными значениями, не передаются обратно драйверу, поскольку они являются отдельными JVM.

Я работаю над частью проекта, которая сохраняет смещения из zookeeper в структуре данных для будущего использования. Поскольку смещения получены для исполнителей, было почти невозможно иметь общую структуру данных, которая будет также обновлять смещения на разделы обратно в драйвер. Пока я не наткнулся на этот код в https://spark.apache.org/docs/2.3.0/streaming-kafka-0-8-integration.html.

AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>();
directKafkaStream.transformToPair(rdd -> { 
    OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    offsetRanges.set(offsets);    return rdd;
}).map(
    ...
    ).foreachRDD(rdd -> {    for (OffsetRange o : offsetRanges.get()) {
        System.out.println(
            o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()
        );}    
        ...
    });
System.out.println(Arrays.toString(offsetRanges.get()));

Это противоречит основной теории: когда я получаю доступ к значению AtomicReference<OffsetRange[]> offsetRanges в своем драйвере, я получаю правильное обновленное значение (как обновлено в методе transformToPair в коде исполнителя), даже если оно возвращает мне ноль или пустой ответ. Пожалуйста, может кто-нибудь объяснить мне это поведение?

1 Ответ

0 голосов
/ 09 января 2019

Можно ли создать изменяемую структуру общих данных без использования аккумуляторов в spark?

номер

Это противоречит основной теории, когда я получаю доступ к значению

Это не так, потому что значение не изменяется вне драйвера. Закрытие transformToPair выполняется на драйвере, а не на исполнителе.

Следовательно, offsetRanges.set(offsets) выполняется на той же JVM, где живет исходное значение offsetRanges.

...