Я использую ThreadPoolExecutor из concurrent.futures
в Python, чтобы применить параллелизм к моей работе в Pyspark. Но я вижу, что обычно, когда у меня есть задачи с тяжелыми соединениями для чтения или широковещания в Pyspark, ThreadPool просто зависает. Интерфейс Spark, кажется, зависает бесконечно!
Может ли кто-нибудь помочь мне решить эту проблему?
Также, если кто-то может показать мне, как использовать ProcessPool вместо Threadpool в Pyspark, было бы полезно. Когда я пытаюсь это сделать, я получаю:
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "C:\Program Files\Python37\lib\multiprocessing\spawn.py", line 105, in spawn_main
exitcode = _main(fd)
File "C:\Program Files\Python37\lib\multiprocessing\spawn.py", line 114, in _main
prepare(preparation_data)
File "C:\Program Files\Python37\lib\multiprocessing\spawn.py", line 225, in prepare
_fixup_main_from_path(data['init_main_from_path'])
File "C:\Program Files\Python37\lib\multiprocessing\spawn.py", line 277, in _fixup_main_from_path
run_name="__mp_main__")
File "C:\Program Files\Python37\lib\runpy.py", line 263, in run_path
pkg_name=pkg_name, script_name=fname)
File "C:\Program Files\Python37\lib\runpy.py", line 96, in _run_module_code
mod_name, mod_spec, pkg_name, script_name)
File "C:\Program Files\Python37\lib\runpy.py", line 85, in _run_code
exec(code, run_globals)
File "C:\Studies\Python\multiprocessing1.py", line 13, in <module>
sc = pyspark.SparkContext(conf=conf)
File "C:\spark-2.4.5-bin-hadoop2.7\python\pyspark\context.py", line 136, in __init__
conf, jsc, profiler_cls)
File "C:\spark-2.4.5-bin-hadoop2.7\python\pyspark\context.py", line 198, in _do_init
self._jsc = jsc or self._initialize_context(self._conf._jconf)
File "C:\spark-2.4.5-bin-hadoop2.7\python\pyspark\context.py", line 306, in _initialize_context
return self._jvm.JavaSparkContext(jconf)
File "C:\spark-2.4.5-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\java_gateway.py", line 1525, in __call__
File "C:\spark-2.4.5-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:
org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
java.lang.reflect.Constructor.newInstance(Unknown Source)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
py4j.GatewayConnection.run(GatewayConnection.java:238)
java.lang.Thread.run(Unknown Source)
at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$2.apply(SparkContext.scala:2483)
at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$2.apply(SparkContext.scala:2479)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.SparkContext$.assertNoOtherContextIsRunning(SparkContext.scala:2479)
at org.apache.spark.SparkContext$.markPartiallyConstructed(SparkContext.scala:2568)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:85)
at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
at java.lang.reflect.Constructor.newInstance(Unknown Source)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:238)
at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Unknown Source)
Traceback (most recent call last):
File "C:/Studies/Python/multiprocessing1.py", line 71, in <module>
main()
File "C:/Studies/Python/multiprocessing1.py", line 57, in main
print('Result: ', _.result())
File "C:\Program Files\Python37\lib\concurrent\futures\_base.py", line 428, in result
return self.__get_result()
File "C:\Program Files\Python37\lib\concurrent\futures\_base.py", line 384, in __get_result
raise self._exception
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
Код для вышеуказанной ошибки:
from pyspark.sql import *
from pyspark import SparkContext, SparkConf
import threading
import pyspark
from pyspark.sql.functions import *
import findspark
import time
import concurrent.futures
findspark.init()
conf = pyspark.SparkConf().setAppName('appName').setMaster(
'local').set('spark.scheduler.mode', 'FAIR')
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)
thread_local = threading.local()
arr = ["A", "B", "C"]
def test(a):
z = ("NA", "NA")
if a == "A":
sc.setLocalProperty("spark.scheduler.pool", a)
print("thread A started at " + str(time.time()))
range100 = spark.range(100)
count100 = range100.count()
z = (a, str(count100))
elif a == "B":
sc.setLocalProperty("spark.scheduler.pool", a)
print("thread B started at " + str(time.time()))
range200 = spark.range(100000)
count200 = range200.count()
z = (a, str(count200))
elif a == "C":
sc.setLocalProperty("spark.scheduler.pool", a)
print("thread C started at " + str(time.time()))
range300 = spark.range(300)
count300 = range300.count()
z = (a, str(count300))
print("thread {} ended at ".format(a) + str(time.time()))
sc.setLocalProperty("spark.scheduler.pool", None)
return z
start_time = time.time()
result = []
# for i in arr:
# result.append(test(i))
with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
# result = executor.map(test, arr)
result_futures = list(map(lambda x: executor.submit(test, x), arr))
# result = [Row(f.result())
# for f in concurrent.futures.as_completed(result_futures)]
for _ in concurrent.futures.as_completed(result_futures):
print('Result: ', _.result())
result.append(_.result())
# print(result)
result_df = spark.sparkContext.parallelize(result).toDF()
print(result_df.show())
print(start_time-time.time())
# result = p.map(test, arr)
# p.close()
# p.join()