Как создать DStream со смещением в PySpark (используя KafkaUtils.createDirectStream)? когда кафка это кластер? - PullRequest
0 голосов
/ 31 октября 2018

Использование смещения хранилища KafkaUtils.createDirectStream в HDFS

offsetRanges = []
def storeOffsetRanges(rdd):
    global offsetRanges
    offsetRanges = rdd.offsetRanges()
    return rdd

def printOffsetRanges(rdd):
    for o in offsetRanges:
        print("topic: %s\n partition: %s\n fromOffset: %s\n untilOffset: %s\n" % (o.topic, o.partition, o.fromOffset, o.untilOffset))

смещение, как:

OffsetRange(topic: pdns, partition: 21, range: [248025782 -> 248025782]
topic: pdns
 partition: 21
 fromOffset: 248025782
 untilOffset: 248025782

OffsetRange(topic: pdns, partition: 4, range: [248016485 -> 248016485]
topic: pdns
 partition: 4
 fromOffset: 248016485
 untilOffset: 248016485

OffsetRange(topic: pdns, partition: 9, range: [247995083 -> 247995083]
topic: pdns
 partition: 9
 fromOffset: 247995083
 untilOffset: 247995083

Я хочу установить offoffset в KafkaUtils.createDirectStream, используя offsetRange, как это сделать, когда Kafka является кластером

direct_kafka_stream = KafkaUtils.createDirectStream(
        ssc=ssc,
        topics=topic_name,
        kafkaParams={
            "metadata.broker.list": brokers,
            "group.id": consumer_id
        },
        fromOffsets= 
    )

  ssc=ssc,
        topics=topic_name,
        kafkaParams={
            "metadata.broker.list": brokers,
            "group.id": consumer_id
        },
        fromOffsets= 
    )
...