The following code runs a list of functions over a table, much like a pipeline. The pipeline is passed in as an array of OrderedDicts; each OrderedDict maps the name of a callable to the arguments for that call. The functions all work with CSV data and are thin wrappers around the pandas library (a rough sketch of them is included after the script). The problem is that every time I call handler() (which simulates parallel jobs), the functions print their messages one more time than the previous handler() call did.
from collections import OrderedDict
from datetime import datetime
import logging, time, asyncio
#import configargparse
from funciones import file, csv_tools
p = [
    OrderedDict([
        ['file.read_csv', {
            'path': 'https://people.sc.fsu.edu/~jburkardt/data/csv/hw_25000.csv',
            'header': 0,
            'sep': ', ',
        }],
        ['csv_tools.show_column_names', {}],
        ['csv_tools.show_data_size', {}],
    ]),
    OrderedDict([
        ['file.read_csv', {
            'path': 'https://people.sc.fsu.edu/~jburkardt/data/csv/hw_200.csv',
            'header': 0,
            'sep': ', ',
        }],
        ['csv_tools.show_column_names', {}],
        ['csv_tools.show_data_size', {}],
    ]),
    OrderedDict([
        ['file.read_csv', {
            'path': 'https://people.sc.fsu.edu/~jburkardt/data/csv/snakes_count_10000.csv',
            'header': 0,
            'sep': ', ',
        }],
        ['csv_tools.show_column_names', {}],
        ['csv_tools.show_data_size', {}],
    ]),
]
LOG_LEVEL = logging.DEBUG

def set_logger(task_id):
    logger = logging.getLogger(__name__)
    logger.setLevel(LOG_LEVEL)
    handler = logging.StreamHandler()
    logger.addHandler(handler)
    formatter = logging.Formatter(
        "%(asctime)s [%(levelname)s] %(task_id)s %(filename)s:%(lineno)s %(funcName)10s(): %(message)s")
    handler.setFormatter(formatter)
    logger = logging.LoggerAdapter(logger, {'task_id': task_id})
    return logger
async def handler(task_id, params):
    try:
        # TODO: set up an event subscription mechanism to start the job
        logger = set_logger(task_id)
        logger.debug(f"parametros recibidos: {params}")
        s = time.perf_counter()
        buffer = None
        for func in params:
            logger.debug("executing: {}".format(func))
            f = eval(func)
            buffer = await f(buffer, params[func], logger)
        elapsed = time.perf_counter() - s
        print(f"Fin: ({elapsed:0.2f} secs)")
        # TODO: emit a "finished" event
    except Exception as ex:
        print("Se ha producido un fallo:")
        # TODO: emit a "failure" event
        raise ex

async def main(p):
    await asyncio.gather(
        handler(1000, p[0]),
        handler(20000, p[1]),
        handler(30000, p[2]))

asyncio.run(main(p))
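For context, funciones is a small local package that is not shown here. Each pipeline step is an async coroutine that receives the current DataFrame, the kwargs from the pipeline entry, and the logger, and returns a DataFrame. A minimal sketch of the wrappers (simplified, not the exact module; the pandas calls and argument handling are only an approximation):

# funciones/file.py (sketch)
import pandas as pd

async def read_csv(data, kwargs, logger):
    # Ignores the incoming buffer and loads a fresh DataFrame from kwargs['path'].
    return pd.read_csv(kwargs['path'], header=kwargs.get('header', 0),
                       sep=kwargs.get('sep', ','), engine='python')

# funciones/csv_tools.py (sketch)
async def show_column_names(data, kwargs, logger):
    # Logs the column names of the DataFrame produced by the previous step.
    logger.info(data.columns.values)
    return data

async def show_data_size(data, kwargs, logger):
    # Logs the DataFrame shape as rows/columns.
    rows, cols = data.shape
    logger.info(f"Filas: {rows}, columnas: {cols}")
    return data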
When it runs, each task's log entries are repeated an increasing number of times, like this:
2019-03-31 17:35:17,437 [DEBUG] 1000 try.py:64 handler(): parametros recibidos: OrderedDict([('file.read_csv', {'path': 'https://people.sc.fsu.edu/~jburkardt/data/csv/hw_25000.csv', 'header': 0, 'sep': ', '}), ('csv_tools.show_column_names', {}), ('csv_tools.show_data_size', {})])
2019-03-31 17:35:17,437 [DEBUG] 1000 try.py:68 handler(): executing: file.read_csv
2019-03-31 17:35:20,702 [DEBUG] 1000 try.py:68 handler(): executing: csv_tools.show_column_names
2019-03-31 17:35:20,703 [INFO] 1000 csv_tools.py:14 show_column_names(): ['"Index"' '"Height(Inches)"' '"Weight(Pounds)"']
2019-03-31 17:35:20,703 [DEBUG] 20000 try.py:64 handler(): parametros recibidos: OrderedDict([('file.read_csv', {'path': 'https://people.sc.fsu.edu/~jburkardt/data/csv/hw_200.csv', 'header': 0, 'sep': ', '}), ('csv_tools.show_column_names', {}), ('csv_tools.show_data_size', {})])
2019-03-31 17:35:20,703 [DEBUG] 20000 try.py:64 handler(): parametros recibidos: OrderedDict([('file.read_csv', {'path': 'https://people.sc.fsu.edu/~jburkardt/data/csv/hw_200.csv', 'header': 0, 'sep': ', '}), ('csv_tools.show_column_names', {}), ('csv_tools.show_data_size', {})])
2019-03-31 17:35:20,703 [DEBUG] 20000 try.py:68 handler(): executing: file.read_csv
2019-03-31 17:35:20,703 [DEBUG] 20000 try.py:68 handler(): executing: file.read_csv
2019-03-31 17:35:21,218 [DEBUG] 20000 try.py:68 handler(): executing: csv_tools.show_column_names
2019-03-31 17:35:21,218 [DEBUG] 20000 try.py:68 handler(): executing: csv_tools.show_column_names
2019-03-31 17:35:21,218 [INFO] 20000 csv_tools.py:14 show_column_names(): ['"Index"' 'Height(Inches)"' '"Weight(Pounds)"']
2019-03-31 17:35:21,218 [INFO] 20000 csv_tools.py:14 show_column_names(): ['"Index"' 'Height(Inches)"' '"Weight(Pounds)"']
2019-03-31 17:35:21,219 [DEBUG] 30000 try.py:64 handler(): parametros recibidos: OrderedDict([('file.read_csv', {'path': 'https://people.sc.fsu.edu/~jburkardt/data/csv/snakes_count_10000.csv', 'header': 0, 'sep': ', '}), ('csv_tools.show_column_names', {}), ('csv_tools.show_data_size', {})])
2019-03-31 17:35:21,219 [DEBUG] 30000 try.py:64 handler(): parametros recibidos: OrderedDict([('file.read_csv', {'path': 'https://people.sc.fsu.edu/~jburkardt/data/csv/snakes_count_10000.csv', 'header': 0, 'sep': ', '}), ('csv_tools.show_column_names', {}), ('csv_tools.show_data_size', {})])
2019-03-31 17:35:21,219 [DEBUG] 30000 try.py:64 handler(): parametros recibidos: OrderedDict([('file.read_csv', {'path': 'https://people.sc.fsu.edu/~jburkardt/data/csv/snakes_count_10000.csv', 'header': 0, 'sep': ', '}), ('csv_tools.show_column_names', {}), ('csv_tools.show_data_size', {})])
2019-03-31 17:35:21,219 [DEBUG] 30000 try.py:68 handler(): executing: file.read_csv
2019-03-31 17:35:21,219 [DEBUG] 30000 try.py:68 handler(): executing: file.read_csv
2019-03-31 17:35:21,219 [DEBUG] 30000 try.py:68 handler(): executing: file.read_csv
2019-03-31 17:35:22,570 [DEBUG] 30000 try.py:68 handler(): executing: csv_tools.show_column_names
2019-03-31 17:35:22,570 [DEBUG] 30000 try.py:68 handler(): executing: csv_tools.show_column_names
2019-03-31 17:35:22,570 [DEBUG] 30000 try.py:68 handler(): executing: csv_tools.show_column_names
2019-03-31 17:35:22,570 [INFO] 30000 csv_tools.py:14 show_column_names(): ['"Game Number"' '"Game Length"']
2019-03-31 17:35:22,570 [INFO] 30000 csv_tools.py:14 show_column_names(): ['"Game Number"' '"Game Length"']
2019-03-31 17:35:22,570 [INFO] 30000 csv_tools.py:14 show_column_names(): ['"Game Number"' '"Game Length"']
2019-03-31 17:35:30,711 [DEBUG] 1000 try.py:68 handler(): executing: csv_tools.show_data_size
2019-03-31 17:35:30,711 [DEBUG] 1000 try.py:68 handler(): executing: csv_tools.show_data_size
2019-03-31 17:35:30,711 [DEBUG] 1000 try.py:68 handler(): executing: csv_tools.show_data_size
2019-03-31 17:35:30,712 [INFO] 1000 csv_tools.py:37 show_data_size(): Filas: 25000, columnas: 3
2019-03-31 17:35:30,712 [INFO] 1000 csv_tools.py:37 show_data_size(): Filas: 25000, columnas: 3
2019-03-31 17:35:30,712 [INFO] 1000 csv_tools.py:37 show_data_size(): Filas: 25000, columnas: 3
Fin: (13.28 secs)
2019-03-31 17:35:31,220 [DEBUG] 20000 try.py:68 handler(): executing: csv_tools.show_data_size
2019-03-31 17:35:31,220 [DEBUG] 20000 try.py:68 handler(): executing: csv_tools.show_data_size
2019-03-31 17:35:31,220 [DEBUG] 20000 try.py:68 handler(): executing: csv_tools.show_data_size
2019-03-31 17:35:31,221 [INFO] 20000 csv_tools.py:37 show_data_size(): Filas: 200, columnas: 3
2019-03-31 17:35:31,221 [INFO] 20000 csv_tools.py:37 show_data_size(): Filas: 200, columnas: 3
2019-03-31 17:35:31,221 [INFO] 20000 csv_tools.py:37 show_data_size(): Filas: 200, columnas: 3
Fin: (10.52 secs)
2019-03-31 17:35:32,573 [DEBUG] 30000 try.py:68 handler(): executing: csv_tools.show_data_size
2019-03-31 17:35:32,573 [DEBUG] 30000 try.py:68 handler(): executing: csv_tools.show_data_size
2019-03-31 17:35:32,573 [DEBUG] 30000 try.py:68 handler(): executing: csv_tools.show_data_size
2019-03-31 17:35:32,574 [INFO] 30000 csv_tools.py:37 show_data_size(): Filas: 10000, columnas: 2
2019-03-31 17:35:32,574 [INFO] 30000 csv_tools.py:37 show_data_size(): Filas: 10000, columnas: 2
2019-03-31 17:35:32,574 [INFO] 30000 csv_tools.py:37 show_data_size(): Filas: 10000, columnas: 2
Fin: (11.35 secs)
Each time I call handler(), the messages are printed one more time than in the previous call. What am I doing wrong?