Как Spark отреагирует, если RDD станет больше? - PullRequest
2 голосов
/ 11 марта 2019

У нас есть код, работающий в Apache Spark. После детального изучения кода я определил, что один из наших картографов модифицирует объект, находящийся в RDD, вместо того, чтобы делать копию объекта для вывода. То есть, у нас есть RDD dicts, и функция map добавляет что-то в словарь, а не возвращает новые словари.

СДР должны быть неизменными. Наши мутируют.

У нас также есть ошибки памяти.

Вопрос: Будет ли Spark смущен, если размер СДР внезапно увеличится?

Ответы [ 2 ]

2 голосов
/ 11 марта 2019

Хотя это, вероятно, не дает сбоя, оно может вызвать некоторое неуказанное поведение. Например этот фрагмент

val rdd = sc.parallelize({
    val m = new mutable.HashMap[Int, Int]
    m.put(1, 2)
    m
} :: Nil)
rdd.cache() // comment out to change behaviour!
rdd.map(m => {
    m.put(2, 3)
    m
}).collect().foreach(println) // "Map(2 -> 3, 1 -> 2)"
rdd.collect().foreach(println) // Either "Map(1 -> 2)" or "Map(2 -> 3, 1 -> 2)" depending if caching is used

поведение меняется в зависимости от того, кэшируется ли СДР или нет. В Spark API есть множество функций, которым разрешено изменять данные, что четко указано в документации, см., Например, https://spark.apache.org/docs/2.4.0/api/java/org/apache/spark/rdd/PairRDDFunctions.html#aggregateByKey-U-scala.Function2-scala.Function2-scala.reflect.ClassTag-

Подумайте о наличии RDD[(K, V)] записей карты вместо карт, т. Е. СДР [Карта [K, V]]. Это позволит добавить новые записи стандартным способом, используя flatMap или mapPartitions. При необходимости представление карты может быть в конечном итоге сформировано путем группировки и т. Д.

0 голосов
/ 15 марта 2019

Хорошо, я разработал некоторый код для проверки того, что происходит, если объект, указанный в RDD, видоизменяется картографом, и я рад сообщить, что это невозможно, если вы программируете из Python.

Вот моя тестовая программа:

from pyspark.sql import SparkSession

import time

COUNT = 5
def funnydir(i):
    """Return a directory for i"""
    return {"i":i,
            "gen":0 }

def funnymap(d):
    """Take a directory and perform a funnymap"""
    d['gen'] = d.get('gen',0) + 1
    d['id' ] = id(d)
    return d

if __name__=="__main__":
    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext

    dfroot = sc.parallelize(range(COUNT)).map(funnydir)
    dfroot.persist()
    df1 = dfroot.map(funnymap)
    df2 = df1.map(funnymap)
    df3 = df2.map(funnymap)
    df4 = df3.map(funnymap)



    print("===========================================")
    print("*** df1:",df1.collect())
    print("*** df2:",df2.collect())
    print("*** df3:",df3.collect())
    print("*** df4:",df4.collect())
    print("===========================================")

    ef1 = dfroot.map(funnymap)
    ef2 = ef1.map(funnymap)
    ef3 = ef2.map(funnymap)
    ef4 = ef3.map(funnymap)
    print("*** ef1:",ef1.collect())
    print("*** ef2:",ef2.collect())
    print("*** ef3:",ef3.collect())
    print("*** ef4:",ef4.collect())

Если вы запустите это, вы увидите, что идентификатор для словаря d отличается в каждом из фреймов данных. Очевидно, Spark сериализует десериализацию объектов, когда они передаются от маппера к мапперу. Таким образом, каждый получает свою версию.

Если это не так, то первый вызов funnymap для создания df1 также изменит генерацию во фрейме данных dfroot, и в результате ef4 будет иметь разные номера генерации, чем df4.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...