Функция сопоставлена ​​с RDD с использованием rdd.map (), вызываемой несколько раз для некоторых строк - PullRequest
0 голосов
/ 02 апреля 2019

У меня есть исходный фрейм данных, в котором есть несколько записей. Я хочу выполнить некоторые операции в каждой строке этого кадра данных. Для этого была использована функция rdd.map. Однако, глядя на журналы, записанные с использованием аккумуляторов, похоже, что отображенная функция была вызвана несколько раз для некоторых строк. Согласно документации, он должен быть вызван ТОЛЬКО один раз.

Я попытался воспроизвести проблему в небольшом скрипте и заметил то же самое поведение. Этот скрипт показан ниже:

import os
import sys
os.environ['SPARK_HOME'] = "/usr/lib/spark/"
sys.path.append("/usr/lib/spark/python/")
from pyspark.sql import *
from pyspark.accumulators import AccumulatorParam


class StringAccumulatorParam(AccumulatorParam):
    def zero(self, initialValue=""):
        return ""

    def addInPlace(self, s1, s2):
        return s1.strip() + " " + s2.strip()

def mapped_func(row, logging_acc):
    logging_acc += "Started map"
    logging_acc += str(row)
    return "test"

if __name__ == "__main__":
    spark_session = SparkSession.builder.enableHiveSupport().appName("rest-api").getOrCreate()
    sc = spark_session.sparkContext
    df = spark_session.sql("select col1, col2, col3, col4, col5, col6 from proj1_db.dw_table where col3='P1'")
    df.show()
    logging_acc = sc.accumulator("", StringAccumulatorParam())
    result_rdd = df.rdd.map(lambda row: Row(row, mapped_func(row, logging_acc)))
    result_rdd.toDF().show()
    print "logs: " + str(logging_acc.value)

Ниже приведена соответствующая часть вывода:

+----+----+----+----+----+----+
|col1|col2|col3|col4|col5|col6|
+----+----+----+----+----+----+
|   1|   1|  P1|   2|  10|  20|
|   3|   1|  P1|   1|  25|  25|
+----+----+----+----+----+----+

+--------------------+----+
|                  _1|  _2|
+--------------------+----+
|[1, 1, P1, 2, 10,...|test|
|[3, 1, P1, 1, 25,...|test|
+--------------------+----+

logs: Started map Row(col1=1, col2=1, col3=u'P1', col4=2, col5=10, col6=20) Started map Row(col1=1, col2=1, col3=u'P1', col4=2, col5=10, col6=20) Started map Row(col1=3, col2=1, col3=u'P1', col4=1, col5=25, col6=25)

Первая таблица - это исходный фрейм данных, а вторая таблица - результирующий фрейм данных, созданный после вызова функции map. Как видно, функция вызывается дважды для первого ряда. Может кто-нибудь, пожалуйста, помогите мне понять, что происходит, и как мы можем убедиться, что сопоставленная функция вызывается только ОДИН РАЗ в строку.

1 Ответ

1 голос
/ 02 апреля 2019

Согласно документации, он должен вызываться ТОЛЬКО один раз.

Это действительно не тот случай. Любое преобразование может быть выполнено произвольное количество раз (обычно в случае сбоев или для поддержки вторичной логики), и документация прямо говорит, что :

Для обновлений аккумулятора, выполняемых только при действиях , Spark гарантирует, что обновление каждого задания к аккумулятору будет применено только один раз

Таким образом, неявно используемые аккумуляторы, используемые внутри преобразований (например, map), могут обновляться несколько раз за задачи.

В вашем случае происходит многократное выполнение, потому что вы не предоставляете схему при преобразовании RDD в DataFrame. В этом случае Spark выполнит еще одно сканирование данных, чтобы вывести схему из данных, т.е.

spark.createDataFrame(result_rdd, schema)

Это, однако, решит только эту конкретную проблему, и общие положения о преобразовании и поведении накопителя остаются в силе.

...