Загрузка данных Kafka в HBase через PySpark - произошла ошибка при вызове None.org.apache.spark.streaming.api.java.JavaStreamingContext - PullRequest
0 голосов
/ 25 февраля 2019

Я пытаюсь настроить поступление данных Кафки в реальном времени в HBase через PySpark в соответствии с этого руководства .У меня есть проблема с Spark Streaming, то есть, я получаю такую ​​ошибку:

Py4JJavaError: An error occurred while calling None.org.apache.spark.streaming.api.java.JavaStreamingContext.
: java.lang.NullPointerException
    at org.apache.spark.streaming.api.java.JavaStreamingContext.<init>(JavaStreamingContext.scala:130)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:238)
    at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
    at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

spark --version возвращает version 2.4.0 Using Scala version 2.11.12.Если требуется дополнительная информация, просто дайте мне знать.Мой исходный код:

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /my/path/spark/spark-streaming-kafka-0-8-assembly_2.11-2.4.0.jar pyspark-shell'

import findspark
findspark.init()
import pyspark
import random
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import  *;
# from pyspark_ext import *
import happybase

appName = "Kafka_MapR-Streams_to_HBase"
config = SparkConf().setAppName(appName)  

props = []
props.append(("spark.rememberDuration", "10"))
props.append(("spark.batchDuration", "10"))
props.append(("spark.eventLog.enabled", "true"))
props.append(("spark.streaming.timeout", "30"))
props.append(("spark.ui.enabled", "true"))

config = config.setAll(props)

sc.stop()
sc = SparkContext(conf=config)
sc.stop()
ssc = StreamingContext(sc, int(config.get("spark.batchDuration")))

def runApplication(ssc, config):
  ssc.start()
  if config.get("spark.streaming.timeout") == '':
    ssc.awaitTermination()
  else:
    stopped = ssc.awaitTerminationOrTimeout(int(config.get("spark.streaming.timeout")))
  if not stopped :
    print("Stopping streaming context after timeout...")
    ssc.stop(True)
    print("Streaming context stopped.")

hbase_table = 'clicks'
hconn = happybase.Connection('hostname')  
ctable = hconn.table(hbase_table)

ОБНОВЛЕНИЕ

Я думаю, что проблема была связана с sc.stop().Удаление этого и изменение sc = SparkContext(conf=config) на SparkContext.getOrCreate(conf=config), вероятно, решило проблему.

...