Я пытаюсь реализовать fbprophet с pyspark, но не могу парализовать код на всех доступных ядрах (запускать его локально на моей машине).
Я уже искал множество статей, пытаясьпонять, почему это происходит.
Ниже вы можете найти кусок кода, где должно происходить распараллеливание.Я уже определил все отображенные функции
if __name__ == '__main__':
conf = (SparkConf()
.setMaster("local[*]")
.setAppName("SparkFBProphet Example"))
spark = (SparkSession
.builder
.config(conf=conf)
.getOrCreate())
# Removes some of the logging after session creation so we can still see output
# Doesnt remove logs before/during session creation
# To edit more logging you will need to set in log4j.properties on cluster
sc = spark.sparkContext
sc.setLogLevel("ERROR")
# Retrieve data from local csv datastore
print(compiling_pickle())
df = retrieve_data()
# Group data by app and metric_type to aggregate data for each app-metric combo
df = df.groupBy('column1', 'column2')
df = df.agg(collect_list(struct('ds', 'y')).alias('data'))
df = (df.rdd
.map(lambda r: transform_data(r))
.map(lambda d: partition_data(d))
.map(lambda d: create_model(d))
.map(lambda d: train_model(d))
.map(lambda d: make_forecast(d))
.map(lambda d: imp_predictions(d))
.saveAsTextFile("../data_spark_t/results"))
spark.stop()
В этом разделе:
print(compiling_pickle())
df = retrieve_data()
Загрузка загружена, скомпилирована и генерируется CSV.С помощью функции извлечения я делаю только это:
df = (spark.read.option("header", "true")
.option("inferSchema", value=True)
.csv("../data_spark_t/database_created.csv"))
Итак, при всем этом я не понимаю, почему мой код не присоединяет все доступные ядра к выполнению.
Просто чтобы указатьнекоторые уже проверенные точки:
Мой номер разделения равен 500. Я уже установил его равным количеству строк в df (после 'collect_list'), но не сработал;
Все возможные комбинации для setMaster () были реализованы;
Кто-нибудь может помочь?