Драйвер Pyspark Impala jdbc не поддерживает эту дополнительную функцию - PullRequest
0 голосов
/ 09 ноября 2018

Я использую pyspark для потоковой передачи искр.Я могу правильно и без проблем создавать потоковые данные и создавать их.Мне также удалось вставить данные в таблицу Impala, созданную только из нескольких (5) выборочных столбцов из общих столбцов (72) в сообщении от Kafka.Но когда я создаю новую таблицу с надлежащими типами данных и столбцами, аналогично, в кадре данных теперь есть все столбцы, упомянутые в сообщении потока Kafka.Я получаю следующее исключение:

java.sql.SQLFeatureNotSupportedException: [Cloudera] Драйвер JDBC не поддерживает эту дополнительную функцию.на com.cloudera.impala.exceptions.ExceptionConverter.toSQLException (неизвестный источник) на com.cloudera.impala.jdbc.common.SPreparedStatement.checkTypeSupported (неизвестный источник) на com.cloudera.impala.jdbc.common.SPreparedStateИсточник) на org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils $ .savePartition (JdbcUtils.scala: 627) на org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils $$ anonfun $.apply (JdbcUtils.scala: 782) в org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils $$ anonfun $ saveTable $ 1.apply (JdbcUtils.scala: 782) в org.apache.spark.rdd.RDD$$ anonfun $ foreachPartition $ 1 $$ anonfun $ apply $ 29.apply (RDD.scala: 926) в org.apache.spark.rdd.RDD $$ anonfun $ foreachPartition $ 1 $$ anonfun $ apply $ 29.apply (RDD.scala:926) в org.apache.spark.SparkContext $$ anonfun $ runJob $ 5.apply (SparkContext.scala: 2064) в org.apache.spark.SparkContext $$ anonfun $ runJob $ 5.apply (SparkContext.scala: 2064) в org.apache.spark.scheduler.ResultTask.runTask (ResultTask.scala: 87) в org.apache.spark.scheduler.Task.run (Task.scala: 108) в org.apache.spark.executor.Executor $ TaskRunner.run (Executor.scala: 338) в java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java: 1149) на java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:624) на java.lang.Thread.run (Thread.java:748)

Я искалмного об этом, но не смог найти никакого решения по этому вопросу.Я также включил журналы отладки, но там не будет упоминаться, какую функцию драйвер не поддерживает.Любая помощь или правильное руководство будет оценено.Спасибо

Сведения о версии:

pyspark: 2.2.0 Kafka: 0.10.2 Cloudera: 5.15.0 Cloudera Impala: 2.12.0-cdh5.15.0 Cloudera Impala JDBC драйвер: 2.6.4

Код, который я использовал:

import json
from pyspark import SparkContext,SparkConf,HiveContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SparkSession,Row
from pyspark.sql.functions import lit
from pyspark.sql.types import *

conf = SparkConf().setAppName("testkafkarecvstream")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 10)
spark = SparkSession.builder.appName("testkafkarecvstream").getOrCreate()
jdbcUrl = "jdbc:impala://hostname:21050/dbName;AuthMech=0;"

fields = [
                 StructField("column_name01", StringType(), True),
                 StructField("column_name02", StringType(), True),
                 StructField("column_name03", DoubleType(), True),
                 StructField("column_name04", StringType(), True),
                 StructField("column_name05", IntegerType(), True),
                 StructField("column_name06", StringType(), True),
                  .....................
                 StructField("column_name72", StringType(), True),
]

schema = StructType(fields)

def make_rows(parts):
    customRow = Row(column_name01=datatype(parts['column_name01']),
                              .....,
                              column_name72=datatype(parts['column_name72'])
                           )
    return customRow


def createDFToParquet(rdd):
    try:
        df = spark.createDataFrame(rdd,schema)
        df.show()df.write.jdbc(jdbcUrl,
                            table="table_name",
                            mode="append",)
    except Exception as e:
        print str(e)


zkNode = "zkNode_name:2181"
topic = "topic_name"

# Reciever method
kvs = KafkaUtils.createStream(ssc,
                              zkNode,
                              "consumer-group-id",
                              {topic:5},
                              {"auto.offset.reset" : "smallest"})

lines = kvs.map(lambda x: x[1])
conv = lines.map(lambda x: json.loads(x))
table = conv.map(makeRows)
table.foreachRDD(createDFToParquet)

table.pprint()

ssc.start()
ssc.awaitTermination()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...