Я пытаюсь настроить поступление данных Кафки в реальном времени в HBase через PySpark в соответствии с этого руководства .У меня есть проблема с Spark Streaming, то есть, я получаю такую ошибку:
Py4JJavaError: An error occurred while calling None.org.apache.spark.streaming.api.java.JavaStreamingContext.
: java.lang.NullPointerException
at org.apache.spark.streaming.api.java.JavaStreamingContext.<init>(JavaStreamingContext.scala:130)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:238)
at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
spark --version
возвращает version 2.4.0 Using Scala version 2.11.12
.Если требуется дополнительная информация, просто дайте мне знать.Мой исходный код:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /my/path/spark/spark-streaming-kafka-0-8-assembly_2.11-2.4.0.jar pyspark-shell'
import findspark
findspark.init()
import pyspark
import random
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import *;
# from pyspark_ext import *
import happybase
appName = "Kafka_MapR-Streams_to_HBase"
config = SparkConf().setAppName(appName)
props = []
props.append(("spark.rememberDuration", "10"))
props.append(("spark.batchDuration", "10"))
props.append(("spark.eventLog.enabled", "true"))
props.append(("spark.streaming.timeout", "30"))
props.append(("spark.ui.enabled", "true"))
config = config.setAll(props)
sc.stop()
sc = SparkContext(conf=config)
sc.stop()
ssc = StreamingContext(sc, int(config.get("spark.batchDuration")))
def runApplication(ssc, config):
ssc.start()
if config.get("spark.streaming.timeout") == '':
ssc.awaitTermination()
else:
stopped = ssc.awaitTerminationOrTimeout(int(config.get("spark.streaming.timeout")))
if not stopped :
print("Stopping streaming context after timeout...")
ssc.stop(True)
print("Streaming context stopped.")
hbase_table = 'clicks'
hconn = happybase.Connection('hostname')
ctable = hconn.table(hbase_table)
ОБНОВЛЕНИЕ
Я думаю, что проблема была связана с sc.stop()
.Удаление этого и изменение sc = SparkContext(conf=config)
на SparkContext.getOrCreate(conf=config)
, вероятно, решило проблему.