итерация над пандой df параллельно - PullRequest
0 голосов
/ 30 ноября 2018

Итак, я хочу перебирать pandas df параллельно, поэтому предположим, что у меня 15 строк, и я хочу перебирать его параллельно, а не по одной.

df: -

df = pd.DataFrame.from_records([
    {'domain':'dnd','duration':'90','media_file':'testfont.wav','user':'tester_food','channel':'confctl-2' },
    {'domain':'hrpd','duration':'90','media_file':'testfont.wav','user':'tester_food','channel':'confctl-2' },
    {'domain':'blhp','duration':'90','media_file':'testfont.wav','user':'tester_food','channel':'confctl-2' },
    {'domain':'rbswp','duration':'90','media_file':'testfont.wav','user':'tester_food','channel':'confctl-2' },
    {'domain':'foxbp','duration':'90','media_file':'testfont.wav','user':'tester_food','channel':'confctl-2' },
    {'domain':'rbsxbp','duration':'90','media_file':'testfont.wav','user':'tester_food','channel':'confctl-2' },
    {'domain':'dnd','duration':'90','media_file':'testfont.wav','user':'tester_food','channel':'confctl-2' },
    {'domain':'hrpd','duration':'90','media_file':'testfont.wav','user':'tester_food','channel':'confctl-2' }

])

enter image description here

Итак, я перебираю df и создаю командную строку, а затем сохраняю выходные данные в df и выполняю фильтрацию данных, а затем, наконец, сохраняю их в influenxdb,Проблема в том, что я делаю это один за другим, поскольку я повторяю это.то, что я хочу перебрать во всех строках параллельно.

На данный момент я создал 20 сценариев и использую многопроцессорную обработку для параллельного выполнения всех сценариев.Это боль, когда я должен сделать изменение, как я должен сделать это во всех 20 сценариях.Мой скрипт выглядит следующим образом: -

for index, row in dff.iterrows():
    domain = row['domain']
    duration = str(row['duration'])
    media_file = row['media_file']
    user = row['user']
    channel = row['channel']
    cmda = './vaa -s https://' + domain + '.www.vivox.com/api2/ -d ' + 
    duration + ' -f ' + media_file + ' -u .' + user + '. -c 
    sip:confctl-2@' + domain + '.localhost.com -ati 0ps-host -atk 0ps- 
    test'

    rows = [shlex.split(line) for line in os.popen(
    cmda).read().splitlines() if line.strip()]

    df = pd.DataFrame(rows)
    """
    Bunch of data filteration and pushing it into influx 
    """

На данный момент у меня 15 скриптов, если я собираю 15 строк в df и выполняю параллельную обработку, как показано ниже: -

import os
import time
from multiprocessing import Process
os.chdir('/Users/akumar/vivox-sdk-4.9.0002.30719.ebb523a9')
def run_program(cmd):
    # Function that processes will run
    os.system(cmd)

# Creating command to run
commands = ['python testv.py']
commands.extend(['python testv{}.py'.format(i) for i in range(1, 15)])

# Amount of times your programs will run
runs = 1

for run in range(runs):
    # Initiating Processes with desired arguments
    running_programs = []
    for command in commands:
        running_programs.append(Process(target=run_program, args=(command,)))
        running_programs[-1].daemon = True

    # Start our processes simultaneously
    for program in running_programs:
        program.start()

    # Wait untill all programs are done
    while any(program.is_alive() for program in running_programs):
        time.sleep(1)

Вопрос: - Как я могу перебрать df и сделать так, чтобы все 15 строк работали параллельно, и выполнять всю работу внутри цикла for.

Ответы [ 2 ]

0 голосов
/ 25 декабря 2018

Я собираюсь скопировать и вставить свой ответ из Reddit здесь (на случай, если кто-нибудь наткнется на него в подобной ситуации):

import dask.dataframe as ddf

def your_function(row):
    domain = row['domain']
    duration = str(row['duration'])
    media_file = row['media_file']
    user = row['user']
    channel = row['channel']
    cmda = './vaa -s https://' + domain + '.www.vivox.com/api2/ -d ' + 
    duration + ' -f ' + media_file + ' -u .' + user + '. -c 
        sip:confctl-2@' + domain + '.localhost.com -ati 0ps-host -atk 0ps- test'

    rows = [shlex.split(line) for line in os.popen(
            cmda).read().splitlines() if line.strip()]

df_dask = ddf.from_pandas(df, npartitions=4)   # where the number of partitions is the number of cores you want to use
df_dask['output'] = df_dask.apply(lambda x: your_function(x), meta=('str')).compute(scheduler='multiprocessing')

Возможно, вам придется поиграться с параметром оси вapply метод.

0 голосов
/ 22 декабря 2018

Вместо запуска 15 процессов используйте многопоточность и вызовите многопоточную функцию с аргументом.threading.Thread(target=func, args=(i,)) где i - ваш номер, а func - функция, которая оборачивает весь код.Затем переберите его.Вам не нужно распараллеливать итерацию на 15 элементах.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...