как управлять офсетным считыванием из кафки со искровым структурированным потоком - PullRequest
0 голосов
/ 15 мая 2019

У меня задание со структурированной потоковой передачей, для которого нужно прочитать данные из раздела kafka и выполнить некоторую агрегацию.Работу нужно было перезапускать ежедневно, но при ее перезапуске, если я установлю startingOffsets="latest", я потеряю данные, поступающие между перезапусками.Если я установлю startingOffsets="earliest", то задание будет читать все данные из темы, но не из того места, где осталось последнее потоковое задание.Может кто-нибудь помочь мне, как настроить, чтобы установить смещение вправо, где осталось последнее потоковое задание?

Я использую Spark 2.4.0 и kafka 2.1.1 ,Я попытался установить местоположение контрольной точки для задания записи, но кажется, что Spark не проверяет смещение сообщения kafka, поэтому он продолжает проверять последнее смещение или первое смещение в зависимости от startOffsets.

Вот конфигурациядля моей искры читать из kafka:

val df = spark.readStream
         .format("kafka")
         .option("kafka.bootstrap.servers", host)
         .option("subscribe", topic)
         .option("startingOffsets", offset)
         .option("enable.auto.commit", "false")
         .load()

с примером, что тема kafka имеет 10 сообщений со смещением от 1 до 10, спарк только что завершил обработку сообщения 5 и затем перезапустил.Как заставить спарк продолжать читать из сообщения 5, а не из 1 или 11?

1 Ответ

0 голосов
/ 15 мая 2019

Кажется, что с некоторым кодом я могу взять смещение, которое мне нужно, и сохранить его в каком-нибудь надежном хранилище, таком как cassandra. Затем, когда начинается потоковая передача искры, мне просто нужно прочитать сохраненное смещение и заполнить его начальными значениями. Это коды, которые помогают мне получить смещение, которое мне нужно

spark.streams.addListener(new StreamingQueryListener() {
         override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
            println("Query started:" + queryStarted.id)
         }

         override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
            println("Query terminated" + queryTerminated.id)
         }

         override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
            println("Query made progress")
            println("Starting offset:" + queryProgress.progress.sources(0).startOffset)
            println("Ending offset:" + queryProgress.progress.sources(0).endOffset)
            //Logic to save these offsets
            // the logic to save the offset write in here
         }
      })
...