Почему потоковый запрос с источником сокета и несколькими приемниками не работает? - PullRequest
0 голосов
/ 02 января 2019

Я пытаюсь использовать несколько запросов для записи в разные приемники в спарк.Первый запрос работает, а выходные данные записываются в приемник, а второй - нет.

Кто-нибудь может указать, в чем моя ошибка.

val spark = SparkSession
  .builder
  .appName("StructuredNetworkWordCount")
  .getOrCreate()

import spark.implicits._
val source = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()
  .as[String]
  .map {e =>
    println(e)
    e
  }

// With Multiple Queries
val q1 = source.writeStream.outputMode("append").format("console")
  .trigger(Trigger.ProcessingTime(1000))
  .start()
println(q1)

val q2 = source.writeStream.outputMode("append")
  .format("csv")
  .option("path", "output.csv")
  .option("checkpointLocation", "/tmp/checkpoint/test")
  .trigger(Trigger.ProcessingTime(1000))
  .start()
println(q2)

spark.streams.awaitAnyTermination()

Консольный приемник работает, но приемник CSVне пишет вывод.Если я меняю порядок, то csv раковина работает, но не консоль.

1 Ответ

0 голосов
/ 02 января 2019

Я предполагаю, что вы используете netcat или аналогичную утилиту для получения данных.Такие утилиты не предназначены для воспроизведения и не обеспечивают постоянный уровень, поэтому в результате данные необратимо уничтожаются после использования.

Поэтому второй поток будет прослушивать изменения, но никакие данные никогда не достигнут этого.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...