Смещения, зафиксированные не по порядку, с потоковым приемником Hive Spark DataSource API V2 - PullRequest
0 голосов
/ 23 мая 2018

Я использую приемник для сохранения Spark (2.3) Структурированного потокового фрейма данных в таблицу Hive с пользовательской реализацией приемника .

Код выглядит следующим образом.

val df = spark.readStream.format("socket").option("host", "localhost").option("port", 19191).load().as[String]


val query = df.map { s => val records = s.split(",") assert(records.length >= 4)
        (records(0).toInt, records(1), records(2), records(3))
     }


query.selectExpr("_1 as eid", "_2 as name", "_3 as salary", "_4 as designation").
      writeStream.
      format("hive-streaming").
      option("metastore", ".....").
      option("db", "test").
      option("table", "test_employee").
      option("checkpointLocation", "/checkpoints/employee/checkpoint").
      queryName("socket-hive-streaming").
      start()

Это может привести к следующей ошибке во время выполнения.

ERROR streaming.MicroBatchExecution: Query socket-hive-streaming [id =  ......, runId = ......] terminated with error
java.lang.RuntimeException: Offsets committed out of order: 1 followed by 0
        at scala.sys.package$.error(package.scala:27)
        at org.apache.spark.sql.execution.streaming.TextSocketSource.commit(socket.scala:146)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcV$sp$4.apply(MicroBatchExecution.scala:356)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$apply$mcV$sp$4.apply(MicroBatchExecution.scala:355)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcV$sp(MicroBatchExecution.scala:355)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:338)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:338)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:338)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:128)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)

1 Ответ

0 голосов
/ 14 августа 2018

Существует два способа решения вашей проблемы:

  1. Удалить / очистить контрольную точку: /checkpoints/employee/checkpoint на вашем компьютере

  2. ИспользованиеДругой источник, который поддерживает смещение, например, Кафка

Причина, по которой вы столкнулись с этой проблемой, заключается в том, что сокет не поддерживает информацию о смещении .

КогдаВы перезапускаете свою работу, которая получает входные данные от socket 9999, первое, что делает ваша работа, пытается восстановить состояние из /checkpoints/employee/checkpoint, и обнаруживает, что ваше текущее смещение, которое было записано, составляет 1.Затем вы вводите другие сообщения в socket 9999, ваша работа обнаруживает, что смещение от socket 9999 составляет 0.Так что это исключение.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...