Как использовать полностью сформированный SQL с искровой структурированной потоковой передачей - PullRequest
1 голос
/ 14 апреля 2019

Документация для структурированной потоковой передачи Spark гласит, что - начиная с версии 2.3 все методы в контексте spark, доступные для static DataFrame / DataSet, также доступны для использования со структурированной потоковой передачей DataFrame / DataSet также. Однако мне еще не приходилось сталкиваться с любыми примерами того же самого.

Использование полностью сформированных SQL более гибко, выразительно и продуктивно для меня, чем DSL. Кроме того, в моем случае эти SQL уже разработаны и хорошо протестированы для статических версий. должен быть некоторым переделкой - в частности, чтобы использовать join s вместо correlated subqueries. Однако все еще есть большая ценность в сохранении общей полнотелой структуры sql.

Формат, который я собираюсь использовать, похож на это гипотетическое соединение:

 val tabaDf = spark.readStream(..)
 val tabbDf = spark.readStream(..)

 val joinSql = """select a.*, 
                  b.productName 
                  from taba
                  join tabb 
                  on a.productId = b.productId
                  where ..
                  group by ..
                  having ..
                  order by .."""
 val joinedStreamingDf = spark.sql(joinSql)

Есть пара вещей, которые не совсем понятно, как это сделать:

  • Должны ли tabaDf и tabbDf быть определены через spark.readStream: это мое предположение

  • Как объявить taba и tabb. Пытаюсь использовать

    tabaDf.createOrReplaceTempView("taba")
    tabbDf.createOrReplaceTempView("tabb")
    

    Результаты в

    WARN ObjectStore: не удалось получить базу данных global_temp, возвращая NoSuchObjectException

Все примеры, которые я мог найти, используют DSL и / или selectExpr() - как следующие https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html

 df.selectExpr("CAST(userId AS STRING) AS key", "to_json(struct(*)) AS value")

или используя select:

sightingLoc
  .groupBy("zip_code", window("start_time", "1 hour"))
  .count()
  .select( 
    to_json(struct("zip_code", "window")).alias("key"),
    col("count").cast("string").alias("value")) 

Это действительно единственные варианты - так что документация, в которой говорится, что все методы, поддерживаемые в static массиве данных / наборах данных, не совсем точны? В противном случае: a Любые указатели о том, как исправить вышеупомянутые проблемы и использовать прямую sql с потоковой передачей будет приветствоваться.

1 Ответ

3 голосов
/ 14 апреля 2019

Потоки должны быть зарегистрированы как временные представления, используя createOrReplaceTempView.AFAIK createOrReplaceView не является частью Spark API (возможно, у вас есть что-то, что обеспечивает неявное преобразование в класс с помощью такого метода).

spark.readStream(..).createOrReplaceTempView("taba")
spark.readStream(..).createOrReplaceTempView("tabb")

Теперь к представлениям можно обращаться, используя чистый SQL.Например, чтобы напечатать вывод на консоль:

spark
  .sql(joinSql)
  .writeStream
  .format("console")
  .start()
  .awaitTermination()

Редактировать: После редактирования вопроса я не вижу ничего плохого в вашем коде.Вот минимальный рабочий пример.Предполагая, что тестовый файл /tmp/foo/foo.csv

"a",1
"b",2
import org.apache.spark.sql.types._
val schema = StructType(Array(StructField("s", StringType), StructField("i", IntegerType)))
spark.readStream
  .schema(schema)
  .csv("/tmp/foo")
  .createOrReplaceTempView("df1")
spark.readStream
  .schema(schema)
  .csv("/tmp/foo")
  .createOrReplaceTempView("df2")

spark.sql("SELECT * FROM df1 JOIN df2 USING (s)")
  .writeStream
  .format("console")
  .start()
  .awaitTermination()

выводит

-------------------------------------------
Batch: 0
-------------------------------------------
+---+---+---+
|  s|  i|  i|
+---+---+---+
|  b|  2|  2|
|  a|  1|  1|
+---+---+---+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...