Я работаю с кодом, который ожидает поток данных, но мой исходный DF не является потоковым.
Потоковые кадры данных могут быть созданы методами, описанными здесь . Основной способ - использовать SparkSession.read ... (путь), однако я хотел бы создать потоковый DF из существующего json или не потокового объекта DF.
У меня есть HTTP-ответ который я конвертирую в DF следующим образом:
val df = spark.read.json(Seq(response.body).toDS)
(эквивалент не существует для spark.readStream)
Хакерским решением было бы просто сохранить json как затем загрузите его, используя SparkSession.readStream.json(path)
, но мне было интересно, есть ли более элегантное решение. В идеале что-то вроде val = spark.readStream.df(df)
или df.convertToStreaming()