Код Asyncio отображает добавочное количество параметров при каждом вызове - PullRequest
0 голосов
/ 31 марта 2019

Следующий код предназначен для выполнения списка функций над таблицей. Точно так же, как конвейер. Список передается в виде массива OrderedDicts. OrderedDict имеет не только список вызываемых функций, но и аргументы для каждой функции. Функции все о передаче данных CSV. Это обертка библиотеки панд. Проблема в том, что каждый раз, когда я вызываю обработчик (имитирующий параллельные задания), функции выводят свое сообщение еще раз, чем предыдущий вызов обработчика ().

from collections import OrderedDict
from datetime import datetime
import logging, time, asyncio
#import configargparse
from funciones import file, csv_tools


p = [
     OrderedDict([
        ['file.read_csv', {
                'path': 'https://people.sc.fsu.edu/~jburkardt/data/csv/hw_25000.csv',
                "header": 0,
                'sep': ', ',
        }],
        ['csv_tools.show_column_names', {}],
        ['csv_tools.show_data_size', {}], 
    ]),
    OrderedDict([
        ['file.read_csv', {
                'path': 'https://people.sc.fsu.edu/~jburkardt/data/csv/hw_200.csv',
                "header": 0,
                'sep': ', ',
        }],
        ['csv_tools.show_column_names', {}],
        ['csv_tools.show_data_size', {}],        
    ]),
    OrderedDict([
        ['file.read_csv', {
                'path': 'https://people.sc.fsu.edu/~jburkardt/data/csv/snakes_count_10000.csv',
                "header": 0,
                'sep': ', ',
        }],
        ['csv_tools.show_column_names', {}],
        ['csv_tools.show_data_size', {}], 
    ])]



LOG_LEVEL=logging.DEBUG


def set_logger(task_id):
    logger = logging.getLogger(__name__)
    logger.setLevel(LOG_LEVEL)
    handler = logging.StreamHandler()
    logger.addHandler(handler)
    formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(task_id)s %(filename)s:%(lineno)s %(funcName)10s(): %(message)s")
    handler.setFormatter(formatter)
    logger = logging.LoggerAdapter(logger, {'task_id': task_id})
    return logger


async def handler(task_id, params):
    try:        
        #TODO montar mecanismo de subscripcion a eventos para comenzar
        logger = set_logger(task_id)
        logger.debug(f"parametros recibidos: {params}")
        s = time.perf_counter()
        buffer = None
        for func in params:
            logger.debug("executing: {}".format(func))
            f = eval(func)
            buffer = await f(buffer, params[func], logger)
        elapsed = time.perf_counter() - s
        print(f"Fin: ({elapsed:0.2f} secs)")
        #TODO lanzar evento de fin
    except Exception as ex:
        print("Se ha producido un fallo:")
        #TODO lanzar evento de fallo
        raise ex

async def main(p):

    await asyncio.gather(
            handler(1000, p[0]),
            handler(20000, p[1]),
            handler(30000, p[2]))


asyncio.run(main(p))

При выполнении показывает инкрементное количество запросов следующим образом:

2019-03-31 17:35:17,437 [DEBUG] 1000 try.py:64    handler(): parametros recibidos: OrderedDict([('file.read_csv', {'path': 'https://people.sc.fsu.edu/~jburkardt/data/csv/hw_25000.csv', 'header': 0, 'sep': ', '}), ('csv_tools.show_column_names', {}), ('csv_tools.show_data_size', {})])
2019-03-31 17:35:17,437 [DEBUG] 1000 try.py:68    handler(): executing: file.read_csv
2019-03-31 17:35:20,702 [DEBUG] 1000 try.py:68    handler(): executing: csv_tools.show_column_names
2019-03-31 17:35:20,703 [INFO] 1000 csv_tools.py:14 show_column_names(): ['"Index"' '"Height(Inches)"' '"Weight(Pounds)"']
2019-03-31 17:35:20,703 [DEBUG] 20000 try.py:64    handler(): parametros recibidos: OrderedDict([('file.read_csv', {'path': 'https://people.sc.fsu.edu/~jburkardt/data/csv/hw_200.csv', 'header': 0, 'sep': ', '}), ('csv_tools.show_column_names', {}), ('csv_tools.show_data_size', {})])
2019-03-31 17:35:20,703 [DEBUG] 20000 try.py:64    handler(): parametros recibidos: OrderedDict([('file.read_csv', {'path': 'https://people.sc.fsu.edu/~jburkardt/data/csv/hw_200.csv', 'header': 0, 'sep': ', '}), ('csv_tools.show_column_names', {}), ('csv_tools.show_data_size', {})])
2019-03-31 17:35:20,703 [DEBUG] 20000 try.py:68    handler(): executing: file.read_csv
2019-03-31 17:35:20,703 [DEBUG] 20000 try.py:68    handler(): executing: file.read_csv
2019-03-31 17:35:21,218 [DEBUG] 20000 try.py:68    handler(): executing: csv_tools.show_column_names
2019-03-31 17:35:21,218 [DEBUG] 20000 try.py:68    handler(): executing: csv_tools.show_column_names
2019-03-31 17:35:21,218 [INFO] 20000 csv_tools.py:14 show_column_names(): ['"Index"' 'Height(Inches)"' '"Weight(Pounds)"']
2019-03-31 17:35:21,218 [INFO] 20000 csv_tools.py:14 show_column_names(): ['"Index"' 'Height(Inches)"' '"Weight(Pounds)"']
2019-03-31 17:35:21,219 [DEBUG] 30000 try.py:64    handler(): parametros recibidos: OrderedDict([('file.read_csv', {'path': 'https://people.sc.fsu.edu/~jburkardt/data/csv/snakes_count_10000.csv', 'header': 0, 'sep': ', '}), ('csv_tools.show_column_names', {}), ('csv_tools.show_data_size', {})])
2019-03-31 17:35:21,219 [DEBUG] 30000 try.py:64    handler(): parametros recibidos: OrderedDict([('file.read_csv', {'path': 'https://people.sc.fsu.edu/~jburkardt/data/csv/snakes_count_10000.csv', 'header': 0, 'sep': ', '}), ('csv_tools.show_column_names', {}), ('csv_tools.show_data_size', {})])
2019-03-31 17:35:21,219 [DEBUG] 30000 try.py:64    handler(): parametros recibidos: OrderedDict([('file.read_csv', {'path': 'https://people.sc.fsu.edu/~jburkardt/data/csv/snakes_count_10000.csv', 'header': 0, 'sep': ', '}), ('csv_tools.show_column_names', {}), ('csv_tools.show_data_size', {})])
2019-03-31 17:35:21,219 [DEBUG] 30000 try.py:68    handler(): executing: file.read_csv
2019-03-31 17:35:21,219 [DEBUG] 30000 try.py:68    handler(): executing: file.read_csv
2019-03-31 17:35:21,219 [DEBUG] 30000 try.py:68    handler(): executing: file.read_csv
2019-03-31 17:35:22,570 [DEBUG] 30000 try.py:68    handler(): executing: csv_tools.show_column_names
2019-03-31 17:35:22,570 [DEBUG] 30000 try.py:68    handler(): executing: csv_tools.show_column_names
2019-03-31 17:35:22,570 [DEBUG] 30000 try.py:68    handler(): executing: csv_tools.show_column_names
2019-03-31 17:35:22,570 [INFO] 30000 csv_tools.py:14 show_column_names(): ['"Game Number"' '"Game Length"']
2019-03-31 17:35:22,570 [INFO] 30000 csv_tools.py:14 show_column_names(): ['"Game Number"' '"Game Length"']
2019-03-31 17:35:22,570 [INFO] 30000 csv_tools.py:14 show_column_names(): ['"Game Number"' '"Game Length"']
2019-03-31 17:35:30,711 [DEBUG] 1000 try.py:68    handler(): executing: csv_tools.show_data_size
2019-03-31 17:35:30,711 [DEBUG] 1000 try.py:68    handler(): executing: csv_tools.show_data_size
2019-03-31 17:35:30,711 [DEBUG] 1000 try.py:68    handler(): executing: csv_tools.show_data_size
2019-03-31 17:35:30,712 [INFO] 1000 csv_tools.py:37 show_data_size(): Filas: 25000, columnas: 3
2019-03-31 17:35:30,712 [INFO] 1000 csv_tools.py:37 show_data_size(): Filas: 25000, columnas: 3
2019-03-31 17:35:30,712 [INFO] 1000 csv_tools.py:37 show_data_size(): Filas: 25000, columnas: 3
Fin: (13.28 secs)
2019-03-31 17:35:31,220 [DEBUG] 20000 try.py:68    handler(): executing: csv_tools.show_data_size
2019-03-31 17:35:31,220 [DEBUG] 20000 try.py:68    handler(): executing: csv_tools.show_data_size
2019-03-31 17:35:31,220 [DEBUG] 20000 try.py:68    handler(): executing: csv_tools.show_data_size
2019-03-31 17:35:31,221 [INFO] 20000 csv_tools.py:37 show_data_size(): Filas: 200, columnas: 3
2019-03-31 17:35:31,221 [INFO] 20000 csv_tools.py:37 show_data_size(): Filas: 200, columnas: 3
2019-03-31 17:35:31,221 [INFO] 20000 csv_tools.py:37 show_data_size(): Filas: 200, columnas: 3
Fin: (10.52 secs)
2019-03-31 17:35:32,573 [DEBUG] 30000 try.py:68    handler(): executing: csv_tools.show_data_size
2019-03-31 17:35:32,573 [DEBUG] 30000 try.py:68    handler(): executing: csv_tools.show_data_size
2019-03-31 17:35:32,573 [DEBUG] 30000 try.py:68    handler(): executing: csv_tools.show_data_size
2019-03-31 17:35:32,574 [INFO] 30000 csv_tools.py:37 show_data_size(): Filas: 10000, columnas: 2
2019-03-31 17:35:32,574 [INFO] 30000 csv_tools.py:37 show_data_size(): Filas: 10000, columnas: 2
2019-03-31 17:35:32,574 [INFO] 30000 csv_tools.py:37 show_data_size(): Filas: 10000, columnas: 2
Fin: (11.35 secs)

Каждый раз, когда я вызываю функцию-обработчик (), она печатает еще раз свое сообщение. Что я делаю не так, пожалуйста?

1 Ответ

0 голосов
/ 31 марта 2019

Понял !!!

Проблема в том, что каждый раз, когда вызывается функция set_logger, он добавляет новый обработчик в регистратор.Решение смиренно.Просто измените set_logger, чтобы удалить предыдущих помощников перед добавлением нового:

def set_logger(task_id):
    logger = logging.getLogger(__name__)
    logger.setLevel(LOG_LEVEL)
    handler = logging.StreamHandler()
    for h in logger.handlers:        
        logger.removeHandler(h)
    logger.addHandler(handler)
    formatter = logging.Formatter("%(asctime)s %(levelname)7s  %(task_id)s %(filename)s:%(lineno)s %(funcName)10s(): %(message)s")
    handler.setFormatter(formatter)
    logger = logging.LoggerAdapter(logger, {'task_id': task_id})
    return logger
...