Загрузка данных Кафки в реальном времени в HBase через PySpark;вызвать зоопарка слишком много соединений - PullRequest
0 голосов
/ 14 июня 2019

Я пытаюсь настроить поступление данных Кафки в реальном времени в HBase через PySpark.У меня проблема с использованием приведенного ниже кода, что в одном исполнителе (900+) будет слишком много zk-соединений, и это вызвало сбой моего кластера zk с большим количеством открытых файлов

    def process_rdd(t, rdd):
        if config.isDirectModel() and config.isOnUserOffsetManage():
            offsetRanges = rdd.offsetRanges()
            offset_counter = GetOffsetCounter(config)
            offset_counter.save_offset(config["_group_id"], config["Version"], offsetRanges)
            print "save kafka offset done!"
        hbase_conf_ = config.get('HBaseConf', {})
        hbase_conf = RenderConfigAttribute(hbase_conf_, "hbase.mapred.outputtable")
        rdd.map(lambda event: convertor(event, config, 'hbase_reformat')).\
            filter(lambda x: x is not None).\
            flatMap(lambda x: x).\
            saveAsNewAPIHadoopDataset(
                keyConverter=("org.apache.spark.examples.pythonconverters."
                              "StringToImmutableBytesWritableConverter"),
                valueConverter=("org.apache.spark.examples.pythonconverters."
                                "StringListToPutConverter"),
                conf=hbase_conf)
    kafka_stream.foreachRDD(process_rdd)
...