Как сделать команду bash интерактивной в Airflow с помощью печати сообщений отладки? - PullRequest
0 голосов
/ 20 декабря 2018

Я запускаю команду bash в Airflow, включив несколько print() на нескольких этапах, чтобы гарантировать, сколько из них было выполнено.

Однако оператор print () не выполняется.

## added a script in the process just for test
import sys
import pandas as pd

cleanup_consent_column = "rwJIedeRwS"
omc_master_header = [u'PPAC District Code', u'State Name', u'District Name', u'Distributor Code', u'OMC Name', u'Distributor Contact No', u'Distributor Name', u'Distributor Address', u'SO Name', u'SO Contact', u'SALES AREA CODE', u'Email', u'DNO Name', u'DNO Contact', u'Lat_Mixed', u'Long_Mixed']

#OMC_DISTRIBUTOR_MASTER = "/mnt/data/NFS/TeamData/Multiple/external/mopng/5Feb18_master_ujjwala_latlong_dist_dno_so_v7.csv"
#PPAC_MASTER = "/mnt/data/NFS/TeamData/Multiple/external/mopng/ppac_master_v3_mmi_enriched_with_sanity_check.csv"

def clean(input_filepath, OMC_DISTRIBUTOR_MASTER, PPAC_MASTER, output_filepath):
    print("Taylor Swift's clean.")
    df = pd.read_csv(input_filepath, encoding='utf-8', dtype=object)
    print ('length of input - {0} - num cols - {1}'.format(len(df), len(df.columns.tolist())))
    ## cleanup consent column
    for x in df.columns.tolist():
        if x.startswith("rwJIedeRwS"):
            del df[x]
            break
    ## strip ppac code from the baseline
    df['consumer_id_name_ppac_code'] = df['consumer_id_name_ppac_code'].str.strip()

    ## merge with entity to get entity_ids
    omc_distributor_master = pd.read_csv(OMC_DISTRIBUTOR_MASTER, dtype=object, usecols=omc_master_header)
    omc_distributor_master = omc_distributor_master.add_prefix("omc_dist_master_")
    df = pd.merge(
        df, omc_distributor_master, how='left',
        left_on=['consumer_id_name_distributor_code', 'consumer_id_name_omc_name'],
        right_on=['omc_dist_master_Distributor Code', 'omc_dist_master_OMC Name']
    )

    ## log if anything not found
    print ('responses without distributor enrichment - {0}'.format(len(df[df['omc_dist_master_Distributor Code'].isnull()])))
    print ('num distributors without enrichment - {0}'.format(
        len(pd.unique(df[df['omc_dist_master_Distributor Code'].isnull()]['consumer_id_name_distributor_code']))
    ))

    ## converting date column
    df['consumer_id_name_sv_date'] = pd.to_datetime(df['consumer_id_name_sv_date'], format="%d/%m/%Y")
    df['consumer_id_name_sv_date'] = df['consumer_id_name_sv_date'].dt.strftime("%Y-%m-%d")

    ## add eventual_ppac_code
    print ("generating eventual ppac code column")
    count_de_rows = 0
    for i, row in df.iterrows():
        count_de_rows += 1
        if count_de_rows == 20:
            print("Another 20 done!")
            count_de_rows = 0
        ## if not found in master - use baseline data else go with omc master
        if row['omc_dist_master_PPAC District Code'] != row['omc_dist_master_PPAC District Code']:
            df.ix[i, 'eventual_ppac_code'] = row['consumer_id_name_ppac_code']
        else:
            df.ix[i, 'eventual_ppac_code'] = row['omc_dist_master_PPAC District Code']

    print("I guess it's all alright!")


if __name__ == '__main__':
    print("The main function has been called!")
    clean(sys.argv[1], sys.argv[2], sys.argv[3], sys.argv[4])

Приведенный выше скрипт запускается с помощью этой команды: Running command: python3 /usr/local/airflow/rootfs/mopng_beneficiary_v2/scripts/pclean_phlc9h6grzqdhm6sc0zrxjne_UdOgg.py /usr/local/airflow/rootfs/mopng_beneficiary_v2/manual__2018-12-18T12:06:14+00:00/appended/euoEQHIwIQTe1wXtg46fFYok.csv /usr/local/airflow/rootfs/mopng_beneficiary_v2/external/5Feb18_master_ujjwala_latlong_dist_dno_so_v7.csv /usr/local/airflow/rootfs/mopng_beneficiary_v2/external/ppac_master_v3_mmi_enriched_with_sanity_check.csv /usr/local/airflow/rootfs/mopng_beneficiary_v2/manual__2018-12-18T12:06:14+00:00/pcleaned/Qc01sB1s1WBLhljjIzt2S0Ex.csv

Да, файлы нужно было загрузить, но из журналов соответствующего узла я проверил, что всереквизиты были выполнены.

Также обратите внимание, что я уже проверил успешное завершение кода, напечатав перед частью iterrows().Итак, моя цель печати сообщений отладки состоит в том, чтобы узнать, где код испортился, потому что более 3 часов код выполнялся без завершения (после iterrows ())

Это также должно бытьотметил, что один и тот же код в Python2 на другом веб-сервере воздушного потока занимает максимум 10 минут.

...