Выполнение многопоточности с pandas фреймом данных в python - PullRequest
0 голосов
/ 16 апреля 2020

У меня есть pandas df, из которого я делаю командную строку из каждого ряда и запускаю их параллельно, используя os.system(). Вы заметите, что я жестко запрограммировал мой cores = 64, потому что в моей системе всего 14 ядер, а в CSV у меня 63 строки, а поскольку ядер всего 14, то запускается только 14 из них, поэтому я передал ядро ​​как 64. Теперь Я должен увеличить количество процессов примерно до 150. Поскольку процессы действительно малы, я хочу переключиться на многопоточность и добиться того же.

В основном я хочу выполнить следующее: -

1.) Чтение строк из файла CSV или любого источника данных в pandas или даже в словарь.

2.) Создание командной строки из всех строк.

3 .) Запуск всех командных строк параллельно с использованием multithreading.

Самый важный вопрос: - Сколько потоков я могу \ использовать за один раз.

def processor(dff):
    for index,row in dff.iterrows():
            dir = str(row['dir'])
            script = str(row['script'])
            c = str(row['c'])
            o = str(row['o'])
            mph5xx = str(row['mph5xx'])
            check = str(row['check'])
            ser = str(row['ser'])
            www = str(row['www'])
            influx_host = str(row['influx_host'])
            es_host = str(row['es_host'])
            xymon = str(row['xymon'])
            uri = str(row['uri'])
            yellow = str(row['yellow'])
            red = str(row['red'])
            interval = str(row['interval'])
            window = str(row['window'])
            anomalous = str(row['anomalous'])
            no_page = str(row['no_page'])
            nopage_auto = str(row['nopage_auto'])
            cmda = dir + '/' + script + ' ' + '-c' + ' ' + c + ' ' + '-o' + ' ' + o +  ' '  + '--check' + ' '+ check + ' ' + '--mph5xx' + ' ' + mph5xx + ' ' + '--ser' + ' ' + ser + ' ' + '--www' + ' ' + www + ' ' +'--influx_host' + ' ' + influx_host + ' ' + '--es_host' + ' ' + es_host  + ' ' + '--xymon' + ' ' + xymon +  ' ' + '--yellow' + ' ' + yellow + ' ' + '--red' + ' ' + red + ' ' + '--interval' + ' ' + interval + ' ' + '--window'+ ' ' + window + ' ' + '--anomalous' + ' ' + anomalous + ' '+ '--no_page' + ' ' + no_page + ' '+ '--nopage_auto' + ' ' + nopage_auto

            os.system(cmda)


def main(infile, mdebug):

    global debug
    debug = mdebug

    try:
        lines = sum(1 for line in open(infile))
    except Exception as err:
        print "Error {} opening file: {}".format(err, infile)
        sys.exit(2000)

    if debug >= 2:
        print(infile)

    try:
        dff = pd.read_csv(infile)
    except Exception as err:
        print "Error {}, opening file: {}".format(err, infile)
            sys.exit(2000)


    df_split = np.array_split(dff, (lines+1))
    cores = multiprocessing.cpu_count()
    cores = 64
    if debug >= 1:
            print "RCM runs is: {}".format(lines - 1)
            print "cores is: {}".format(cores)

        if debug >= 5:
            sys.exit(2000)

        # pool = Pool(cores)
        pool = Pool(lines-1)

    if debug >= 5:
        sys.exit(2000)

    for n, frame in enumerate(pool.imap(processor, df_split), start=1):
            if frame is not None:
                frame.to_csv('{}'.format(n))

    pool.close()
    pool.join()

if __name__ == "__main__":
        args = parse_args()
...