Я пытаюсь настроить поступление данных Кафки в реальном времени в HBase через PySpark в соответствии с этого руководства . У меня проблема с кодом, показанным ниже. Когда я запускаю его, я получаю вывод примерно так:
APPNAME:Kafka_MapR-Streams_to_HBase
APPID:local-1553526448
VERSION:2.4.0
=====Pull from Stream=====
saveAsNewAPIHadoopDataset
Я передаю данные в этом формате:
3926426402421,OCT 23 10:23:39 {nat}[FWNAT]: STH 129.15.90.22:1404 [34.62.15.31:086] -> 170.14.183.168:63 UDP
Я не получаю никаких ошибок и т. Д., Однако данные не добавляются в таблицу Hbase (create "logs","log"
), а также эта строка кода не выполняется print('saveAsNewAPIHadoopDataset2')
. Я установил batchDuration
eqauls на 1
секунду, и когда я, например, печатаю print(len(rdd.collect()))
, все, кажется, работает. Я получаю такие значения, как 400000 и т. Д. Есть идеи, что не так?
def SaveToHBase(rdd):
print("=====Pull from Stream=====")
if not rdd.isEmpty():
host = 'myhost'
table = 'logs'
keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
conf = {"hbase.zookeeper.quorum": host,
"hbase.mapred.outputtable": table,
"mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
"mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
"mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
print('saveAsNewAPIHadoopDataset1')
rdd.saveAsNewAPIHadoopDataset(conf=conf,keyConverter=keyConv,valueConverter=valueConv)
print('saveAsNewAPIHadoopDataset2')
parsed = kds.filter(lambda x: x != None and len(x) > 0 )
parsed = parsed.map(lambda x: x[1])
parsed = parsed.map(lambda rec: rec.split(","))
parsed = parsed.filter(lambda x: x != None and len(x) == 2 )
parsed = parsed.map(lambda data:Row(log_id=getValue(str,data[0]), \
log=getValue(str,data[1])))
parsed = kds.filter(lambda x: x != None and len(x) > 0 )
parsed.foreachRDD(SaveToHBase)