Я использую Pyspark 2.4.4., И мне нужно использовать UDF для создания желаемого результата. Этот UDF использует транслируемый словарь. Во-первых, похоже, мне нужно изменить код, чтобы UDF мог принять словарь. Во-вторых, я не уверен, что то, что я делаю, является наиболее эффективным способом go в Spark 2.4. Мой код выглядит следующим образом:
# This is a sample of the original Spark dataframe, which I will use to create the dictionary
df = spark.createDataFrame([(220, 2, '2012-11-22 22:03:42'), (2382556,3, '2012-11-23 22:03:42'), (7854140,3,'2012-11-28 22:03:42')], ["user", "preacher", "time"])
# I am converting the above dataframe to pandas dataframe in order to create my dictionary
Dict = df.toPandas().groupby('preacher')['user','time'].apply(lambda g: list(map(tuple, g.values.tolist()))).to_dict()
#Broadcast the dictionary
pcDict = sc.broadcast(Dict)
## Function that calls the dictionary
def example(n):
nodes = []
children = [i[0] for i in pcD.value[n]]
for child in children:
nodes.append(child)
return Row('Out1', 'Out2')(nodes, [(n, n+2), (n, n+4)])
## Convert the Python function to UDF
schema = StructType([
StructField("Out1", ArrayType(IntegerType()), False),
StructField("Out2", ArrayType(StructType([StructField("_1", IntegerType(), False), StructField("_2", IntegerType(), False)])))])
example_udf = F.udf(example, schema)
# Create sample dataframe to test the UDF function
testDf = spark.createDataFrame([(3, 4), (220,5)], ["user", "Number"])
### Final output
newDf = testDf.withColumn("Output", F.explode(F.array(example_udf(testDf["user"]))))
newDf = newDf.select("user", "Output.*")
Мой первый вопрос касается словаря. Должен ли я использовать его или есть другой, более эффективный способ? Я думал о collectAsMap (), но, учитывая, что он доступен для rdds, я не уверен, является ли это способом go в Spark 2.4.
Второй вопрос заключается в том, что данный словарь является способом на go, как мне изменить функцию udf?
Заранее спасибо!