Я работал с модулем python, который читает файл Excel по частям, это собрание идей, которые я нашел в Интернете. Этот процесс использует python multiprocessing bult-in, openpyxl и numpy. Все было хорошо, пока мне не пришлось поместить свой код в Flask как микросервис. На самом деле я не знаю лучшего способа, как это сделать в Flask.
Кажется, что процесс не может разветвлять процесс в конце.
Я ценю любую помощь, чтобы сделать эта работа.
Я работаю с Python 2.7 и:
- Flask == 1.1.1
- mod-wsgi == 4.6.7
- numpy == 1.16.4
- openpyxl == 2.6.3
- Werkzeug == 0.15.4
Это мой исходный код:
from multiprocessing import Pool, cpu_count
from openpyxl import load_workbook
from functools import wraps
import numpy as np
import traceback
import logging
logging.basicConfig(
format='%(asctime)s %(levelname)-8s %(message)s',
level=logging.INFO,
datefmt='%Y-%m-%d %H:%M:%S')
def unpack(func):
@wraps(func)
def wrapper(arg_tuple):
return func(*arg_tuple)
return wrapper
def initialize():
global cached_excels
cached_excels = {}
def get_sheet_names(workbook):
return workbook.sheetnames
def delete_mapping(excel_header):
return list(map(lambda h: len(h) != 0, excel_header))
def extract_values_map(excel_range, row_range, mapping_column):
min_row, max_row = row_range
return np.column_stack([ np.array(range(min_row, max_row + 1)),
np.array( [[cell.value for map_, cell in zip(mapping_column, row) if map_] for row in excel_range], dtype=np.unicode_ ) ])
@unpack
def extracting_data(request, extract_func):
result = (False, "")
try:
if request == "EXIT":
for value in cached_excels.items():
wb = value
wb.close()
result = (True, "")
else:
(excel_path, excel_sheet, excel_range, excel_column, excel_mapping) = request
if excel_path not in cached_excels:
try:
wb = load_workbook(filename=excel_path, read_only=True, data_only=True)
cached_excels[excel_path] = wb
except:
wb.close()
raise
if excel_sheet not in get_sheet_names(cached_excels[excel_path]):
raise ValueError("Worsheet not found: {0}".format(excel_sheet))
rows = (row for row in cached_excels[excel_path][excel_sheet].iter_rows(min_row=excel_range[0], max_row=excel_range[1], min_col=excel_column[0], max_col=excel_column[1], values_only=False))
result = (True, extract_func(rows, excel_range, excel_mapping))
except Exception:
result = (False, traceback.format_exc())
return result
def define_requests(excel_path, excel_sheet, chunks, excel_column, excel_mapping):
return list(map(lambda chunk: (excel_path, excel_sheet, chunk, excel_column, excel_mapping), chunks))
def request_data_mapping(requests, function):
return list(map(lambda request: (request, function), requests))
def get_cores():
return int(cpu_count())
def calculate_chunks(first_row, last_row, cores):
i = first_row
f = last_row
b = int((f - i)/cores) if int((f - i)/cores) % 2 == 0 else int((f - i)/cores) - 1 #chunks
p = b if b < 100000 else 100000 #chunk limit
return [(x, x + p - 1 if x + p - 1 < f else f) if x < f else (x, f) if x == f else (x, f - 1) for x in range(i, f + 1, p)]
def start_pool(requests, extract_func):
try:
cores = get_cores()
logging.info("Start pool")
pool = Pool(cores, initialize, ())
data_mapping = request_data_mapping(requests, extract_func)
logging.info("Begin processing")
results = pool.map(extracting_data, iterable=data_mapping)
logging.info("End processing")
except Exception as ex:
logging.error(traceback.format_exc())
finally:
if pool:
pool.map(extracting_data, iterable=[("EXIT", None) for i in range(0, cores)])
pool.close()
# pool.join()
# pool.terminate()
return results
def do_compilation(range_, header, results):
cores = get_cores()
first_row, last_row, total_column = range_
npf = np.empty(((last_row - first_row) + 1, total_column + 1), dtype=object)
lst_chunks = calculate_chunks((first_row - first_row), (last_row - first_row), cores)
for i, result in enumerate(results):
if result[0]:
npf[lst_chunks[i][0]:lst_chunks[i][1] + 1] = result[1]
return make_dict(npf, header)
def make_dict(npf, header):
return dict(zip(list(filter(lambda h: h != '', ['id_row'] + header)), map(list, zip(*npf))))
def multi_read(workbook, worksheet, range_, header, action):
cores = get_cores()
first_row, last_row, first_column, last_column = range_
extract_func = {"withdropcolumn": extract_values_map}[action]
mapping = delete_mapping(header)
chunk_list = calculate_chunks(first_row, last_row, cores)
requests = define_requests(workbook, worksheet, chunk_list, (first_column, last_column), mapping)
results = start_pool(requests, extract_func)
total_column = sum(delete_mapping(header))
excel_range = (first_row, last_row, total_column)
return do_compilation(excel_range, header, results)
if __name__ == "__main__":
logging.info("Begin...")
'''Input'''
workbook_ = r'C:\Users\asrc\Desktop\5000 Sales Records.xlsx'
worksheet_ = r'SaleRecords'
first_row_ = 2
last_row_ = 10
first_column_ = 1
last_column_ = 14
header_ = ['Region','Country','','Sales Channel','','','Order ID','','','','','','Total Cost','Total Profit']
action_ = "withdropcolumn"
'''End Input'''
excel_range = (first_row_, last_row_, first_column_, last_column_)
df = multi_read(workbook_, worksheet_, excel_range, header_, action_)
logging.info("Keys...{0}".format(df.keys()))
logging.info("First row...{0}".format(df['id_row'][0]))
logging.info("Last row...{0}".format(df['id_row'][-1]))
logging.info("Done...")
Я поместил этот код в класс, определил все функции как класс метода с именем IOProcess
и прокомментировал блок if __name__ == "__main__":
, потому что эти параметры поступают вместе с остальным вызовом в @app.route("/process_io")
. Внутри IOProcess
у меня есть метод под названием proc
, который запускает этот процесс с параметрами, переданными __init__
, и запускает процесс multi_read
методом.
from flask import Flask, request, jsonify
import process_io as io
app = Flask(__name__)
@app.route("/")
def faq():
return "LOCAL IO API =)"
@app.route("/process_io", methods = ["GET", "POST"])
def start():
data = request.get_json()
des_nam_wb = data['DES_NM_ARQ']
des_nam_ws = data['DES_NM_ABA']
des_ft_row = data['LST_DE_CAMPOS_ARQ']
des_lt_row = data['VLR_LNH_CAB']
des_ft_col = data['FLG_LNH_AB_MESCL']
des_lt_col = data['VLR_LNH_FINAL']
des_header = data['DES_COL_FINAL']
des_action = data['DES_COL_FINAL']
io_proc = io.IOProcess(
workbook_ = des_nam_wb,
worksheet_ = des_nam_ws,
first_row_ = des_ft_row,
last_row_ = des_lt_row,
first_column_ = des_ft_col,
last_column_ = des_lt_col,
header_ = des_header,
action_ = des_action
)
io_proc.proc()
dict_out = io_proc.data
value_return = {
"row_count": str(dict_out['row_count']),
"col_count": str(dict_out['col_count']),
"status": str(dict_out['status'])
}
return jsonify(value_return)
if __name__ == "__main__":
app.run()
Я знаю, что это кажется немного сумасшедшим, но, как я уже сказал раньше, я ценю любую помощь в этой работе.
Файл excel в этом примере можно загрузить здесь:
http://eforexcel.com/wp/wp-content/uploads/2017/07/5000-Sales-Records.zip
Спасибо !!!