Как правильно использовать asyncio в потоке с несколькими производителями и потребителями, который включает запись в файл или файл .gzip? - PullRequest
1 голос
/ 18 января 2020

Я реализую модуль python, который принимает кортеж с тремя списками (x, y, val) и выполняет их выборку в соответствии с заданным соотношением. Я делаю это правильно?

  1. Пишу ли я на диск асинхронно?
  2. Могу ли я иметь многих производителей и потребителей, чтобы все они генерировали и записывали данные в один и тот же выходной файл ?
  3. Когда я сравниваю этот код с наивной реализацией отдельного потока, они выполняют аналогичные действия в отношении времени выполнения.

import bisect
import numpy as np
import gzip
import asyncio

class SignalPDF:
    def __init__(self, inputSignal):
        self.x         = inputSignal[0][:]
        self.y         = inputSignal[1][:]
        self.vals      = inputSignal[2][:]
        self.valCumsum = np.cumsum(self.vals)
        self.totalSum  = np.sum(self.vals)
        self.N         = len(self.vals)

class SignalSampler:
    def __init__(self, inputSignal, ratio=1.0):
        self.signalPDF = SignalPDF(inputSignal)
        self.Q         = asyncio.Queue()
        self.ratio     = float(ratio)
        self.N         = int(self.signalPDF.N/self.ratio)
        self.sampledN  = 0

    async def randRead(self):
        while self.sampledN < self.N:
            i = np.random.randint(self.signalPDF.totalSum, size=1, dtype=np.uint64)[0]
            self.sampledN += 1 
            cell = bisect.bisect(self.signalPDF.valCumsum, i)
            yield (self.signalPDF.x[cell], self.signalPDF.y[cell], int(self.signalPDF.vals[cell]))

    async def readShortFormattedLine(self):
        async for read in self.randRead():
            x = read[0]; y = read[1]; val = read[2]; 
            yield '{0} {1} {2}'.format(x,y,val)

    async def populateQueue(self):
        async for i in self.readShortFormattedLine():
            await self.Q.put(i)
        await self.Q.put(None)

    async def hanldeGzip(self, filePath):
        with gzip.open(filePath, 'wt') as f:
            while True:
                item = await self.Q.get()
                if item is None:
                    break
                f.write('{0}\n'.format(item))
            f.flush()

    async def hanldeFile(self, filePath):
        with open(filePath, 'w+') as f:
            while True:
                item = await self.Q.get()
                if item is None:
                    break
                f.write('{0}\n'.format(item))
            f.flush()

def main(gzip, outputFile):
    x=[]; y=[];val=[]
    for i in range(100):
        for j in range(100):
            x.append(i)
            y.append(j)
            val.append(np.random.randint(0,250))

    loop      = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    mixer = SignalSampler(inputSignal=[x,y,val], ratio=2.0)
    futures = []
    if gzip:
        futures = [mixer.hanldeGzip(outputFile), mixer.populateQueue()]
    else:
        futures = [mixer.hanldeFile(outputFile), mixer.populateQueue()] 
    tasks   = asyncio.wait(futures, loop=loop)
    results = loop.run_until_complete(tasks)
    loop.close()

main(gzip=False, outputFile='/tmp/a.txt')
main(gzip=True, outputFile='/tmp/a.txt.gz')

1 Ответ

2 голосов
/ 18 января 2020

Как работает asyncio

Рассмотрим задачу создания двух веб-запросов.

Синхронная версия:

  1. Отправка запроса 1
  2. Ожидание ответа в течение 1 се c.
  3. Отправка запроса 2
  4. Ожидание ответа в течение 1 се c.
  5. Оба запроса завершены в 2 se c.

Асинхронная версия:

  1. Отправить запрос 1
  2. Вместо ожидания немедленно отправьте запрос 2
  3. Ожидать ответы более 1 сек c.
  4. Оба запроса завершены в 1 сек c.

asyncio позволяет писать программу, которая на самом деле работает как во второй асинхронной версии, в то время как ваш код очень похож на (интуитивно понятную) первую версию.

Обратите внимание на важную вещь: единственная причина, по которой асинхронная версия быстрее заключается в том, что он сразу же начинает другую параллельную операцию, а не ждет, когда первая полностью завершена. Он не имеет ничего общего с потоками, asyncio работает в одном основном потоке.


А как насчет дискового ввода-вывода?

Может ли ваше оборудование параллельно считывать / записывать два файла?

Если у вас есть один физический жесткий диск, то, вероятно, нет: у него есть одна физическая «игла» , которая может одновременно считывать / записывать один фрагмент данных. Асинхронный подход вам тогда не поможет.

Ситуация может отличаться, если у вас несколько дисков . Хотя у меня есть идея, может ли OS / asyncio работать параллельно с несколькими дисками (, вероятно, не ).

Предположим, вы ожидаете, что ваше оборудование и ОС будут поддерживать несколько дисковых операций ввода-вывода. О. Вероятно, он будет работать только при использовании нескольких потоков или процессов для операций:

  • Модуль aiofiles использует потоки для работы с файлами - вы можете попробовать
  • Для работы с процессами с ProcessPoolExecutor & asyncio вы можете использовать run_in_executor , как показано здесь

Также есть некоторый шанс, что использование процессов или даже потоков увеличит диск Ввод-вывод исключительно из-за распараллеливания связанных операций с процессором, но я понятия не имею, так ли это и насколько он полезен (вероятно, не сильно по сравнению с дисковым вводом-выводом).

...