Завершение очереди структурированного потока с использованием блоков данных - PullRequest
0 голосов
/ 03 апреля 2020

Я хотел бы понять, означает ли выполнение ячейки в записной книжке Databricks с приведенным ниже кодом и ее отмену означающим, что чтение потока закончено. Или, может быть, это требует какого-то явного закрытия?

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaBootstrapServers)
  .option("subscribe", "topic1")
  .load()

display(df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)])

1 Ответ

1 голос
/ 04 апреля 2020

Режим без отображения

Лучше всего выполнить эту команду в ячейке:

streamingQuery.stop()

для этого типа подхода:

 val streamingQuery = streamingDF                // Start with our "streaming" DataFrame
  .writeStream                                  // Get the DataStreamWriter
  .queryName(myStreamName)                      // Name the query
  .trigger(Trigger.ProcessingTime("3 seconds")) // Configure for a 3-second micro-batch
  .format("parquet")                            // Specify the sink type, a Parquet file
  .option("checkpointLocation", checkpointPath) // Specify the location of checkpoint files & W-A logs
  .outputMode("append")                         // Write only new data to the "file"
  .start(outputPathDir)         

В противном случае он продолжает работать, что является идеей потоковой передачи.

Я бы не остановил кластер, так как тогда все потоки.

Режим отображения блоков данных

DataBricks написали хороший набор утилит, но вам нужно пройти курс, чтобы получить их. Моя глупость.

display - это штука с данными. Нужно отформатировать как:

 display(myDF, streamName = "myQuery")

, а затем продолжить в отдельной ячейке следующим образом:

println("Looking for %s".format(myStreamName))

for (stream <- spark.streams.active)       // Loop over all active streams
    if (stream.name == myStreamName)         // Single out your stream
       {val s = spark.streams.get(stream.id)
        s.stop()
       }

Это остановит подход отображения, который является записью в приемник памяти.

...