Я использую Spark 2.4.0 в кластере AWS. Целью является ETL, и он в значительной степени основан на Spark SQL с использованием pyspark.
У меня есть множество скриптов Python, которые вызываются по очереди. Между этими сценариями существуют зависимости данных. Существует main.py, который вызывает другие сценарии, такие как process1.py, process2.py и т. Д.
Вызов выполняется с помощью:
#invoking process 1
command1 = "/anaconda3/bin/python3 process1.py"
p1 = subprocess.Popen(command1.split(" "), stdout=PIPE, stderr=subprocess.STDOUT)
p.wait()
#invoking process 2
command2 = "/anaconda3/bin/python3 process2.py"
p2 = subprocess.Popen(command2.split(" "), stdout=PIPE, stderr=subprocess.STDOUT)
p.wait()
Каждый из этих процессов (process1.py, process2.py и т. Д.) Выполняет преобразования данных с использованием синтаксиса на основе SQL, например:
df_1. createGlobalTempView ('table_1')
result_1 = spark.sql('select * from table_1 where <some conditions>')
Проблема в том, что я хочу, чтобы кадры данных (например, df_1
или result_1
) и / или таблицы (например, table_1
) были доступны во всей последовательности обработки. Так, например, если приведенный выше код находится в process1.py, сгенерированные df_1
или table_1
должны быть доступны в process2.py.
Main.py, process1.py и process2.py получают сеанс запуска с помощью:
spark = SparkSession.builder.appName("example-spark").config("spark.sql.crossJoin.enabled", "true").getOrCreate()
Я знаю, что есть возможность использовать HIVE для хранения table_1
, но я стараюсь по возможности избежать этого сценария.
Большое спасибо за помощь!