Потоковая передача Pyspark произошла ошибка при запуске около двух часов, мой код находится ниже:
def init_create_context():
sc = SparkContext(appName="test")
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, 60)
kafka_params = {"metadata.broker.list": "localhost:9092"}
kafkaStream = KafkaUtils.createDirectStream(ssc, ["test"], kafka_params,
valueDecoder=lambda x: json.loads(x.decode("utf-8")))
kafkaStream.checkpoint(60)
def f(x):
time.sleep(random.randint(1, 4))
return x[1].get("userId"), len(x[1].get("lifeIds"))
def sink(rdds):
def mp(p):
hbase_util = HappyBaseUtil("localhost", "2181") # my own hbase api
for row in p:
if len(row) != 0:
hbase_util.put("stream_inset_test", row[0], {"info:lifeIdNum": row[1]})
rdds.foreachPartition(mp)
kafkaStream.map(f).foreachRDD(sink)
return ssc
if __name__ == "__main__":
ssc = StreamingContext.getOrCreate("hdfs://127.0.0.1:9000/cpts", init_create_context)
ssc.start()
ssc.awaitTermination()
Я отправляю код spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.3 --master yarn --executor-cores 6 --driver-memory 2g --executor-memory 2g test_streaming.py
, а все остальные по умолчанию, но после запуска около двух часов появляется сообщение об ошибке, как показано ниже:
Traceback (most recent call last):
File "/home/test/test_streaming.py", line 54, in <module>
ssc.awaitTermination()
File "/home/softs/spark-2.4.3-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/context.py", line 192, in awaitTermination
File "/home/softs/spark-2.4.3-bin-hadoop2.6/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/home/softs/spark-2.4.3-bin-hadoop2.6/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o5.awaitTermination.
: java.io.IOException: py4j.Py4JException: Error while obtaining a new communication channel
... ...
Caused by: py4j.Py4JException: Error while obtaining a new communication channel
at py4j.CallbackClient.getConnectionLock(CallbackClient.java:257)
at py4j.CallbackClient.sendCommand(CallbackClient.java:377)
at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
at com.sun.proxy.$Proxy4.dumps(Unknown Source)
... ...
Caused by: java.net.SocketException: Connection reset
И я искал net, но никакое решение не может с этим справиться, как я могу решить эту проблему?
Spark версии 2.4.3, py4j версии 0.10.7 и python версии 3.5.3