Я пытаюсь читать от брокера Kafka с потоковой передачей искр, но столкнулся с некоторыми проблемами.
def spark_streaming_from_STABLE_kafka_topic():
conf = SparkConf().setMaster("spark://antonis-dell:7077").setAppName("Kafka_Spark")
sc = SparkContext(conf=conf)
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, 2)
topic = "stable_topic"
kvs = KafkaUtils.createDirectStream(ssc,
[topic],
{"metadata.broker.list": "my-broker",
"auto.offset.reset": "smallest"},
keyDecoder=lambda x: x,
valueDecoder=lambda x: x
)
lines = kvs.window(2, 2).map(lambda x: x[1])
lines.pprint()
return ssc
if __name__ == "__main__":
ssc = StreamingContext.getOrCreate('/home/antonis/Desktop/tmp/checkpoint_v06', lambda: spark_streaming_from_STABLE_kafka_topic())
ssc.start()
ssc.awaitTermination()
Приведенный выше код не извлекает ничего, кроме пустых пакетов:
-------------------------------------------
Time: 2020-05-29 09:32:38
-------------------------------------------
-------------------------------------------
Time: 2020-05-29 09:32:40
-------------------------------------------
Topi c stable_topic
содержит фиксированный размер данных. Это не меняется. У меня еще один топи c, который каждую секунду получает данные. Если я использую этот topi c вместо stable_topic
и удаляю "auto.offset.reset": "smallest"
, тогда код извлекает данные.
Я предполагаю, что что-то не так с {"auto.offset.reset": "smallest"}
, но я не могу этого понять.
Кто-нибудь теперь, что я делаю неправильно?