Конкатенация структурированного потокового фрейма данных Spark со статическим фреймом данных c - PullRequest
0 голосов
/ 25 марта 2020

У меня есть задание структурированной потоковой передачи Spark, которое читает файлы CSV, выполняет некоторые вычисления и выводит текстовый файл для использования в модели ниже по течению. Вывод представляет собой один столбец, составленный из исходных столбцов, которые были объединены (разделены пробелом). Например:

1556951121 7.19 26.6 36.144 14.7402 1
1556951122 7.59 27.1 37.697 14.7402 1
1556951123 8.01 27.7 39.328 14.7403 0
etc.

Для последующей модели требуется некоторая дополнительная информация заголовка в верхней части файла. Ему нужно имя файла в первой строке и количество столбцов во второй строке. Например:

filename
6
1556951121 7.19 26.6 36.144 14.7402 1
1556951122 7.59 27.1 37.697 14.7402 1
1556951123 8.01 27.7 39.328 14.7403 0
etc.

Можно ли это сделать в Spark? Я создал информацию заголовка как отдельный фрейм данных:

header = [('filename',), ('6',)]
rdd = sparkSession.sparkContext.parallelize(header)
headerDF = sparkSession.createDataFrame(rdd, schema=StructType([StructField('values', StringType(), False)]))

Я пробовал union, но объединение между потоковым и стати c фреймом данных не поддерживается.

I также посмотрел на join, но я не думаю, что это даст мне то, что мне нужно, поскольку это добавит дополнительный столбец.

Для информации, это выходной запрос:

df.coalesce(1)\
  .writeStream\
  .outputMode("append")\
  .format("text")\
  .option("checkpointLocation", checkpoint_path)\
  .option("path", path)\
  .start()\
  .awaitTermination()

и это входной источник:

df = sparkSession.readStream\
                 .option("header", "true")\
                 .option("maxFilesPerTrigger", 1)\
                 .schema(schema)\
                 .csv(input_path)

Входные CSV просто состоят из метки времени и некоторых значений датчика. Например:

Timestamp,Sensor1,Sensor2,Sensor3,Sensor4,Sensor5
1556951121,7.19,26.6,36.144,14.7402,True
1556951122,7.59,27.1,37.697,14.7402,True
1556951123,8.01,27.7,39.328,14.7403,False

1 Ответ

0 голосов
/ 31 марта 2020

В конце я использовал приемник foreachBatch, так как это дает вам c кадр данных, который вы можете затем присоединить / объединить с другими кадрами данных:

df.coalesce(1).writeStream.foreachBatch(foreach_batch_function).start()

И пакетная функция foreach:

def foreach_batch_function(df, epoch_id):
     complete_df = headerDF.union(df)
     complete_df.coalesce(1).write.text(os.path.join(output_path, str(epoch_id)))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...