Загрузка разных фреймов данных в несколько потоков одновременно - PullRequest
0 голосов
/ 08 февраля 2019

У меня есть флеш-сервер, который выполняет запросы чтения и записи для фреймов данных.У меня есть механизм кэширования (использующий библиотеку кеширования) для кэширования фреймов данных при получении запроса, а затем с использованием фрейма кэшированных данных при получении запроса на тот же фрейм данных.

В настоящее время я использую блокировку, которая делаетвсе потоки загружают свой (разные) фреймы данных последовательно, а затем обрабатывают загруженный фрейм данных далее.

Мне бы хотелось, чтобы при получении нескольких запросов на разные фреймы данных каждый поток (для каждого запроса) загружалсяфрейм данных (используя pandas.read_excel) одновременно в память, а не последовательно.

В настоящее время я использую простую блокировку, которая гарантирует, что один и тот же фрейм данных не загружается дважды, но мне нужно загрузить несколько фреймов данныхтакже параллельно.

`def read_query_request (query, file_path, sheet_name, source_id): logger.info ('Обработка запроса чтения для источника' + sheet_name + '_' + source_id)

try:
    data_frame_identifier = sheet_name + '_' + source_id

    # Load df with lock ensuring data frame loads only once.
    with lock:
        start_l=time.time()
        load_data_frame(file_path, sheet_name, source_id)
        end_l=time.time()
        logger.info('BENCHMARKING INFO: Read Request, Data frame load time ---' + str(end_l - start_l))

    #cache_state()
    # Executing query on loaded data frame
    # sheetName = getSheetName( query )
    query = query.replace('dataframe', data_frame_identifier)
    start_e = time.time()
    queryResult = ps.sqldf(query)
    end_e = time.time()
    logger.info('BENCHMARKING INFO: Read Request, psql query execution time ---' + str(end_e - start_e))

    start_j = time.time()
    queryResult = queryResult.to_json(orient='records')
    res = {"isErrored":"False", "results": json.loads(queryResult)}
    result = json.dumps(res)
    end_j = time.time()
    logger.info('BENCHMARKING INFO: Read Request, json conversion time ---' + str(end_j - start_j))

    logger.info(LRU_cache.keys())
    return result`

1 Ответ

0 голосов
/ 08 февраля 2019

Как я понимаю из вашего кода, вы используете одну блокировку для всего вашего приложения, которая ограничивает только один кадр данных, обрабатывающий время, и вы хотели бы обрабатывать несколько параллельно.Для начала потоки в Python (из-за GIL ) не могут быть запущены параллельно и будут выполняться в последовательности.Так что если вы хотите параллельное выполнение, вам понадобится мультиобработка.Самым простым для реализации является использование многопроцессорного пула в stdlib .Но вам все еще нужна синхронизация, чтобы избежать обработки более одного df за раз.Для этого вы можете вести реестр того, что в данный момент обрабатывается df:

...
registry_change_lock = Lock()
registry = set()  # you can use list, but search in list is O(n) and in set is O(1)
while True:
    registry_change_lock.acquire()
    if source_id in registry:
        # This id is already processed, release the lock 
        # to let other threads register their ids and avoid 
        # deadlocks
        registry_change_lock.release()
        time.sleep(0.5)
        continue
    else:
        registry.add(source_id)
        registry_change_lock.release()

PS Это не единственный способ решить вашу проблему, но один из самых простых.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...