Kafka с проблемой потоковой передачи искры: не удается прочитать данные из topi c с существующими данными - PullRequest
0 голосов
/ 29 мая 2020

Я пытаюсь читать от брокера Kafka с потоковой передачей искр, но столкнулся с некоторыми проблемами.

def spark_streaming_from_STABLE_kafka_topic():
    conf = SparkConf().setMaster("spark://antonis-dell:7077").setAppName("Kafka_Spark")
    sc = SparkContext(conf=conf) 
    sc.setLogLevel("WARN")
    ssc = StreamingContext(sc, 2)

    topic = "stable_topic"
    kvs = KafkaUtils.createDirectStream(ssc,
                                    [topic],
                                    {"metadata.broker.list": "my-broker",
                                    "auto.offset.reset": "smallest"},
                                    keyDecoder=lambda x: x,
                                    valueDecoder=lambda x: x
                                    )

    lines = kvs.window(2, 2).map(lambda x: x[1])
    lines.pprint()
    return ssc


if __name__ == "__main__":
    ssc = StreamingContext.getOrCreate('/home/antonis/Desktop/tmp/checkpoint_v06', lambda: spark_streaming_from_STABLE_kafka_topic())
    ssc.start()
    ssc.awaitTermination()

Приведенный выше код не извлекает ничего, кроме пустых пакетов:

-------------------------------------------
Time: 2020-05-29 09:32:38
-------------------------------------------

-------------------------------------------
Time: 2020-05-29 09:32:40
-------------------------------------------

Topi c stable_topic содержит фиксированный размер данных. Это не меняется. У меня еще один топи c, который каждую секунду получает данные. Если я использую этот topi c вместо stable_topic и удаляю "auto.offset.reset": "smallest", тогда код извлекает данные.

Я предполагаю, что что-то не так с {"auto.offset.reset": "smallest"}, но я не могу этого понять.

Кто-нибудь теперь, что я делаю неправильно?

1 Ответ

1 голос
/ 29 мая 2020

В более поздних версиях smallest был заменен на earliest. Убедитесь, что вы проверили документацию по используемой вами версии.

Кроме того, конфигурация auto.offset.reset не вступит в силу, если группа потребителей уже потребляла некоторые данные из топи c stable_topic. Поэтому вы можете подумать об изменении group.id в своем потоковом задании.

Если вы назначаете новый group.id, не забудьте установить auto.offset.reset на smalles (или earliest в более новых версии).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...