Читайте из кафки с помощью pyspark - PullRequest
0 голосов
/ 29 мая 2020

Я пытаюсь читать из kafka topi c, но сталкиваюсь с некоторыми проблемами.

def spark_streaming_from_STABLE_kafka_topic():
    conf = SparkConf().setMaster("spark://antonis-dell:7077").setAppName("Kafka_Spark")
    sc = SparkContext(conf=conf) 
    sc.setLogLevel("WARN")
    ssc = StreamingContext(sc, 2)

     kafka_params = {
    "bootstrap.servers": "my-kafka-broker",
    "group.id": "new",
    "auto.offset.reset": "smallest",
    }

topic = "stable_topic"
kvs = KafkaUtils.createDirectStream(ssc,
                                    [topic],
                                    kafka_params,
                                    keyDecoder=lambda x: x,
                                    valueDecoder=lambda x: x
                                    )

    lines = kvs.window(2, 2).map(lambda x: x[1])
    lines.pprint()
    return ssc


if __name__ == "__main__":
    ssc = StreamingContext.getOrCreate('/home/antonis/Desktop/tmp/checkpoint_v06', lambda: spark_streaming_from_STABLE_kafka_topic())
    ssc.start()
    ssc.awaitTermination()

Topi c stable_topic содержит такие записи, как:

stable_topic:0:15708: key=None value=S'2020-03-01 01:10:50,John,32\n'
p1
.
stable_topic:0:15709: key=None value=S'2020-03-01 01:10:50,Peter,27\n'
p1
.

Значит, это значение String. Не могу прочитать записи с valueDecoder=lambda x: x Нужен ли мне специальный декодер?

...