Я пишу код для определения шага потоковой передачи данных с датчика. Для этого я разделяю все входящие значения в качестве параметров датчика. После этого я обнаруживаю шаги с помощью UDF. Внутри этого UDF я создаю диктат для хранения двух списков важных временных меток на определенный шаг.
@udf(StructType())
def detect_steps(timestamp, x, y, z):
.....
d = dict()
d['timestamp_ic'] = times_ic
d['timestamp_to'] = timestamp_to
......
return d
В основной функции Spark я создал фрейм данных, который вычисляет все эти шаги в скользящем окне следующим образом:
stepData = LLLData \
.withWatermark("time", "10 seconds") \
.groupBy(
window("time", windowDuration="5 seconds",slideDuration="1 second"),
"sensor"
) \
.agg(
collect_list("time").alias("time_window"),
collect_list(sensorData.Acceleration_x).alias("Acceleration_x_window"),
collect_list(sensorData.Acceleration_y).alias("Acceleration_y_window"),
collect_list(sensorData.Acceleration_z).alias("Acceleration_z_window"),
) \
.select(
"window",
"sensor",
detect_steps("time_window", "Acceleration_x_window", "Acceleration_y_window", "Acceleration_z_window")
)
Теперь, когда я печатаю схему df, она выглядит следующим образом this:
|-- window: struct (nullable = true)
| |-- start: timestamp (nullable = true)
| |-- end: timestamp (nullable = true)
|-- sensor: string (nullable = true)
|-- detect_steps("time_window", "Acceleration_x_window", "Acceleration_y_window", "Acceleration_z_window"): string (nullable = true)
Пока я хочу это:
|-- window: struct (nullable = true)
| |-- start: timestamp (nullable = true)
| |-- end: timestamp (nullable = true)
|-- sensor: string (nullable = true)
|-- timestamp_ic: string (nullable = true)
|-- timestamp_to: string (nullable = true)
Однако я не могу выполнить оператор выбора для столбца UDF в stepData, ошибка: Column is not iterable
.
Когда я пытаюсь впоследствии изменить кадр данных root, например, путем синтаксического разбора столбца i c на кадр данных искры, например, так: df_stepData = spark.createDataFrame(data=stepData.select("ic"))
это дает мне TypeError: data is already a DataFrame
. Однако, если посмотреть на схему данных, ic
набирается как строка. Я также пытался прочитать ic
как json файл, но это выдает следующую ошибку: TypeError: path can be only string, list or RDD
Я мог решить проблему, дважды вызвав detect_steps
UDF, первый возвращение timestamp_ic
и второе возвращение timestamp_to
, чтобы получить два столбца, но я уверен, что есть лучший, более эффективный способ.