Как оцениваются преобразования pyspark в одном методе? - PullRequest
0 голосов
/ 02 мая 2018

Я пытаюсь организовать пару преобразований данных, которые выполняются в pyspark. У меня есть код, подобный этому ниже.

def main():
    spark_session = SparkSession\
        .builder\
        .appName(config.SPARK_CONFIG['AppName']) \
        .getOrCreate()
    data = getData(spark_session)

    analytics = Analytics(data)
    analytics.execute_and_save_analytics()

    spark_session.stop()


def getData(spark_session):    
    sqlContext = pyspark.SQLContext(spark_session.sparkContext)
    return sqlContext.read.option('user', user).option('password', pswd)\
        .jdbc('jdbc:sqlserver://' + sqlserver + ':' + port\
        + ';database=' + database, table)


class Analytics():
    def __init__(self, df):
        self.df = df

    def _execute(self):
        df0 = self.df.withColumn('col3', df.col31 + df.col32)
        # df0.persist()
        df1 = df0.filter(df.col3 > 10).groupBy('col1', 'col2').count()
        df2 = df0.filter(df.col3 < 10).groupBy('col1', 'col2').count()
        return df1, df2

    def execute_and_save_analytics(self):
        output_df1, output_df2 = self._execute()
        output_df1.coalesce(1).write.csv('/path/file.csv', header='true')
        output_df2.coalesce(1).write.csv('/path/file.csv', header='true')

Как я могу реорганизовать код таким образом, чтобы df0 оценивался только один раз? Я пытался использовать persist () как в закомментированной строке, но улучшения производительности не было. Какие-нибудь мысли?

Другая, но похожая проблема, как бы вы организовали свои конвейеры, если бы у вас был не один _execute (), а много похожих методов _execute1 (), _execute2 () и т. Д. Я полагаю, если я вызову методы _execute () отдельно, то PySpark будет оценивать каждый конвейер преобразований отдельно (?), Поэтому я теряю производительность.

edit: данные преобразования (filter, groupBy, count) являются только примерами, я ищу решение, работающее с преобразованиями любого типа или определением col3.

edit2: кажется, что вызов cache () в init - лучшая оптимизация.

1 Ответ

0 голосов
/ 02 мая 2018

Как есть (с persist закомментированным) df0 все равно будет оцениваться дважды. Структура вашего кода никак не повлияет.

Разделение вашего кода на

def _execute_1(self):
    df0 = self.df.withColumn('col3', df.col31 + df.col32)
    df1 = df0.filter(df.col3 > 10).groupBy('col1', 'col2').count()
    return df1

def _execute_2(self):
    df0 = self.df.withColumn('col3', df.col31 + df.col32)
    df2 = df0.filter(df.col3 < 10).groupBy('col1', 'col2').count()
    return df2

не будет иметь никакого значения. Не вдаваясь в подробности о cache, вы можете:

def __init__(self, df):
    self.df = df.withColumn('col3', df.col31 + df.col32).cache()

def _execute_1(self):
    return df0.filter(df.col3 > 10).groupBy('col1', 'col2').count()

def _execute_2(self):
    return df0.filter(df.col3 < 10).groupBy('col1', 'col2').count()

def execute_and_save_analytics(self):
    output_df1 = self._execute_1()
    output_df2 = self._execute_2()
    output_df1.coalesce(1).write.csv('/path/file1.csv', header='true')
    output_df2.coalesce(1).write.csv('/path/file2.csv', header='true')
    self.df.unpersist()

но может быть проще просто:

(self.df
  .withColumn('col3', df.col31 + df.col32 > 10)
  .repartition("col3")
  .write.partitionBy("col3")
  .write.csv('/path/file.csv', header='true'))
...