Многопроцессорность внутри Flask для чтения файла Excel - PullRequest
0 голосов
/ 27 мая 2020

Я работал с модулем python, который читает файл Excel по частям, это собрание идей, которые я нашел в Интернете. Этот процесс использует python multiprocessing bult-in, openpyxl и numpy. Все было хорошо, пока мне не пришлось поместить свой код в Flask как микросервис. На самом деле я не знаю лучшего способа, как это сделать в Flask.

Кажется, что процесс не может разветвлять процесс в конце.

Я ценю любую помощь, чтобы сделать эта работа.

Я работаю с Python 2.7 и:

  • Flask == 1.1.1
  • mod-wsgi == 4.6.7
  • numpy == 1.16.4
  • openpyxl == 2.6.3
  • Werkzeug == 0.15.4

Это мой исходный код:

from multiprocessing import Pool, cpu_count
from openpyxl import load_workbook
from functools import wraps
import numpy as np
import traceback
import logging


logging.basicConfig(
        format='%(asctime)s %(levelname)-8s %(message)s',
        level=logging.INFO,
        datefmt='%Y-%m-%d %H:%M:%S')


def unpack(func):
    @wraps(func)
    def wrapper(arg_tuple):
        return func(*arg_tuple)
    return wrapper


def initialize():
    global cached_excels
    cached_excels = {}


def get_sheet_names(workbook):
    return workbook.sheetnames


def delete_mapping(excel_header):
    return list(map(lambda h: len(h) != 0, excel_header))


def extract_values_map(excel_range, row_range, mapping_column):
    min_row, max_row = row_range
    return np.column_stack([    np.array(range(min_row, max_row + 1)), 
                                np.array( [[cell.value for map_, cell in zip(mapping_column, row) if map_] for row in excel_range], dtype=np.unicode_  )   ])

@unpack
def extracting_data(request, extract_func):
    result = (False, "")
    try:
        if request == "EXIT":
            for value in cached_excels.items():
                wb = value
                wb.close()
            result = (True, "")
        else:
            (excel_path, excel_sheet, excel_range, excel_column, excel_mapping) = request
            if excel_path not in cached_excels:
                try:
                    wb = load_workbook(filename=excel_path, read_only=True, data_only=True)
                    cached_excels[excel_path] = wb
                except:
                    wb.close()
                    raise
            if excel_sheet not in get_sheet_names(cached_excels[excel_path]):
                raise ValueError("Worsheet not found: {0}".format(excel_sheet))
            rows = (row for row in cached_excels[excel_path][excel_sheet].iter_rows(min_row=excel_range[0], max_row=excel_range[1], min_col=excel_column[0], max_col=excel_column[1], values_only=False))
            result = (True, extract_func(rows, excel_range, excel_mapping))
    except Exception:
        result = (False, traceback.format_exc())
    return result


def define_requests(excel_path, excel_sheet, chunks, excel_column, excel_mapping):
    return list(map(lambda chunk: (excel_path, excel_sheet, chunk, excel_column, excel_mapping), chunks))


def request_data_mapping(requests, function):
    return list(map(lambda request: (request, function), requests))


def get_cores():
    return int(cpu_count())


def calculate_chunks(first_row, last_row, cores):
    i = first_row
    f = last_row
    b = int((f - i)/cores) if int((f - i)/cores) % 2 == 0 else int((f - i)/cores) - 1 #chunks
    p = b if b < 100000 else 100000 #chunk limit
    return [(x, x + p - 1 if x + p - 1 < f else f) if x < f else (x, f) if x == f else (x, f - 1) for x in range(i, f + 1, p)]


def start_pool(requests, extract_func):
    try:
        cores = get_cores()
        logging.info("Start pool")
        pool = Pool(cores, initialize, ())
        data_mapping = request_data_mapping(requests, extract_func)
        logging.info("Begin processing")
        results = pool.map(extracting_data, iterable=data_mapping)
        logging.info("End processing")
    except Exception as ex:
        logging.error(traceback.format_exc())
    finally:
        if pool:
            pool.map(extracting_data, iterable=[("EXIT", None) for i in range(0, cores)])            
            pool.close()
#            pool.join()
#            pool.terminate()
    return results


def do_compilation(range_, header, results):
    cores = get_cores()
    first_row, last_row, total_column = range_

    npf = np.empty(((last_row - first_row) + 1, total_column + 1), dtype=object)
    lst_chunks = calculate_chunks((first_row - first_row), (last_row - first_row), cores)

    for i, result in enumerate(results):
        if result[0]:
            npf[lst_chunks[i][0]:lst_chunks[i][1] + 1] = result[1]

    return make_dict(npf, header)


def make_dict(npf, header):
    return dict(zip(list(filter(lambda h: h != '', ['id_row'] + header)), map(list, zip(*npf))))


def multi_read(workbook, worksheet, range_, header, action):
    cores = get_cores()
    first_row, last_row, first_column, last_column = range_
    extract_func = {"withdropcolumn": extract_values_map}[action]
    mapping = delete_mapping(header)
    chunk_list = calculate_chunks(first_row, last_row, cores)
    requests = define_requests(workbook, worksheet, chunk_list, (first_column, last_column), mapping)
    results = start_pool(requests, extract_func)
    total_column = sum(delete_mapping(header))
    excel_range = (first_row, last_row, total_column)

    return do_compilation(excel_range, header, results)


if __name__ == "__main__":
    logging.info("Begin...")

    '''Input'''
    workbook_       = r'C:\Users\asrc\Desktop\5000 Sales Records.xlsx'
    worksheet_      = r'SaleRecords'
    first_row_      = 2
    last_row_       = 10
    first_column_   = 1
    last_column_    = 14
    header_         = ['Region','Country','','Sales Channel','','','Order ID','','','','','','Total Cost','Total Profit']
    action_         = "withdropcolumn"
    '''End Input'''

    excel_range = (first_row_, last_row_, first_column_, last_column_)

    df = multi_read(workbook_, worksheet_, excel_range, header_, action_)   

    logging.info("Keys...{0}".format(df.keys()))
    logging.info("First row...{0}".format(df['id_row'][0]))
    logging.info("Last  row...{0}".format(df['id_row'][-1]))

    logging.info("Done...")

Я поместил этот код в класс, определил все функции как класс метода с именем IOProcess и прокомментировал блок if __name__ == "__main__":, потому что эти параметры поступают вместе с остальным вызовом в @app.route("/process_io") . Внутри IOProcess у меня есть метод под названием proc, который запускает этот процесс с параметрами, переданными __init__, и запускает процесс multi_read методом.

from flask import Flask, request, jsonify
import process_io as io


app = Flask(__name__)

@app.route("/")
def faq():
    return "LOCAL IO API =)"


@app.route("/process_io", methods = ["GET", "POST"])
def start():

    data = request.get_json()

    des_nam_wb = data['DES_NM_ARQ']
    des_nam_ws = data['DES_NM_ABA']
    des_ft_row = data['LST_DE_CAMPOS_ARQ']
    des_lt_row = data['VLR_LNH_CAB']
    des_ft_col = data['FLG_LNH_AB_MESCL']
    des_lt_col = data['VLR_LNH_FINAL']
    des_header = data['DES_COL_FINAL']
    des_action = data['DES_COL_FINAL']


    io_proc = io.IOProcess(
        workbook_ = des_nam_wb,
        worksheet_ = des_nam_ws,
        first_row_ = des_ft_row,
        last_row_ = des_lt_row,
        first_column_ = des_ft_col,
        last_column_ = des_lt_col,
        header_ = des_header,
        action_ = des_action
    )

    io_proc.proc()

    dict_out = io_proc.data

    value_return = {
        "row_count": str(dict_out['row_count']),
        "col_count": str(dict_out['col_count']),
        "status": str(dict_out['status'])
    }

    return jsonify(value_return)


if __name__ == "__main__": 
    app.run()

Я знаю, что это кажется немного сумасшедшим, но, как я уже сказал раньше, я ценю любую помощь в этой работе.

Файл excel в этом примере можно загрузить здесь:

http://eforexcel.com/wp/wp-content/uploads/2017/07/5000-Sales-Records.zip

Спасибо !!!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...