Using offsets stored in HDFS with KafkaUtils.createDirectStream
offsetRanges = []

def storeOffsetRanges(rdd):
    global offsetRanges
    offsetRanges = rdd.offsetRanges()
    return rdd

def printOffsetRanges(rdd):
    for o in offsetRanges:
        print("topic: %s\n partition: %s\n fromOffset: %s\n untilOffset: %s\n" % (o.topic, o.partition, o.fromOffset, o.untilOffset))
The offsets look like this:
OffsetRange(topic: pdns, partition: 21, range: [248025782 -> 248025782]
topic: pdns
partition: 21
fromOffset: 248025782
untilOffset: 248025782
OffsetRange(topic: pdns, partition: 4, range: [248016485 -> 248016485]
topic: pdns
partition: 4
fromOffset: 248016485
untilOffset: 248016485
OffsetRange(topic: pdns, partition: 9, range: [247995083 -> 247995083]
topic: pdns
partition: 9
fromOffset: 247995083
untilOffset: 247995083
I want to set the starting offsets (fromOffsets) in KafkaUtils.createDirectStream using these offsetRanges. How do I do this when Kafka is running as a cluster?
direct_kafka_stream = KafkaUtils.createDirectStream(
    ssc=ssc,
    topics=topic_name,
    kafkaParams={
        "metadata.broker.list": brokers,
        "group.id": consumer_id
    },
    fromOffsets=
)
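
For reference, a minimal sketch of one way the fromOffsets value could be built from saved offset ranges. In the PySpark Kafka 0.8 integration, fromOffsets is a dict keyed by topic and partition (not by broker), so the same dict applies whether Kafka has one broker or many. The names SavedRange, dump_offsets and load_from_offsets are hypothetical helpers, not part of Spark, and writing the JSON string to HDFS is left out. It also assumes untilOffset (the exclusive end of the last processed batch) is the offset to resume from.

```python
import json
from collections import namedtuple

# Hypothetical stand-in with the same fields as pyspark.streaming.kafka.OffsetRange,
# so this sketch runs without a Spark installation.
SavedRange = namedtuple("SavedRange", "topic partition fromOffset untilOffset")


def dump_offsets(offset_ranges):
    """Serialize offset ranges to a JSON string (which could then be written to HDFS)."""
    return json.dumps([
        {"topic": o.topic, "partition": o.partition, "untilOffset": o.untilOffset}
        for o in offset_ranges
    ])


def load_from_offsets(text, key_factory):
    """Rebuild a fromOffsets-style dict; key_factory(topic, partition) creates each key."""
    return {
        key_factory(r["topic"], r["partition"]): r["untilOffset"]
        for r in json.loads(text)
    }


ranges = [
    SavedRange("pdns", 21, 248025782, 248025782),
    SavedRange("pdns", 4, 248016485, 248016485),
]
saved = dump_offsets(ranges)

# Plain tuples are used as keys here; with Spark, pass TopicAndPartition instead.
restored = load_from_offsets(saved, lambda topic, partition: (topic, partition))
print(restored)
```

With Spark available, the same helper could be called as load_from_offsets(saved, TopicAndPartition), after "from pyspark.streaming.kafka import TopicAndPartition", and the result passed as fromOffsets. This assumes a Spark version that still ships the pyspark.streaming.kafka module.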