Я хочу иметь возможность заставить spark выполнять мой код в нужном мне порядке.
В приведенном ниже примере функции foo
и bar
выполняют манипулирование данными, но send_request
- это просто веб-триггер, на который эти функции не влияют. Когда Spark выполняет приведенный ниже код, он запускает send_request
первый и foo
и bar
позже.
Это не работает для меня, потому что после завершения foo
и bar
у меня истекает время ожидания от моего запроса. Если запрос выполняется после foo
, результат будет готов одновременно с завершением bar
. Как я мог достичь этого в искре.
Я мог бы иметь отдельные сценарии для каждого шага, но время запуска кластера увеличивается, поэтому я хотел бы иметь возможность изменять порядок выполнения. Я использую блоки данных на Azure, если это помогает.
import os
import base64
import requests
import pyspark
sc.addFile("dbfs:/bar.py")
from bar import bar
sc.addFile("dbfs:/foo.py")
from foo import foo
if __name__ == '__main__':
foo()
response = send_request(request=request_json)
bar()
содержимое foo
и bar
и send_request
выглядит следующим образом
def foo():
df = spark.read.parquet(file_1_path)
df = df.filter(F.col('IDType') == 'E') \
.select(F.col('col1'),F.col('col2')).distinct()
df.repartition(10).write.parquet(file_1_new_path)
logger.info('1 foo is done')
и
def bar():
df = spark.read.parquet(file_2_path)
df = df.filter(F.col('IDType') == 'M') \
.select(F.col('col1'),F.col('col2')).distinct()
df.repartition(10).write.parquet(file_2_new_path)
logger.info('3 bar is done')
и
def send_request():
response_json = http_response.json()
logger.info('2 request is sent')
Я постараюсь быть более ясным. Когда я запускаю код выше в искре, я получаю следующий вывод
2 request is sent
1 foo is done
3 bar is done
Но я хочу, чтобы он был в следующем порядке
1 foo is done
2 request is sent
3 bar is done