Вручную зафиксировать смещение в кафке Direct Stream в python - PullRequest
0 голосов
/ 07 января 2019

Я портирую потоковое приложение, написанное на scala, на python. Я хочу вручную зафиксировать смещение для DStream. Это делается в Scala, как показано ниже:

stream = KafkaUtils.createDirectStream(soomeConfigs)
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // some time later, after outputs have completed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

Но я не могу найти похожие API в Python. Не могли бы вы рассказать мне о том же, что и как я могу фиксировать смещения вручную с помощью клиента Python.

1 Ответ

0 голосов
/ 07 января 2019

Я решил эту проблему, вернувшись к библиотеке pyspark 2.2, так как у нее есть API для получения offsetRanges и сохранения смещений в redis. Мне пришлось вернуться к Python 2.7, так как в Python 3.6 нет «длинной» поддержки.

import redis
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition, KafkaRDD


def get_offset_ranges(topic):
    ranges = None

    rk = '{topic}:offsets'.format(topic=topic)
    cache = redis.Redis()
    if cache.exists(rk):
        mapping = cache.hgetall(rk)
        ranges = dict()
        for k, v in mapping.items():
            tp = TopicAndPartition(topic, int(k))
            ranges[tp] = long(v)

    return ranges


def update_offset_ranges(offset_ranges):
    cache = redis.Redis()
    for rng in offset_ranges:
        rk = '{rng.topic}:offsets'.format(rng=rng)
        print("updating redis_key: {}, partion:{} , lastOffset: {} ".format(rk, rng.partition, rng.untilOffset))
        cache.hset(rk, rng.partition, rng.untilOffset)


def do_some_work(rdd):
    pass


def process_dstream(rdd):
    rdd.foreachPartition(lambda iter: do_some_work(iter))

    krdd = KafkaRDD(rdd._jrdd, sc, rdd._jrdd_deserializer)
    off_ranges = krdd.offsetRanges()
    for o in off_ranges:
        print(str(o))
    update_offset_ranges(off_ranges)


sc = SparkContext(appName="mytstApp")
ssc = StreamingContext(sc, 1)

kafka_params = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "myUserGroup",
    "enable.auto.commit": "false",
    "auto.offset.reset": "smallest"
}

topic = "mytopic"
offset_ranges = get_offset_ranges(topic)
dstream = KafkaUtils.createDirectStream(ssc, "mytopic", kafka_params, fromOffsets=offset_ranges)
dstream.foreachRDD(process_dstream)
# Start our streaming context and wait for it to 'finish'
ssc.start()

# Wait for the job to finish
try:
    ssc.awaitTermination()
except Exception as e:
    ssc.stop()
    raise e  # to exit with error condition
...