Почему saveAsNewAPIHadoopDataset не работает и не возвращает никаких ошибок при использовании PySpark, Spark Streaming и Hbase? - PullRequest
0 голосов
/ 16 апреля 2019

Я пытаюсь настроить поступление данных Кафки в реальном времени в HBase через PySpark в соответствии с этого руководства . У меня проблема с кодом, показанным ниже. Когда я запускаю его, я получаю вывод примерно так:

APPNAME:Kafka_MapR-Streams_to_HBase
APPID:local-1553526448
VERSION:2.4.0
=====Pull from Stream=====
saveAsNewAPIHadoopDataset

Я передаю данные в этом формате:

3926426402421,OCT 23 10:23:39 {nat}[FWNAT]: STH 129.15.90.22:1404 [34.62.15.31:086] -> 170.14.183.168:63 UDP

Я не получаю никаких ошибок и т. Д., Однако данные не добавляются в таблицу Hbase (create "logs","log"), а также эта строка кода не выполняется print('saveAsNewAPIHadoopDataset2'). Я установил batchDuration eqauls на 1 секунду, и когда я, например, печатаю print(len(rdd.collect())), все, кажется, работает. Я получаю такие значения, как 400000 и т. Д. Есть идеи, что не так?

def SaveToHBase(rdd):

    print("=====Pull from Stream=====")

    if not rdd.isEmpty():

        host = 'myhost'  
        table = 'logs'  
        keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"  
        valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"  
        conf = {"hbase.zookeeper.quorum": host,
            "hbase.mapred.outputtable": table,
            "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",  
            "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",  
            "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"} 

        print('saveAsNewAPIHadoopDataset1')
        rdd.saveAsNewAPIHadoopDataset(conf=conf,keyConverter=keyConv,valueConverter=valueConv) 
        print('saveAsNewAPIHadoopDataset2')


parsed = kds.filter(lambda x: x != None and len(x) > 0 )
parsed = parsed.map(lambda x: x[1])
parsed = parsed.map(lambda rec: rec.split(","))
parsed = parsed.filter(lambda x: x != None and len(x) == 2 )
parsed = parsed.map(lambda data:Row(log_id=getValue(str,data[0]), \
        log=getValue(str,data[1])))
parsed = kds.filter(lambda x: x != None and len(x) > 0 )



parsed.foreachRDD(SaveToHBase)
...