Я новичок в Hbase. У меня есть датафрейм, который я хочу сохранить в HBase в Ambari. Сообщение об ошибке: ERROR Utils: прерывание задачи java .lang.UnsupportedOperationException: PrimitiveType coder: неподдерживаемый тип данных null
Я попытался устранить проблему, и теперь в кадре данных больше нет нулевого значения Я все та же ошибка. Кто-нибудь может мне помочь?
+--------------------+-------+---------+------+------+
| time|col1 | col2| col3| col4|
+--------------------+-------+---------+-------------+
|2020-04-12T01:30:...|+30003 | 532879| +1830| 20577|
|2020-04-11T18:15:...|+18838 | 521714| +1317| 20064|
+--------------------+--------+--------+------+------+
Ниже приведен мой код для ссылки:
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql.session import SparkSession
from pyspark.sql import SQLContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
import json
def main():
def g(x):
for i in x.collect():
schema = StructType({CODE FOR STRUCTTYPE})
df1 = spark.createDataFrame(i,schema = schema)
df2=df1.select(col("time"),col("col1"),col("col2"),col("col3"),col("col4"))
df3=df2.fillna({'col3':'0'})
data_source_format = "org.apache.spark.sql.execution.datasources.hbase"
catalog =''.join("""{"table":\
{"namespace":"default","name":"data"},"rowkey":"key","columns":{"time":\
{"cf":"rowkey","col":"key","type":"string"},{three other cols}}""".split())
df3.write
.options(catalog=catalog,newtable=5)
.format(data_source_format)
.save()
spark.read.options(catalog=catalog).format(data_source_format).load()
conf = SparkConf().setAppName("PySparkKafka").setMaster("local[2]")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
ssc = StreamingContext(sc, 10)
topic =['api-spark1']
kafkaStream = KafkaUtils.createDirectStream(ssc,topic,{"metadata.broker.list": "sandbox-hdp.hortonworks.com:6667"})
parsed = kafkaStream.map(lambda kv: json.loads(kv[1])['response'])
parsed.foreachRDD(g)
ssc.start()
ssc.awaitTermination()
if __name__=='__main__':
main()
Это сообщение об ошибке:
20/04/12 01:59:57 ERROR Utils: Aborting task
java.lang.UnsupportedOperationException: PrimitiveType coder: unsupported data type null
at org.apache.spark.sql.execution.datasources.hbase.types.PrimitiveType.toBytes(PrimitiveType.scala:61)