ОШИБКИ Utils: прерывание задачи java .lang.UnsupportedOperationException: кодировщик PrimitiveType: неподдерживаемый тип данных null - PullRequest
1 голос
/ 12 апреля 2020

Я новичок в Hbase. У меня есть датафрейм, который я хочу сохранить в HBase в Ambari. Сообщение об ошибке: ERROR Utils: прерывание задачи java .lang.UnsupportedOperationException: PrimitiveType coder: неподдерживаемый тип данных null

Я попытался устранить проблему, и теперь в кадре данных больше нет нулевого значения Я все та же ошибка. Кто-нибудь может мне помочь?

+--------------------+-------+---------+------+------+
|                time|col1    |    col2|  col3|  col4|
+--------------------+-------+---------+-------------+
|2020-04-12T01:30:...|+30003  |  532879| +1830| 20577|
|2020-04-11T18:15:...|+18838  |  521714| +1317| 20064|
+--------------------+--------+--------+------+------+

Ниже приведен мой код для ссылки:

from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql.session import SparkSession
from pyspark.sql import SQLContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
import json

def main():
    def g(x):
        for i in x.collect():
            schema = StructType({CODE FOR STRUCTTYPE})
            df1 = spark.createDataFrame(i,schema = schema)
            df2=df1.select(col("time"),col("col1"),col("col2"),col("col3"),col("col4"))
            df3=df2.fillna({'col3':'0'})
            data_source_format = "org.apache.spark.sql.execution.datasources.hbase"
            catalog =''.join("""{"table":\
            {"namespace":"default","name":"data"},"rowkey":"key","columns":{"time":\ 
            {"cf":"rowkey","col":"key","type":"string"},{three other cols}}""".split())
            df3.write
             .options(catalog=catalog,newtable=5)
             .format(data_source_format)
             .save()
            spark.read.options(catalog=catalog).format(data_source_format).load()

    conf = SparkConf().setAppName("PySparkKafka").setMaster("local[2]")  
    sc = SparkContext(conf=conf)
    spark = SparkSession(sc)
    ssc = StreamingContext(sc, 10)
    topic =['api-spark1']
    kafkaStream = KafkaUtils.createDirectStream(ssc,topic,{"metadata.broker.list": "sandbox-hdp.hortonworks.com:6667"})
    parsed = kafkaStream.map(lambda kv: json.loads(kv[1])['response'])
    parsed.foreachRDD(g)
    ssc.start()
    ssc.awaitTermination()

if __name__=='__main__':
    main()

Это сообщение об ошибке:

20/04/12 01:59:57 ERROR Utils: Aborting task
java.lang.UnsupportedOperationException: PrimitiveType coder: unsupported data type null
    at org.apache.spark.sql.execution.datasources.hbase.types.PrimitiveType.toBytes(PrimitiveType.scala:61)

1 Ответ

0 голосов
/ 12 апреля 2020

unsupported data type null означает, что один из столбцов, который вы используете, имеет нулевые значения. Чтобы обойти это, просто сделайте это cast(null as int)

Spark, идентифицирующий тип данных, такой как string, int et c. но не null тип.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...