pyspark kafka потоковое смещение - PullRequest
       33

pyspark kafka потоковое смещение

0 голосов
/ 05 октября 2018

Ниже приведена ссылка, относящаяся к потоковой передаче смещения темы kafka в pyspark.

from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming.kafka import TopicAndPartition

stream = StreamingContext(sc, 120) # 120 second window

kafkaParams = {"metadata.broker.list":"1:667,2:6667,3:6667"}
kafkaParams["auto.offset.reset"] = "smallest"
kafkaParams["enable.auto.commit"] = "false"

topic = "xyz"
topicPartion = TopicAndPartition(topic, 0)
fromOffset = {topicPartion: long(PUT NUMERIC OFFSET HERE)}
kafka_stream = KafkaUtils.createDirectStream(stream, [topic], kafkaParams, 

fromOffsets = fromOffset)

Ссылочная ссылка: Управление смещением kafka потоковой передачи Spark

Я не понимаю, что предоставить ниже в случае, если мне нужно прочитать последние 15 минут данных из kafka для каждого окна / пакета.

fromOffset = {topicPartion: long (PUT NUMERICСМЕЩЕНИЕ ЗДЕСЬ)}

1 Ответ

0 голосов
/ 16 января 2019

По сути, это поле, которое помогает нам управлять контрольно-пропускными пунктами.Управление смещениями наиболее полезно для обеспечения непрерывности данных в течение жизненного цикла потокового процесса.Например, после закрытия потокового приложения или неожиданного сбоя диапазоны смещения будут потеряны, если они не будут сохранены в энергонезависимом хранилище данных.Кроме того, без смещения считываемых разделов задание Spark Streaming не сможет продолжить обработку данных с того места, где оно было остановлено в последний раз.Так что мы можем обрабатывать смещение несколькими способами.Одним из способов является сохранение значения смещения в Zookeeper и чтение того же самого при создании DSstream.

from kazoo.client import KazooClient
zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()
ZOOKEEPER_SERVERS = "127.0.0.1:2181"

def get_zookeeper_instance():
    from kazoo.client import KazooClient
    if 'KazooSingletonInstance' not in globals():
        globals()['KazooSingletonInstance'] = KazooClient(ZOOKEEPER_SERVERS)
        globals()['KazooSingletonInstance'].start()
    return globals()['KazooSingletonInstance']

def save_offsets(rdd):
    zk = get_zookeeper_instance()
    for offset in rdd.offsetRanges():
        path = f"/consumers/{var_topic_src_name}"
        print(path)
        zk.ensure_path(path)
        zk.set(path, str(offset.untilOffset).encode())

    var_offset_path = f'/consumers/{var_topic_src_name}'

    try:
        var_offset = int(zk.get(var_offset_path)[0])
    except:
        print("The spark streaming started First Time and Offset value should be Zero")
        var_offset  = 0
    var_partition = 0
    enter code here
    topicpartion = TopicAndPartition(var_topic_src_name, var_partition)
    fromoffset = {topicpartion: var_offset}
    print(fromoffset)
    kvs = KafkaUtils.createDirectStream(ssc,\
                                        [var_topic_src_name],\
                                        var_kafka_parms_src,\
                                        valueDecoder=serializer.decode_message,\
                                        fromOffsets = fromoffset)
    kvs.foreachRDD(handler)
    kvs.foreachRDD(save_offsets)

Ссылка:

Обновление PySpark Kafka Direct Streaming Zookeeper / Kafka Offset

С уважением

Картикеян Расипалаям Дурайрадж

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...