Как получить доступ к кадру данных, возвращенному UDF внутри столбца в Spark Streaming - PullRequest
2 голосов
/ 27 января 2020

Я пишу код для определения шага потоковой передачи данных с датчика. Для этого я разделяю все входящие значения в качестве параметров датчика. После этого я обнаруживаю шаги с помощью UDF. Внутри этого UDF я создаю диктат для хранения двух списков важных временных меток на определенный шаг.

@udf(StructType())
def detect_steps(timestamp, x, y, z):
   .....   
   d = dict()
   d['timestamp_ic'] = times_ic
   d['timestamp_to'] = timestamp_to
   ......
   return d

В основной функции Spark я создал фрейм данных, который вычисляет все эти шаги в скользящем окне следующим образом:

stepData = LLLData \
        .withWatermark("time", "10 seconds") \
        .groupBy(
            window("time", windowDuration="5 seconds",slideDuration="1 second"),
            "sensor"
        ) \
        .agg(
            collect_list("time").alias("time_window"),
            collect_list(sensorData.Acceleration_x).alias("Acceleration_x_window"),
            collect_list(sensorData.Acceleration_y).alias("Acceleration_y_window"),
            collect_list(sensorData.Acceleration_z).alias("Acceleration_z_window"),           
        ) \
        .select(
            "window",
            "sensor",
            detect_steps("time_window", "Acceleration_x_window", "Acceleration_y_window", "Acceleration_z_window")
        ) 

Теперь, когда я печатаю схему df, она выглядит следующим образом this:

 |-- window: struct (nullable = true)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- sensor: string (nullable = true)
 |-- detect_steps("time_window", "Acceleration_x_window", "Acceleration_y_window", "Acceleration_z_window"): string (nullable = true)

Пока я хочу это:

 |-- window: struct (nullable = true)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- sensor: string (nullable = true)
 |-- timestamp_ic: string (nullable = true)
 |-- timestamp_to: string (nullable = true)

Однако я не могу выполнить оператор выбора для столбца UDF в stepData, ошибка: Column is not iterable.

Когда я пытаюсь впоследствии изменить кадр данных root, например, путем синтаксического разбора столбца i c на кадр данных искры, например, так: df_stepData = spark.createDataFrame(data=stepData.select("ic")) это дает мне TypeError: data is already a DataFrame. Однако, если посмотреть на схему данных, ic набирается как строка. Я также пытался прочитать ic как json файл, но это выдает следующую ошибку: TypeError: path can be only string, list or RDD

Я мог решить проблему, дважды вызвав detect_steps UDF, первый возвращение timestamp_ic и второе возвращение timestamp_to, чтобы получить два столбца, но я уверен, что есть лучший, более эффективный способ.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...