Я пробираюсь через Spark: полное руководство и пытаюсь протестировать API структурированной потоковой передачи, описанный в главе 3. Я запускаю spark-shell
в локальном режиме и запускаю следующие команды для имитациипоток CSV-файлов.
import org.apache.spark.sql.functions.window
spark.conf.set("spark.sql.shuffle.partitions", "5")
val staticDataFrame = spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("/data/retail-data/by-day/*.csv")
val streamingDataFrame = spark.readStream
.schema(staticSchema)
.option("maxFilesPerTrigger", 1)
.format("csv")
.option("header", "true")
.load("/data/retail-data/by-day/*.csv")
val purchaseByCustomerPerHour = streamingDataFrame
.selectExpr(
"CustomerId",
"(UnitPrice * Quantity) as total_cost",
"InvoiceDate")
.groupBy(
$"CustomerId", window($"InvoiceDate", "1 day"))
.sum("total_cost")
Затем я запускаю потоковое задание, выполнив
purchaseByCustomerPerHour.writeStream
.format("memory") // memory = store in-memory table
.queryName("customer_purchases") // the name of the in-memory table
.outputMode("complete") // complete = all the counts should be in the table
.start()
. На этом этапе консоль начинает регистрировать номер задачи, а также количество произведенных файлов.по течению каждые 2-3 секунды.Кажется, этот процесс продолжается бесконечно.
Теперь я хотел бы выполнить следующий запрос к запущенному потоку.
spark.sql("""
SELECT *
FROM customer_purchases
ORDER BY `sum(total_cost)` DESC
""")
.show(5)
К сожалению, я не могу понять, как подключить вторую spark-shell
экземпляр существующего искрового контекста, содержащий процесс потоковой передачи и таблицу / представление customer_purchases.Это возможно?