Документация для структурированной потоковой передачи Spark гласит, что - начиная с версии 2.3 все методы в контексте spark, доступные для static DataFrame
/ DataSet
, также доступны для использования со структурированной потоковой передачей DataFrame
/ DataSet
также. Однако мне еще не приходилось сталкиваться с любыми примерами того же самого.
Использование полностью сформированных SQL более гибко, выразительно и продуктивно для меня, чем DSL
. Кроме того, в моем случае эти SQL уже разработаны и хорошо протестированы для статических версий. должен быть некоторым переделкой - в частности, чтобы использовать join
s вместо correlated subqueries
. Однако все еще есть большая ценность в сохранении общей полнотелой структуры sql.
Формат, который я собираюсь использовать, похож на это гипотетическое соединение:
val tabaDf = spark.readStream(..)
val tabbDf = spark.readStream(..)
val joinSql = """select a.*,
b.productName
from taba
join tabb
on a.productId = b.productId
where ..
group by ..
having ..
order by .."""
val joinedStreamingDf = spark.sql(joinSql)
Есть пара вещей, которые не совсем понятно, как это сделать:
Должны ли tabaDf
и tabbDf
быть определены через spark.readStream
: это мое предположение
Как объявить taba
и tabb
. Пытаюсь использовать
tabaDf.createOrReplaceTempView("taba")
tabbDf.createOrReplaceTempView("tabb")
Результаты в
WARN ObjectStore: не удалось получить базу данных global_temp, возвращая NoSuchObjectException
Все примеры, которые я мог найти, используют DSL
и / или selectExpr()
- как следующие https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html
df.selectExpr("CAST(userId AS STRING) AS key", "to_json(struct(*)) AS value")
или используя select
:
sightingLoc
.groupBy("zip_code", window("start_time", "1 hour"))
.count()
.select(
to_json(struct("zip_code", "window")).alias("key"),
col("count").cast("string").alias("value"))
Это действительно единственные варианты - так что документация, в которой говорится, что все методы, поддерживаемые в static
массиве данных / наборах данных, не совсем точны? В противном случае: a Любые указатели о том, как исправить вышеупомянутые проблемы и использовать прямую sql
с потоковой передачей будет приветствоваться.