UDF со словарями на Spark 2.4 - PullRequest
       79

UDF со словарями на Spark 2.4

1 голос
/ 23 января 2020

Я использую Pyspark 2.4.4., И мне нужно использовать UDF для создания желаемого результата. Этот UDF использует транслируемый словарь. Во-первых, похоже, мне нужно изменить код, чтобы UDF мог принять словарь. Во-вторых, я не уверен, что то, что я делаю, является наиболее эффективным способом go в Spark 2.4. Мой код выглядит следующим образом:

# This is a sample of the original Spark dataframe, which I will use to create the dictionary
df = spark.createDataFrame([(220, 2, '2012-11-22 22:03:42'), (2382556,3, '2012-11-23 22:03:42'), (7854140,3,'2012-11-28 22:03:42')], ["user", "preacher", "time"])

# I am converting the above dataframe to pandas dataframe in order to create my dictionary
Dict = df.toPandas().groupby('preacher')['user','time'].apply(lambda g: list(map(tuple, g.values.tolist()))).to_dict()

#Broadcast the dictionary
pcDict = sc.broadcast(Dict)

## Function that calls the dictionary
def example(n):
    nodes = []
    children = [i[0] for i in pcD.value[n]]
    for child in children:
                    nodes.append(child)

    return Row('Out1', 'Out2')(nodes, [(n, n+2), (n, n+4)])

## Convert the Python function to UDF
schema = StructType([
    StructField("Out1", ArrayType(IntegerType()), False),
    StructField("Out2", ArrayType(StructType([StructField("_1", IntegerType(), False), StructField("_2", IntegerType(), False)])))])

example_udf = F.udf(example, schema)

# Create sample dataframe to test the UDF function
testDf = spark.createDataFrame([(3, 4), (220,5)], ["user", "Number"])

### Final output
newDf = testDf.withColumn("Output", F.explode(F.array(example_udf(testDf["user"]))))
newDf = newDf.select("user", "Output.*")

Мой первый вопрос касается словаря. Должен ли я использовать его или есть другой, более эффективный способ? Я думал о collectAsMap (), но, учитывая, что он доступен для rdds, я не уверен, является ли это способом go в Spark 2.4.

Второй вопрос заключается в том, что данный словарь является способом на go, как мне изменить функцию udf?

Заранее спасибо!

1 Ответ

2 голосов
/ 24 января 2020

Что касается первого вопроса, я думаю, что pandas предлагает элегантный способ преобразования ваших данных в словарь. Несмотря на то, что pandas будет выполняться на одном узле, вам может потребоваться использовать мощь кластера и, следовательно, выбрать go для версии Spark. Еще один фактор, это размер самого словаря. Если вы уверены, что словарь может легко поместиться в одном узле, вы можете безопасно сохранить версию pandas, в противном случае попробуйте следующий код Spark:

from pyspark.sql import functions as F

# This is a sample of the original Spark dataframe, which I will use to create the dictionary
df = spark.createDataFrame([(220, 2, '2012-11-22 22:03:42'), (2382556,3, '2012-11-23 22:03:42'), (7854140,3,'2012-11-28 22:03:42')], ["user", "preacher", "time"])

df = df.rdd.map(lambda r: (r[1], (r[0], r[2]))) \
      .toDF(["preacher", "tuple"]) \
      .groupBy("preacher") \
      .agg(F.collect_list("tuple").alias("tuple"))

dict = {}
for k,v in df.rdd.collectAsMap().items():
  dict[k] = list(map(lambda row: (row[0], row[1]), v))

dict
# {3: [(2382556, '2012-11-23 22:03:42'), (7854140, '2012-11-28 22:03:42')],
#  2: [(220, '2012-11-22 22:03:42')]}

Также стоит упомянуть, что Spark будет упаковывать и отправлять вместе с каждой задачей все локальные переменные, используемые в программе. Поэтому broadcast подходит для больших переменных, которые должны храниться на исполнителях, чтобы быть легко доступными для любой задачи.

...