Почему на предыдущих записях тем работает спарк стриминг? - PullRequest
0 голосов
/ 24 июня 2019

Я управлял брокером зоопарка и кафкой, но я не руководил производителем кафки.Я запустил код потоковой передачи и распечатал здесь нефильтрованный поток.Мой вопрос заключается в том, почему я получаю эти потоки данных, а именно:

{"vehicleId": "0", "lon": "0", "lat": "0", "ts ":" 0 "}

{" vehicleId ":" 0 "," lon ":" 0 "," lat ":" 0 "," ts ":" 0 "}

Хотя я не управляю продюсером?Что означают эти сообщения ниже?

19/06/24 20:20:00 INFO JobScheduler: Finished job streaming job 1561378800000 ms.0 from job set of time 1561378800000 ms
19/06/24 20:20:00 INFO JobScheduler: Total delay: 0.028 s for time 1561378800000 ms (execution: 0.021 s)
19/06/24 20:20:00 INFO MapPartitionsRDD: Removing RDD 161 from persistence list
19/06/24 20:20:00 INFO ContextCleaner: Cleaned accumulator 1716
19/06/24 20:20:00 INFO ContextCleaner: Cleaned accumulator 1893
19/06/24 20:20:00 INFO ContextCleaner: Cleaned accumulator 1944
{"vehicleId":"0","lon":"0","lat":"0","ts":"0"}
{"vehicleId":"0","lon":"0","lat":"0","ts":"0"}
{"vehicleId":"0","lon":"0","lat":"0","ts":"0"}
{"vehicleId":"0","lon":"0","lat":"0","ts":"0"}
{"vehicleId":"0","lon":"0","lat":"0","ts":"0"}
{"vehicleId":"0","lon":"0","lat":"0","ts":"0"}
{"vehicleId":"0","lon":"0","lat":"0","ts":"0"}
{"vehicleId":"0","lon":"0","lat":"0","ts":"0"}
{"vehicleId":"0","lon":"0","lat":"0","ts":"0"}
{"vehicleId":"0","lon":"0","lat":"0","ts":"0"}
...

19/06/24 20:20:00 INFO KafkaRDD: Removing RDD 160 from persistence list
19/06/24 20:20:00 INFO ContextCleaner: Cleaned accumulator 1628
19/06/24 20:20:00 INFO ContextCleaner: Cleaned accumulator 1781
19/06/24 20:20:00 INFO ContextCleaner: Cleaned accumulator 1570
19/06/24 20:20:00 INFO BlockManager: Removing RDD 161
19/06/24 20:20:00 INFO ContextCleaner: Cleaned accumulator 1808
19/06/24 20:20:00 INFO ContextCleaner: Cleaned accumulator 2020
19/06/24 20:20:00 INFO ContextCleaner: Cleaned accumulator 1624
19/06/24 20:20:00 INFO ContextCleaner: Cleaned accumulator 1918
19/06/24 20:20:00 INFO ContextCleaner: Cleaned

1 Ответ

1 голос
/ 25 июня 2019

Возможно, вы захотите проверить, что вы установили в 'auto.offset.reset'.

Из руководства по потоковой передаче Spark:

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

Они устанавливают сброс смещения на "самый последний".Ваш, кажется, настроен на самое раннее.

...