У меня есть скрипт pyspark, как показано ниже.В этом сценарии я собираю stdout и stderr сценария в файле и сохраняю в локальном.
В этом сценарии я запускаю начальную функцию в цикле над входным файлом.
Скрипт работает нормально, но с небольшой ошибкой.В этом сценарии я использую подпроцесс для перемещения данных локально в другое место.
Я хочу, чтобы этот вызов подпроцесса выполнялся после завершения цикла, но вместо этого он запускается, когда цикл запускается в первый раз, и когда во второй раз цикл запускается, он выдает ожидаемую ошибку.Когда файл присутствует, он выдает ошибку.
Сценарий Pyspark
#!/usr/bin/python
import os
import sys
import traceback
import subprocess
import pandas as pd
from datetime import datetime
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext
conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)
def initial(
table,
hivedb,
domain,
port,
mysqldb,
mysql_user,
user,
dir,
):
day = datetime.now().strftime('%Y-%m-%d')
month = datetime.now().strftime('%Y-%m')
Linux_path = '/data/logging/{}'.format(input_file)
HDFS_path = '/user/{}/{}/logging/{}/{}/{}'.format(user,dir,mysqldb,month,day)
subprocess.call(["hdfs", "dfs", "-mkdir", "-p", HDFS_path])
subprocess.call(["rm", Linux_path])
so = se = open('/data/logging/{}'.format(input_file), 'a',
0)
#re-open stdout without buffering
sys.stdout = os.fdopen(sys.stdout.fileno(), 'a', 0)
# redirect stdout and stderr to the log file opened above
os.dup2(so.fileno(), sys.stdout.fileno())
os.dup2(se.fileno(), sys.stderr.fileno())
### CODE:
Do something
### if errors the print traceback
### repeat the same for every table in input file
subprocess.call(["hdfs", "dfs", "-put", Linux_path, HDFS_path])
if len(sys.argv) != 9:
print 'Invalid number of args......'
print 'Usage: spark-submit file.py Input_path Output_path'
exit()
input_file = sys.argv[1]
hivedb = sys.argv[2]
domain = sys.argv[3]
port = sys.argv[4]
mysqldb = sys.argv[5]
mysql_user = sys.argv[6]
user = sys.argv[7]
dir = sys.argv[8]
input = \
sc.textFile('/user/{}/{}/{}/args/{}'.format(user,
dir, mysqldb,
input_file)).collect()
for table in input:
initial(
table,
hivedb,
domain,
port,
mysqldb,
mysql_user,
user,
dir,
)
sc.stop()
print '**********************************************************************************************************************************************************************'
Как я могу изменить свой сценарий, чтобы вызов подпроцесса выполнялся после завершения цикла for?Какие изменения мне нужно внести в мой скрипт?