Как загрузить все записи, которые уже были опубликованы с Kafka? - PullRequest
0 голосов
/ 05 марта 2019

У меня есть приложение для потокового Python со структурой pyspark, настроенное так:

from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("data streaming app")\
    .getOrCreate()


data_raw = spark.readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "kafkahost:9092")\
    .option("subscribe", "my_topic")\
    .load()

query = data_raw.writeStream\
    .outputMode("append")\
    .format("console")\
    .option("truncate", "false")\
    .trigger(processingTime="5 seconds")\
    .start()\
    .awaitTermination()

И все, что появляется, это

+---+-----+-----+---------+------+---------+-------------+
|key|value|topic|partition|offset|timestamp|timestampType|
+---+-----+-----+---------+------+---------+-------------+
+---+-----+-----+---------+------+---------+-------------+

19/03/04 22:00:50 INFO streaming.StreamExecution: Streaming query made progress: {
  "id" : "ab24bd30-6e2d-4c2a-92a2-ddad66906a5b",
  "runId" : "29592d76-892c-4b29-bcda-f4ef02aa1390",
  "name" : null,
  "timestamp" : "2019-03-04T22:00:49.389Z",
  "numInputRows" : 0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "addBatch" : 852,
    "getBatch" : 180,
    "getOffset" : 135,
    "queryPlanning" : 107,
    "triggerExecution" : 1321,
    "walCommit" : 27
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[my_topic]]",
    "startOffset" : null,
    "endOffset" : {
      "my_topic" : {
        "0" : 303
      }
    },
    "numInputRows" : 0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@74fad4a5"
  }
}

Как видите, my_topic имеетТам 303 сообщения, но я не могу их показать.Дополнительная информация включает в себя то, что я использую сливной коннектор Kafka JDBC для запроса базы данных оракула и сохранения строк в теме кафки.У меня есть настройки реестра Авро схема с этим.При необходимости я также поделюсь этими файлами свойств.

Кто-нибудь знает, что происходит?

1 Ответ

0 голосов
/ 05 марта 2019

Как потоковое приложение, эта потоковая структура Spark читает только сообщения, как только они опубликованы.Что я хотел сделать, для целей тестирования было прочитано все в теме.Для этого все, что вам нужно сделать, это дополнительная опция в readStream, то есть option("startingOffsets", "earliest").

data_raw = spark.readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "kafkahost:9092")\
    .option("subscribe", "my_topic")\
    .option("startingOffsets", "earliest")
    .load()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...