Вот подход, который, я не уверен, хорошо подходит для Spark.
Отсутствует идентификатор / ключ группировки для данных.
Не уверен, как Catalyst сможет оптимизировать это - рассмотрим позже.Ошибки памяти, если слишком велики?
Сделали данные более сложными, и это работает.Здесь идет:
# No grouping key evident, more a linked list with asc current_ids.
# Added more complexity to the example.
# Questions open on performance at scale. Interested to see how well Catalyst handles this.
# Need really some grouping id/key in the data.
from pyspark.sql import functions as f
from functools import reduce
from pyspark.sql import DataFrame
from pyspark.sql.functions import col
# Started from dataframe.
# Some more realistic data? At least more complex.
columns = ['current_id', 'previous_id', 'start_date']
vals = [
(100, None, '2001/01/01'),
(200, 100, '2002/02/02'),
(300, 200, '2003/03/03'),
(400, None, '2005/01/01'),
(500, 400, '2006/02/02'),
(600, 300, '2007/02/02'),
(700, 600, '2008/02/02'),
(800, None, '2009/02/02'),
(900, 800, '2010/02/02')
]
df = spark.createDataFrame(vals, columns)
df.createOrReplaceTempView("trans")
# Starting data. The null / None entries.
df2 = spark.sql("""
select *
from trans
where previous_id is null
""")
df2.cache
df2.createOrReplaceTempView("trans_0")
# Loop through the stuff based on traversing the list elements until exhaustion of data, and, write to dynamically named TempViews.
# May need to checkpoint? Depends on depth of chain of linked items.
# Spark not well suited to this type of processing.
dfX_cnt = 1
cnt = 1
while (dfX_cnt != 0):
tabname_prev = 'trans_' + str(cnt-1)
tabname = 'trans_' + str(cnt)
query = "select t2.current_id, t2.previous_id, t1.start_date from {} t1, trans t2 where t1.current_id = t2.previous_id".format(tabname_prev)
dfX = spark.sql(query)
dfX.cache
dfX_cnt = dfX.count()
if (dfX_cnt!=0):
#print('Looping for dynamic creation of TempViews')
dfX.createOrReplaceTempView(tabname)
cnt=cnt+1
# Reduce the TempViews all to one DF. Can reduce an array of DF's as well, but could not find my notes here in this regard.
# Will memory errors occur?
from pyspark.sql.types import *
fields = [StructField('current_id', LongType(), False),
StructField('previous_id', LongType(), True),
StructField('start_date', StringType(), False)]
schema = StructType(fields)
dfZ = spark.createDataFrame(sc.emptyRDD(), schema)
for i in range(0,cnt,1):
tabname = 'trans_' + str(i)
query = "select * from {}".format(tabname)
df = spark.sql(query)
dfZ = dfZ.union(df)
# Show final results.
dfZ.select('current_id', 'start_date').sort(col('current_id')).show()
возвращает:
+----------+----------+
|current_id|start_date|
+----------+----------+
| 100|2001/01/01|
| 200|2001/01/01|
| 300|2001/01/01|
| 400|2005/01/01|
| 500|2005/01/01|
| 600|2001/01/01|
| 700|2001/01/01|
| 800|2009/02/02|
| 900|2009/02/02|
+----------+----------+