Многопроцессорная обработка через pool.map_async очень медленная для большого фрейма данных - PullRequest
1 голос
/ 16 июня 2019

Я читаю 800 000 строк в таблицу данных. Затем я зацикливаюсь на каждом столбце и каждой строке в столбце, чтобы собрать статистику, такую ​​как максимальная длина, минимальная длина, максимальное значение, различные значения и т. Д.

У меня есть доступ к 32-ядерным вычислениям с использованием SLURM, поэтому я решил использовать pool.map_async для обработки каждого столбца в кадре данных в отдельных процессах.

Это намного медленнее, чем просто использование цикла for.

Я попытался уменьшить количество процессоров до 8, 4 и т. Д., Чтобы посмотреть, вызвал ли это запуск процесса.

Я подозреваю, что это вызвано сериализацией серии панд на 800 000 строк?

import cx_Oracle
import csv
import os
import glob
import datetime
import multiprocessing as mp
import get_column_stats as gs
import pandas as pd
import pandas.io.sql as psql


def get_data():
    print("Starting Job: " + str(datetime.datetime.now()))

    # Step 1: Init multiprocessing.Pool()   
    pool = mp.Pool(mp.cpu_count())
    print("CPU Count: " + str(mp.cpu_count()))

    dsn_tns = cx_Oracle.makedsn('myserver.net', '1521', service_name='myservice')
    con = cx_Oracle.connect(user='ARIEL', password='zzzzz', dsn=dsn_tns)


    stats_results = [["OWNER","TABLE","COLUMN_NAME","RECORD_COUNT","DISTINCT_VALUES","MIN_LENGTH","MAX_LENGTH","MIN_VAL","MAX_VAL"]]

    sql = "SELECT * FROM ARIEL.DIM_REGISTRATION_SET"

    cur = con.cursor()
    print("Start Executing SQL: " + str(datetime.datetime.now()))

    df = psql.read_sql(sql, con);

    print("End SQL Execution: " + str(datetime.datetime.now()))

    col_names = df.columns.values.tolist()
    col_index = 0


    print("Start In-Memory Iteration of Dataset: " + str(datetime.datetime.now()))
    # we go through every field

    # start at column 0
    col_index = 0

    # iterate through each column, to gather stats from each column using parallelisation
    proc_results = pool.map_async(gs.get_column_stats, df.iteritems()).get()




    # Step 3: Don't forget to close
    pool.close() 
    pool.join()


    for result in proc_results:
        stats_results.append(result)


    print("End In-Memory Iteration of Dataset: " + str(datetime.datetime.now()))
    # end filename for
    cur.close()           

    outfile = open('C:\jupyter\Experiment\stats_dim_registration_set.csv','w')
    writer=csv.writer(outfile,quoting=csv.QUOTE_ALL, lineterminator='\n')
    writer.writerows(stats_results)
    outfile.close()
    print("Ending Job: " + str(datetime.datetime.now()))





get_data()

Код, называемый асинхронным:

import os
import sys

def strip_crlf(value):
    return value.replace('\n', ' ').replace('\r', '')

def get_column_stats(args):
    # args is a tuple, the first value is the column name of the panda series, the second value is the panda data series

    col_name, rs = args
    sys.stdout = open("col_" + col_name + ".out", "a")

    print("Starting Iteration of Column: " + col_name)
    max_length = 0 
    min_length = 100000  # abitrarily large number!!

    max_value = ""
    min_value = "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz"  # abitrarily large number!!

    distinct_value_count = 0

    has_values = False # does the column have any non-null values
    has_null_values = False

    row_count = 0

    # create a dictionary into which we can add the individual items present in each row of data
    # a dictionary will not let us add the same value more than once, so we can simply count the 
    # dictionary values at the end
    distinct_values = {}

    row_index = 0



    # go through every row, for the current column being processed to gather the stats
    for row_value in rs.values:
        row_count += 1


        if row_value is None:
            value_length = 0
        else:
            value_length = len(str(row_value))


        if value_length > max_length:
            max_length = value_length

        if value_length < min_length:
            if value_length > 0:
                min_length = value_length

        if row_value is not None:
            if str(row_value) > max_value:
                max_value = str(row_value)
            if str(row_value) < min_value:
                min_value = str(row_value)

        # capture distinct values
        if row_value is None:
            row_value = "Null"
            has_null_values = True
        else:
            has_values = True
            distinct_values[row_value] = 1

        row_index += 1
        # end row for

    distinct_value_count = len(distinct_values)

    if has_values == False:
        distinct_value_count = None
        min_length = None
        max_length = None
        min_value = None
        max_value = None
    elif has_null_values == True and distinct_value_count > 0:
        distinct_value_count -= 1

    if min_length == 0 and max_length > 0 and has_values == True:
        min_length = max_length

    print("Ending Iteration of Column: " + col_name)


    return ["ARIEL","DIM_REGISTRATION_SET", col_name,row_count, distinct_value_count, min_length, max_length, 
                            strip_crlf(str(min_value)), strip_crlf(str(max_value))]

1 Ответ

0 голосов
/ 16 июня 2019

Вы можете сделать это быстрее, если каждый процесс запрашивает один столбец, а не запрашивает все столбцы в начале, что требует их копирования всем дочерним процессам.

Одна идея - просто запросить имена столбцов в центральном процессе, а затем отправить только имена столбцов дочерним процессам. Тогда SELECT columnX вместо SELECT *.

...