По сути, это поле, которое помогает нам управлять контрольно-пропускными пунктами.Управление смещениями наиболее полезно для обеспечения непрерывности данных в течение жизненного цикла потокового процесса.Например, после закрытия потокового приложения или неожиданного сбоя диапазоны смещения будут потеряны, если они не будут сохранены в энергонезависимом хранилище данных.Кроме того, без смещения считываемых разделов задание Spark Streaming не сможет продолжить обработку данных с того места, где оно было остановлено в последний раз.Так что мы можем обрабатывать смещение несколькими способами.Одним из способов является сохранение значения смещения в Zookeeper и чтение того же самого при создании DSstream.
from kazoo.client import KazooClient
zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()
ZOOKEEPER_SERVERS = "127.0.0.1:2181"
def get_zookeeper_instance():
from kazoo.client import KazooClient
if 'KazooSingletonInstance' not in globals():
globals()['KazooSingletonInstance'] = KazooClient(ZOOKEEPER_SERVERS)
globals()['KazooSingletonInstance'].start()
return globals()['KazooSingletonInstance']
def save_offsets(rdd):
zk = get_zookeeper_instance()
for offset in rdd.offsetRanges():
path = f"/consumers/{var_topic_src_name}"
print(path)
zk.ensure_path(path)
zk.set(path, str(offset.untilOffset).encode())
var_offset_path = f'/consumers/{var_topic_src_name}'
try:
var_offset = int(zk.get(var_offset_path)[0])
except:
print("The spark streaming started First Time and Offset value should be Zero")
var_offset = 0
var_partition = 0
enter code here
topicpartion = TopicAndPartition(var_topic_src_name, var_partition)
fromoffset = {topicpartion: var_offset}
print(fromoffset)
kvs = KafkaUtils.createDirectStream(ssc,\
[var_topic_src_name],\
var_kafka_parms_src,\
valueDecoder=serializer.decode_message,\
fromOffsets = fromoffset)
kvs.foreachRDD(handler)
kvs.foreachRDD(save_offsets)
Ссылка:
Обновление PySpark Kafka Direct Streaming Zookeeper / Kafka Offset
С уважением
Картикеян Расипалаям Дурайрадж