У меня есть задание структурированной потоковой передачи Spark, которое читает файлы CSV, выполняет некоторые вычисления и выводит текстовый файл для использования в модели ниже по течению. Вывод представляет собой один столбец, составленный из исходных столбцов, которые были объединены (разделены пробелом). Например:
1556951121 7.19 26.6 36.144 14.7402 1
1556951122 7.59 27.1 37.697 14.7402 1
1556951123 8.01 27.7 39.328 14.7403 0
etc.
Для последующей модели требуется некоторая дополнительная информация заголовка в верхней части файла. Ему нужно имя файла в первой строке и количество столбцов во второй строке. Например:
filename
6
1556951121 7.19 26.6 36.144 14.7402 1
1556951122 7.59 27.1 37.697 14.7402 1
1556951123 8.01 27.7 39.328 14.7403 0
etc.
Можно ли это сделать в Spark? Я создал информацию заголовка как отдельный фрейм данных:
header = [('filename',), ('6',)]
rdd = sparkSession.sparkContext.parallelize(header)
headerDF = sparkSession.createDataFrame(rdd, schema=StructType([StructField('values', StringType(), False)]))
Я пробовал union
, но объединение между потоковым и стати c фреймом данных не поддерживается.
I также посмотрел на join
, но я не думаю, что это даст мне то, что мне нужно, поскольку это добавит дополнительный столбец.
Для информации, это выходной запрос:
df.coalesce(1)\
.writeStream\
.outputMode("append")\
.format("text")\
.option("checkpointLocation", checkpoint_path)\
.option("path", path)\
.start()\
.awaitTermination()
и это входной источник:
df = sparkSession.readStream\
.option("header", "true")\
.option("maxFilesPerTrigger", 1)\
.schema(schema)\
.csv(input_path)
Входные CSV просто состоят из метки времени и некоторых значений датчика. Например:
Timestamp,Sensor1,Sensor2,Sensor3,Sensor4,Sensor5
1556951121,7.19,26.6,36.144,14.7402,True
1556951122,7.59,27.1,37.697,14.7402,True
1556951123,8.01,27.7,39.328,14.7403,False