ThreadPoolExecutor causes hangs in PySpark pools
0 votes
/ 08 April 2020

I'm using ThreadPoolExecutor from concurrent.futures in Python to parallelize my PySpark job. But I've noticed that whenever the tasks involve heavy read or broadcast joins in PySpark, the thread pool simply hangs. The Spark UI appears to be stuck indefinitely!
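For reference, the thread-based version I'm describing looks roughly like this (a minimal sketch; it assumes the same `spark`, `sc`, and `test` definitions as in the full script at the end of this question):

from concurrent.futures import ThreadPoolExecutor, as_completed

# submit one job per scheduler pool; each call to test() sets
# spark.scheduler.pool and runs a count() inside that pool
with ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(test, x) for x in ["A", "B", "C"]]
    for f in as_completed(futures):
        print('Result: ', f.result())  # this is where the hang shows up on heavy joins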

Can anyone help me solve this problem?

It would also be helpful if someone could show me how to use a ProcessPool instead of a ThreadPool with PySpark. When I try that, I get:

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Program Files\Python37\lib\multiprocessing\spawn.py", line 105, in spawn_main
    exitcode = _main(fd)
  File "C:\Program Files\Python37\lib\multiprocessing\spawn.py", line 114, in _main
    prepare(preparation_data)
  File "C:\Program Files\Python37\lib\multiprocessing\spawn.py", line 225, in prepare
    _fixup_main_from_path(data['init_main_from_path'])
  File "C:\Program Files\Python37\lib\multiprocessing\spawn.py", line 277, in _fixup_main_from_path
    run_name="__mp_main__")
  File "C:\Program Files\Python37\lib\runpy.py", line 263, in run_path
    pkg_name=pkg_name, script_name=fname)
  File "C:\Program Files\Python37\lib\runpy.py", line 96, in _run_module_code
    mod_name, mod_spec, pkg_name, script_name)
  File "C:\Program Files\Python37\lib\runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "C:\Studies\Python\multiprocessing1.py", line 13, in <module>
    sc = pyspark.SparkContext(conf=conf)
  File "C:\spark-2.4.5-bin-hadoop2.7\python\pyspark\context.py", line 136, in __init__
    conf, jsc, profiler_cls)
  File "C:\spark-2.4.5-bin-hadoop2.7\python\pyspark\context.py", line 198, in _do_init
    self._jsc = jsc or self._initialize_context(self._conf._jconf)
  File "C:\spark-2.4.5-bin-hadoop2.7\python\pyspark\context.py", line 306, in _initialize_context
    return self._jvm.JavaSparkContext(jconf)
  File "C:\spark-2.4.5-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\java_gateway.py", line 1525, in __call__
  File "C:\spark-2.4.5-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:
org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
java.lang.reflect.Constructor.newInstance(Unknown Source)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.GatewayConnection.run(GatewayConnection.java:238)
java.lang.Thread.run(Unknown Source)
        at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$2.apply(SparkContext.scala:2483)
        at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$2.apply(SparkContext.scala:2479)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.SparkContext$.assertNoOtherContextIsRunning(SparkContext.scala:2479)
        at org.apache.spark.SparkContext$.markPartiallyConstructed(SparkContext.scala:2568)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:85)
        at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
        at java.lang.reflect.Constructor.newInstance(Unknown Source)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:238)
        at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
        at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Unknown Source)

Traceback (most recent call last):
  File "C:/Studies/Python/multiprocessing1.py", line 71, in <module>
    main()
  File "C:/Studies/Python/multiprocessing1.py", line 57, in main
    print('Result: ', _.result())
  File "C:\Program Files\Python37\lib\concurrent\futures\_base.py", line 428, in result
    return self.__get_result()
  File "C:\Program Files\Python37\lib\concurrent\futures\_base.py", line 384, in __get_result
    raise self._exception
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

The code that produces the above error:

import findspark
findspark.init()  # locate the local Spark installation and add pyspark to sys.path before importing it

import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import *
from pyspark.sql.functions import *
import threading
import time
import concurrent.futures

conf = pyspark.SparkConf().setAppName('appName').setMaster(
    'local').set('spark.scheduler.mode', 'FAIR')
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)
thread_local = threading.local()

arr = ["A", "B", "C"]


def test(a):
    z = ("NA", "NA")

    if a == "A":
        sc.setLocalProperty("spark.scheduler.pool", a)
        print("thread A started at " + str(time.time()))
        range100 = spark.range(100)
        count100 = range100.count()
        z = (a, str(count100))
    elif a == "B":

        sc.setLocalProperty("spark.scheduler.pool", a)
        print("thread B started at " + str(time.time()))
        range200 = spark.range(100000)
        count200 = range200.count()
        z = (a, str(count200))
    elif a == "C":
        sc.setLocalProperty("spark.scheduler.pool", a)
        print("thread C started at " + str(time.time()))
        range300 = spark.range(300)
        count300 = range300.count()
        z = (a, str(count300))
    print("thread {} ended at ".format(a) + str(time.time()))
    sc.setLocalProperty("spark.scheduler.pool", None)
    return z


start_time = time.time()
result = []
# for i in arr:
#     result.append(test(i))
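# On Windows, ProcessPoolExecutor spawns fresh interpreters that re-import this
# module; the module-level SparkContext is then created again in each worker,
# which is what raises the "Only one SparkContext may be running in this JVM"
# error shown in the traceback above.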
with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
    # result = executor.map(test, arr)
    result_futures = list(map(lambda x: executor.submit(test, x), arr))
    # result = [Row(f.result())
    #           for f in concurrent.futures.as_completed(result_futures)]
    for _ in concurrent.futures.as_completed(result_futures):
        print('Result: ', _.result())
        result.append(_.result())

# print(result)
result_df = spark.sparkContext.parallelize(result).toDF()
result_df.show()  # show() prints the DataFrame and returns None
print(time.time() - start_time)  # elapsed seconds
# result = p.map(test, arr)
# p.close()
# p.join()
...