Как вызвать подпроцесс после цикла for - PullRequest
0 голосов
/ 05 июля 2019

У меня есть скрипт pyspark, как показано ниже.В этом сценарии я собираю stdout и stderr сценария в файле и сохраняю в локальном.

В этом сценарии я запускаю начальную функцию в цикле над входным файлом.

Скрипт работает нормально, но с небольшой ошибкой.В этом сценарии я использую подпроцесс для перемещения данных локально в другое место.

Я хочу, чтобы этот вызов подпроцесса выполнялся после завершения цикла, но вместо этого он запускается, когда цикл запускается в первый раз, и когда во второй раз цикл запускается, он выдает ожидаемую ошибку.Когда файл присутствует, он выдает ошибку.

Сценарий Pyspark

#!/usr/bin/python

import os
import sys
import traceback
import subprocess
import pandas as pd
from datetime import datetime
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext
conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)


def initial(
    table,
    hivedb,
    domain,
    port,
    mysqldb,
    mysql_user,
    user,
    dir,
    ):


    day = datetime.now().strftime('%Y-%m-%d')
    month = datetime.now().strftime('%Y-%m')

    Linux_path = '/data/logging/{}'.format(input_file)
    HDFS_path = '/user/{}/{}/logging/{}/{}/{}'.format(user,dir,mysqldb,month,day)

    subprocess.call(["hdfs", "dfs", "-mkdir", "-p", HDFS_path])
    subprocess.call(["rm", Linux_path])

    so = se = open('/data/logging/{}'.format(input_file), 'a',
                   0)

    #re-open stdout without buffering
    sys.stdout = os.fdopen(sys.stdout.fileno(), 'a', 0)

    # redirect stdout and stderr to the log file opened above
    os.dup2(so.fileno(), sys.stdout.fileno())
    os.dup2(se.fileno(), sys.stderr.fileno())

       ### CODE:

    Do something

    ### if errors the print traceback

    ### repeat the same for every table in input file


    subprocess.call(["hdfs", "dfs", "-put", Linux_path, HDFS_path])


if len(sys.argv) != 9:
    print 'Invalid number of args......'
    print 'Usage: spark-submit file.py Input_path Output_path'
    exit()

input_file = sys.argv[1]
hivedb = sys.argv[2]
domain = sys.argv[3]
port = sys.argv[4]
mysqldb = sys.argv[5]
mysql_user = sys.argv[6]
user = sys.argv[7]
dir = sys.argv[8]

input = \
    sc.textFile('/user/{}/{}/{}/args/{}'.format(user,
                dir, mysqldb,
                input_file)).collect()

for table in input:
    initial(
        table,
        hivedb,
        domain,
        port,
        mysqldb,
        mysql_user,
        user,
        dir,
        )

sc.stop()
print '**********************************************************************************************************************************************************************'

Как я могу изменить свой сценарий, чтобы вызов подпроцесса выполнялся после завершения цикла for?Какие изменения мне нужно внести в мой скрипт?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...