как выполнить задачу воздушного потока на основе ответа на командную строку - PullRequest
0 голосов
/ 23 января 2020

У меня есть эта задача в потоке воздуха:

def bcp_in(set):
    files = []
    for file in glob.glob(Variable.get("temp_directory") + "offrs/{}*.txt".format(set)):
        files.append(file)
    print("LOCAL FILES {}".format(files))
    for file in files:
        print('Importing File {}'.format(file))
        cmd = '/opt/mssql-tools/bin/bcp buyerhero_staging.dbo.FILETYPE IN "{file}" -F2  -<<HOST>> -<<PASSWORD>> -S<<SERVER>> -t"|" -c'.format(table=set, file=file)
        print(os.popen(cmd).read())


BCP_Import_FILETYPE_Files = PythonOperator(
    task_id='BCP_Import_Files_to_DB3_Staging',
    python_callable=bcp_in,
    op_kwargs={'set': 'FILETYPE'},
    dag=dag
)

, если все работает, все в порядке. Тем не менее, мне нужно выполнить задачу, если сбой BCP. например:

[2020-01-23 02:45:08,786] {logging_mixin.py:95} INFO - Importing File /home/airflow/airflow/staging/FILETYPE_000000000000.csv
[2020-01-23 02:45:09,505] {logging_mixin.py:95} INFO - 
Starting copy...
SQLState = 22005, NativeError = 0
Error = [Microsoft][ODBC Driver 17 for SQL Server]Invalid character value for cast specification
SQLState = 22005, NativeError = 0
Error = [Microsoft][ODBC Driver 17 for SQL Server]Invalid character value for cast specification
SQLState = 22005, NativeError = 0
Error = [Microsoft][ODBC Driver 17 for SQL Server]Invalid character value for cast specification
SQLState = 22005, NativeError = 0
Error = [Microsoft][ODBC Driver 17 for SQL Server]Invalid character value for cast specification
SQLState = 22005, NativeError = 0
Error = [Microsoft][ODBC Driver 17 for SQL Server]Invalid character value for cast specification
SQLState = 22005, NativeError = 0
Error = [Microsoft][ODBC Driver 17 for SQL Server]Invalid character value for cast specification
SQLState = 22005, NativeError = 0
Error = [Microsoft][ODBC Driver 17 for SQL Server]Invalid character value for cast specification
SQLState = 22005, NativeError = 0
Error = [Microsoft][ODBC Driver 17 for SQL Server]Invalid character value for cast specification
SQLState = 22005, NativeError = 0
Error = [Microsoft][ODBC Driver 17 for SQL Server]Invalid character value for cast specification
SQLState = 22005, NativeError = 0
Error = [Microsoft][ODBC Driver 17 for SQL Server]Invalid character value for cast specification

BCP copy in failed
[2020-01-23 02:45:09,505] {logging_mixin.py:95} INFO - 

Этот BCP не выполнен, но задание все еще отображается зеленым. Я не знал бы, чтобы устранить неполадки, если бы я не смотрел в журнале.

Как мне сообщить о сбое задачи в этом случае. Или, если лучше .... у кого-нибудь есть лучший оператор для подачи файлов CSV / TXT в MS SQL? Спасибо.

Ответы [ 2 ]

1 голос
/ 23 января 2020

Используйте subprocess.check_ouput для запуска вашей команды, например:

import subprocess

def bcp_in(set):
    files = []
    for file in glob.glob(Variable.get("temp_directory") + "offrs/{}*.txt".format(set)):
        files.append(file)
    print("LOCAL FILES {}".format(files))
    for file in files:
        print('Importing File {}'.format(file))
        cmd = '/opt/mssql-tools/bin/bcp buyerhero_staging.dbo.FILETYPE IN "{file}" -F2  -<<HOST>> -<<PASSWORD>> -S<<SERVER>> -t"|" -c'.format(table=set, file=file)
        subprocess.check_output(cmd)

Подробности: https://docs.python.org/3.7/library/subprocess.html#subprocess .check_output

Если код возврата был ненулевой он вызывает CalledProcessError. У объекта CalledProcessError будет код возврата в атрибуте returncode и любые выходные данные в атрибуте output.

0 голосов
/ 23 января 2020

Вы можете использовать опцию -e bcp, чтобы записать любые ошибки в файл, а затем использовать содержимое файла, чтобы принять решение об успешном выполнении задачи.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...