Я пытаюсь настроить поступление данных Кафки в реальном времени в HBase через PySpark.У меня проблема с использованием приведенного ниже кода, что в одном исполнителе (900+) будет слишком много zk-соединений, и это вызвало сбой моего кластера zk с большим количеством открытых файлов
def process_rdd(t, rdd):
if config.isDirectModel() and config.isOnUserOffsetManage():
offsetRanges = rdd.offsetRanges()
offset_counter = GetOffsetCounter(config)
offset_counter.save_offset(config["_group_id"], config["Version"], offsetRanges)
print "save kafka offset done!"
hbase_conf_ = config.get('HBaseConf', {})
hbase_conf = RenderConfigAttribute(hbase_conf_, "hbase.mapred.outputtable")
rdd.map(lambda event: convertor(event, config, 'hbase_reformat')).\
filter(lambda x: x is not None).\
flatMap(lambda x: x).\
saveAsNewAPIHadoopDataset(
keyConverter=("org.apache.spark.examples.pythonconverters."
"StringToImmutableBytesWritableConverter"),
valueConverter=("org.apache.spark.examples.pythonconverters."
"StringListToPutConverter"),
conf=hbase_conf)
kafka_stream.foreachRDD(process_rdd)