java .io.IOException: py4j.Py4JException: ошибка при получении нового канала связи в потоковой передаче pyspark - PullRequest
0 голосов
/ 07 мая 2020

Потоковая передача Pyspark произошла ошибка при запуске около двух часов, мой код находится ниже:

def init_create_context():
    sc = SparkContext(appName="test")
    sc.setLogLevel("WARN")
    ssc = StreamingContext(sc, 60)

    kafka_params = {"metadata.broker.list": "localhost:9092"}
    kafkaStream = KafkaUtils.createDirectStream(ssc, ["test"], kafka_params,
                                            valueDecoder=lambda x: json.loads(x.decode("utf-8")))

    kafkaStream.checkpoint(60)

    def f(x):
        time.sleep(random.randint(1, 4))
        return x[1].get("userId"), len(x[1].get("lifeIds"))

    def sink(rdds):
        def mp(p):
            hbase_util = HappyBaseUtil("localhost", "2181")  # my own hbase api
            for row in p:
                if len(row) != 0:
                    hbase_util.put("stream_inset_test", row[0], {"info:lifeIdNum": row[1]})
        rdds.foreachPartition(mp)

    kafkaStream.map(f).foreachRDD(sink)

    return ssc


if __name__ == "__main__":
    ssc = StreamingContext.getOrCreate("hdfs://127.0.0.1:9000/cpts", init_create_context)
    ssc.start()
    ssc.awaitTermination()

Я отправляю код spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.3 --master yarn --executor-cores 6 --driver-memory 2g --executor-memory 2g test_streaming.py, а все остальные по умолчанию, но после запуска около двух часов появляется сообщение об ошибке, как показано ниже:

Traceback (most recent call last):
  File "/home/test/test_streaming.py", line 54, in <module>
    ssc.awaitTermination()
  File "/home/softs/spark-2.4.3-bin-hadoop2.6/python/lib/pyspark.zip/pyspark/streaming/context.py", line 192, in awaitTermination
  File "/home/softs/spark-2.4.3-bin-hadoop2.6/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/home/softs/spark-2.4.3-bin-hadoop2.6/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o5.awaitTermination.
: java.io.IOException: py4j.Py4JException: Error while obtaining a new communication channel
... ...
Caused by: py4j.Py4JException: Error while obtaining a new communication channel
        at py4j.CallbackClient.getConnectionLock(CallbackClient.java:257)
at py4j.CallbackClient.sendCommand(CallbackClient.java:377)
        at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
        at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
        at com.sun.proxy.$Proxy4.dumps(Unknown Source)
... ...
Caused by: java.net.SocketException: Connection reset

И я искал net, но никакое решение не может с этим справиться, как я могу решить эту проблему?

Spark версии 2.4.3, py4j версии 0.10.7 и python версии 3.5.3

...