Искра |pyspark с fbprophet - параллельная обработка не работает с rdd.map - PullRequest
0 голосов
/ 28 ноября 2018

Я пытаюсь реализовать fbprophet с pyspark, но не могу парализовать код на всех доступных ядрах (запускать его локально на моей машине).

Я уже искал множество статей, пытаясьпонять, почему это происходит.

Ниже вы можете найти кусок кода, где должно происходить распараллеливание.Я уже определил все отображенные функции

if __name__ == '__main__':

conf = (SparkConf()
        .setMaster("local[*]")
        .setAppName("SparkFBProphet Example"))

spark = (SparkSession
         .builder
         .config(conf=conf)
         .getOrCreate())

# Removes some of the logging after session creation so we can still see output
# Doesnt remove logs before/during session creation
# To edit more logging you will need to set in log4j.properties on cluster
sc = spark.sparkContext
sc.setLogLevel("ERROR")

# Retrieve data from local csv datastore
print(compiling_pickle())
df = retrieve_data()

# Group data by app and metric_type to aggregate data for each app-metric combo
df = df.groupBy('column1', 'column2')
df = df.agg(collect_list(struct('ds', 'y')).alias('data'))


df = (df.rdd
      .map(lambda r: transform_data(r))
      .map(lambda d: partition_data(d))
      .map(lambda d: create_model(d))
      .map(lambda d: train_model(d))
      .map(lambda d: make_forecast(d))
      .map(lambda d: imp_predictions(d))
      .saveAsTextFile("../data_spark_t/results"))

spark.stop()

В этом разделе:

print(compiling_pickle())
df = retrieve_data()

Загрузка загружена, скомпилирована и генерируется CSV.С помощью функции извлечения я делаю только это:

df = (spark.read.option("header", "true")
      .option("inferSchema", value=True)
      .csv("../data_spark_t/database_created.csv"))

Итак, при всем этом я не понимаю, почему мой код не присоединяет все доступные ядра к выполнению.

Просто чтобы указатьнекоторые уже проверенные точки:

  • Мой номер разделения равен 500. Я уже установил его равным количеству строк в df (после 'collect_list'), но не сработал;

  • Все возможные комбинации для setMaster () были реализованы;

Кто-нибудь может помочь?

1 Ответ

0 голосов
/ 04 декабря 2018

Проблема решена:

schema = StructType([
    StructField("column 1", StringType(), True),
    StructField("column 2", StringType(), True),
    StructField("column 3", TimestampType(), True),
    StructField("yhat", FloatType(), True),
    StructField("yhat_lower", FloatType(), True),
    StructField("yhat_upper", FloatType(), True),
])

df = spark.createDataFrame(df, schema)
df.write.options(header=True).csv(
    'dbfs:/mnt/location/output_teste_1', mode='overwrite')

Просто необходимо сохранить с помощью описанной выше структуры.

Реализовано это в блоках данных Azure, и код добился цели, запустив все доступные узлы.

...