Я пытаюсь организовать пару преобразований данных, которые выполняются в pyspark. У меня есть код, подобный этому ниже.
def main():
spark_session = SparkSession\
.builder\
.appName(config.SPARK_CONFIG['AppName']) \
.getOrCreate()
data = getData(spark_session)
analytics = Analytics(data)
analytics.execute_and_save_analytics()
spark_session.stop()
def getData(spark_session):
sqlContext = pyspark.SQLContext(spark_session.sparkContext)
return sqlContext.read.option('user', user).option('password', pswd)\
.jdbc('jdbc:sqlserver://' + sqlserver + ':' + port\
+ ';database=' + database, table)
class Analytics():
def __init__(self, df):
self.df = df
def _execute(self):
df0 = self.df.withColumn('col3', df.col31 + df.col32)
# df0.persist()
df1 = df0.filter(df.col3 > 10).groupBy('col1', 'col2').count()
df2 = df0.filter(df.col3 < 10).groupBy('col1', 'col2').count()
return df1, df2
def execute_and_save_analytics(self):
output_df1, output_df2 = self._execute()
output_df1.coalesce(1).write.csv('/path/file.csv', header='true')
output_df2.coalesce(1).write.csv('/path/file.csv', header='true')
Как я могу реорганизовать код таким образом, чтобы df0 оценивался только один раз? Я пытался использовать persist () как в закомментированной строке, но улучшения производительности не было. Какие-нибудь мысли?
Другая, но похожая проблема, как бы вы организовали свои конвейеры, если бы у вас был не один _execute (), а много похожих методов _execute1 (), _execute2 () и т. Д.
Я полагаю, если я вызову методы _execute () отдельно, то PySpark будет оценивать каждый конвейер преобразований отдельно (?), Поэтому я теряю производительность.
edit: данные преобразования (filter, groupBy, count) являются только примерами, я ищу решение, работающее с преобразованиями любого типа или определением col3.
edit2: кажется, что вызов cache () в init - лучшая оптимизация.