Обработка изменяемых общих объектов в асинхронном / многопоточном режиме.Альтернатива '.copy ()'? - PullRequest
0 голосов
/ 19 февраля 2019

Я пишу программу, которая создает группу рабочих для асинхронного вызова API с использованием aiohttp.Однако этот вопрос касается shared-объектов.Я предполагаю, что столкнулся бы с той же или подобной проблемой, если бы я был многопоточным.

У меня есть набор параметров URL-адресов по умолчанию, которые разделяют все работники, однако два значения этих параметров меняются от рабочего к рабочему:

DEFAULT_PARAMS = {
    'q' : None,                         #<==CHANGES per worker
    'offset' : '0',                     #<==CHANGES per worker
    'mkt' : 'en-US',                    #<==STATIC for all workers
    'moreParams' : '<most of the data>' #<==STATIC for all workers
}  

Вот как я инициализирую свойWorker() class:

class Worker(object):
    def __init__(self, q):
        # this copy iexpensive when > 100 workers.
        self.initial_params = DEFAULT_PARAMS.copy()
        # but witout copying entire default params dict, the next line
        # would add alter the 'q' value for all instances of Worker.
        self.initial_params.update({'q' : q})

Я ищу альтернативу вызову DEFAULT_PARAMS.copy() для каждого нового работника, которого я создаю.

Выяснить, как поставить этот вопрос, было непросто,Я подозреваю, что мой ответ может лежать где-то в классе через атрибуты экземпляра.

Вот чрезвычайно простой пример моей программы:

import aiohttp
import asyncio

DEFUALT_PARAMS = {
    'q' : None, #<==CHANGES per worker
    'offset' : '0', #<==CHANGES per worker
    'mkt' : 'en-US', #<==STATIC for all workers
    'moreParams' : '<most of the data>' #<==STATIC for all workers
}

class Worker(object):
    def __init__(self, q):
        self.initial_params = DEFUALT_PARAMS.copy() # <==expensive
        self.initial_params.update({'q' : q}) #<==without copying, overwrites ref for all classes.

    async def call_api(self):
        async with aiohttp.ClientSession() as sesh:
            async with sesh.get(
                'https://somesearchengine.com/search?',
                params=self.initial_params
            ) as resp:
                assert resp.status == 200
                print(await resp.json())


async def main(workers, *, loop=None):
    tasks = (asyncio.ensure_future(i.call_api(), loop=loop) for i in workers)
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    queries = ['foo', 'bar', 'baz']
    workers = (Worker(i) for i in queries)
    loop.run_until_complete(main(workers, loop=loop))

Ответы [ 2 ]

0 голосов
/ 19 февраля 2019

Если q принадлежит работнику, почему бы просто не сделать его переменной экземпляра в самом Worker.

class Worker(object):
    def __init__(self, q):
      self.q = q

И куда хотите q сделать self.q

0 голосов
/ 19 февраля 2019

Копирование словарей даже для 100 рабочих стоит не так дорого .Вы можете создать копию словаря из 1000 ключей и обновить его за 7 микросекунд:

>>> from timeit import Timer
>>> from secrets import token_urlsafe
>>> test_dict = {token_urlsafe(): token_urlsafe() for _ in range(1000)}
>>> len(test_dict)
1000
>>> count, total = Timer("p = d.copy(); p.update({'q' : q})", "from __main__ import test_dict as d; q = 42").autorange()
>>> print(total/count*1000000)  # microseconds are 10**-6 seconds
7.146239580000611

Так что я бы сказал, что здесь на самом деле нет проблем.

Тем не менее, вы по сути многоуровневое содержание словаря;корректировки на одного работника всего на одну или две клавиши.Вместо создания копии вы можете использовать collections.ChainMap() объект для управления наложением.Объект ChainMap() принимает более одного словаря и будет искать ключи в них, пока не будет найдено значение.Копия не создается, а самый верхний словарь используется для установки значений при изменении карты:

from collections import ChainMap

# ...
self.initial_params = ChainMap({'q': q}, DEFAULT_PARAMS)

Создание ChainMap() объектов еще дешевле:

>>> count, total = Timer("p = ChainMap({'q': q}, d)", "from __main__ import test_dict as d; q = 42; from collections import ChainMap").autorange()
>>> print(total/count*1000000)
0.5310121239999717

, поэтомутолько половина микросекунды.Конечно, это достигается за счет более медленной итерации и доступа по ключам.Это будет зависеть от того, как aiohttp обрабатывает их, я рекомендую вам сделать свои собственные микро-тесты с модулем timeit для измерения производительности реальных операций, которые выполняет ваш код.

Но обратите внимание, что естьвсегда цена, которую приходится платить при попытке обработки общего состояния подобным образом, с любой моделью параллелизма, и , разделяющей словарь между экземплярами, всегда будет проблематичной, даже без параллелизма.

...