Я решил эту проблему, вернувшись к библиотеке pyspark 2.2, так как в ней есть API для получения offsetRanges, что позволяет сохранять смещения в redis. Мне пришлось вернуться к Python 2.7, так как в Python 3.6 нет типа `long`.
import redis
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition, KafkaRDD
def get_offset_ranges(topic):
    """Load the saved Kafka offsets for *topic* from Redis.

    Offsets are read from the Redis hash ``<topic>:offsets``; each hash field
    is parsed as a partition number and each value as an offset.

    Args:
        topic: Name of the Kafka topic.

    Returns:
        A dict mapping ``TopicAndPartition`` to the stored offset, or ``None``
        when nothing is stored for the topic.
    """
    rk = '{topic}:offsets'.format(topic=topic)
    # HGETALL yields an empty mapping for a missing key, so a separate EXISTS
    # call is not needed: dropping it saves a round trip and removes the
    # window in which the key could disappear between the two commands.
    mapping = redis.Redis().hgetall(rk)
    if not mapping:
        return None
    # NOTE(review): ``long`` is a Python 2 builtin and raises NameError on
    # Python 3. Presumably it is needed so the offset reaches the JVM as a
    # Long -- confirm before porting this to Python 3.
    return {
        TopicAndPartition(topic, int(k)): long(v)
        for k, v in mapping.items()
    }
def update_offset_ranges(offset_ranges):
    """Store the end offset of every given range in Redis.

    For each range, ``untilOffset`` is written to the field named after the
    range's ``partition`` in the Redis hash ``<topic>:offsets``, and a
    progress line is printed.

    Args:
        offset_ranges: Iterable of objects exposing ``topic``, ``partition``
            and ``untilOffset`` attributes.
    """
    client = redis.Redis()
    message = "updating redis_key: {}, partion:{} , lastOffset: {} "
    for entry in offset_ranges:
        redis_key = '{rng.topic}:offsets'.format(rng=entry)
        partition, last_offset = entry.partition, entry.untilOffset
        print(message.format(redis_key, partition, last_offset))
        client.hset(redis_key, partition, last_offset)
def do_some_work(rdd):
    """Placeholder for the per-partition processing; currently does nothing.

    Args:
        rdd: The value handed over by ``foreachPartition`` in
            ``process_dstream``; it is ignored for now.
    """
    return None
def process_dstream(rdd):
    """Process one batch RDD, then save its Kafka offsets to Redis.

    The partitions are handed to ``do_some_work`` first; only afterwards are
    the batch's offset ranges printed and persisted.

    Args:
        rdd: The RDD passed in by ``DStream.foreachRDD``.
    """
    rdd.foreachPartition(do_some_work)
    # Wrap the underlying Java RDD as a KafkaRDD to reach offsetRanges();
    # relies on the module-level SparkContext ``sc``.
    kafka_rdd = KafkaRDD(rdd._jrdd, sc, rdd._jrdd_deserializer)
    ranges = kafka_rdd.offsetRanges()
    for offset_range in ranges:
        print(str(offset_range))
    update_offset_ranges(ranges)
# Driver script: build the contexts, resume from the offsets saved in Redis
# (if any) and run the stream until it terminates.
sc = SparkContext(appName="mytstApp")
ssc = StreamingContext(sc, 1)
kafka_params = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "myUserGroup",
    "enable.auto.commit": "false",
    "auto.offset.reset": "smallest"
}
topic = "mytopic"
# None when no offsets have been saved for the topic yet.
offset_ranges = get_offset_ranges(topic)
# createDirectStream requires ``topics`` to be a list and raises
# TypeError("topics should be list") for a bare string. Reusing ``topic``
# also keeps the subscription in step with the Redis key looked up above.
dstream = KafkaUtils.createDirectStream(ssc, [topic], kafka_params, fromOffsets=offset_ranges)
dstream.foreachRDD(process_dstream)
# Start our streaming context and wait for it to 'finish'
ssc.start()
# Wait for the job to finish
try:
    ssc.awaitTermination()
except Exception:
    ssc.stop()
    # Bare raise keeps the original traceback (``raise e`` drops it on
    # Python 2) while still exiting with an error condition.
    raise