Python - вызывающая функция и продолжение основной программы - PullRequest
1 голос
/ 09 июля 2019

Я собираю данные с установленной частотой (например, 8 Гц), эти данные изменяются, сохраняются, а затем время от времени отправляются для записи.

Я сталкиваюсь с проблемой синхронизации из-за потоковой передачи / записиданные.Когда программа записывает данные (каждые 5 секунд), это занимает больше времени, чем 1/8 Гц (0,125 с).Это создает задержку в моем времени для получения данных.

То, что я хочу сделать, это вызвать мою функцию записи и запустить ее, но также позволить моей основной программе продолжать работу, чтобы не было задержки во времени.

Я пытался использоватьнесколько разных методов, но без особой удачи: многопоточность, многопроцессорность и асинхронность.Вполне возможно, что я использую их неправильно.

Очень упрощенная версия того, что я делаю:

    def main():
        while True:
            curTime = datetime.datetime.now()
            while curTime < nextTime:
                continue
            data = collectData() #collect data (serial port, tcp, etc.)
            pdata = processData(data) #process data
            hdata = holdData(hdata) #store data stream for occasional writing

            if len(hdata) > 8*5:
                writeData(hdata) #send data to be written - takes too long and causes delay in next sample > 0.125s from previous.


            nextTime = curTime + datetime.timedelta(microsecond = 125000) #adjust next time for measurement - 0.125s after last time data was collected.

В приведенном выше коде.Я хочу вызвать writeData и заставить эту функцию делать свое дело, но продолжаю движение по своей основной функции и собираю больше данных.writeData может занять столько времени, сколько захочет, при условии, что он быстрее, чем мой интервал записи;в настоящее время это так.

Я использую python3.

Надеюсь, для некоторых указаний этого достаточно.

Любая помощь очень ценится.

1 Ответ

0 голосов
/ 12 июля 2019

Вы на правильном пути, пытаясь решить свою проблему с помощью асинхронного программирования.Асинхронное программирование в Python само по себе сложно, поскольку есть принципиальные различия в параллелизме, реализованном с использованием потоков (threading), процессов (multiprocessing) или сопрограмм (asyncio).Не существует «правильного» подхода, вы выбираете метод, который наилучшим образом соответствует текущему варианту использования.

В вашей задаче есть задачи, связанные с IO (выборка и запись данных), и задачи с CPU (обработка данных), которыеможет работать независимо друг от друга параллельно.Вот как вы можете это сделать.Возможно, это не самое элегантное решение, но оно покажет вам, как вы можете решать такие проблемы.

В нашем решении мы будем использовать потоки для задач, связанных с вводом-выводом, и процессы для задач, связанных с процессором.Лично я предпочел бы использовать потоки для всех задач, но в этом случае мы не сможем раскрыть всю мощь современных многоядерных процессоров для распараллеливания обработки данных из-за GIL .

Сначала давайте импортируем необходимые модули в наш исполняемый скрипт:

import time
import random
import signal
from threading import Thread
from multiprocessing.pool import Pool
from queue import Queue, Empty

Решаемая нами проблема - это проблема производитель-потребитель .Основной поток извлекает данные через фиксированные промежутки времени и помещает их в очередь.Поток процессора получает данные из очереди и передает их в пул рабочих для обработки, затем собирает результаты и помещает их в другую очередь.Эта очередь постоянно читается записывающим потоком, который, наконец, сохраняет данные.Теперь мы добавим несколько констант - количество рабочих процессов, выполняемых параллельно, и интервал выборки данных в секундах:

WORKERS = 4
FETCH_INTERVAL = 1

Ниже приведен основной поток, который отвечает за выборку данных каждые FETCH_INTERVAL сек.бесконечный цикл:

def main():
    raw_data = Queue()
    processor = Thread(target=process, args=(raw_data,))
    processor.start()
    i = 0

    try:

        while True:
            t_fetch = time.time()

            # Simulate the data fetching:
            time.sleep(0.5)
            data = i, random.random()
            print("[main] Fetched raw data:", data)

            raw_data.put(data)
            t_elapsed = time.time() - t_fetch

            if t_elapsed < FETCH_INTERVAL:
                time.sleep(FETCH_INTERVAL - t_elapsed)
            else:
                print("[error] The fetch interval is too short!")

            i = i + 1

    except KeyboardInterrupt:
        print("shutting down...")
    finally:
        raw_data.put(None)
        processor.join()

if __name__ == "__main__":
    main()

Мы начинаем с определения очереди raw_data, в которой будут храниться извлеченные данные, и запуска потока processor, который запускает функцию process, которая принимает raw_data очередь в видеего аргумент.Обратите внимание, что мы не просто спим FETCH_INTERVAL секунд после каждой выборки данных, но мы учитываем задержки, вызванные выборкой данных, поскольку это также задача, связанная с вводом-выводом.Сценарий выполняется до тех пор, пока не будет нажата Ctrl-C.После прерывания мы помещаем None в очередь, чтобы сообщить потокам, что обработка завершена, и ожидаем завершения потока processor.Теперь мы добавляем определение функции process, которая выполняется потоком processor:

def process(raw_data):
    proc_data = Queue()
    writer = Thread(target=write, args=(proc_data,))
    writer.start()

    with Pool(WORKERS, init_worker) as pool:

        while True:
            data_batch = dequeue_data(raw_data, batch_size=WORKERS)

            if not data_batch:
                time.sleep(0.5)
                continue

            results = pool.map(process_data, data_batch)
            print("[processor] Processed raw data:", results)

            for r in results:
                proc_data.put(r)

            if None in data_batch:
                break

    print("joining the writer thread...")
    writer.join()

Здесь мы создаем очередь proc_data, в которой будут храниться результаты обработки данных для writer нить.Поток writer запускает функцию write, которую мы определим позже.После запуска потока writer мы создаем pool из WORKERS процессов.Здесь мы используем функцию init_worker в качестве Pool инициализатора процесса, чтобы игнорировать клавиатурные прерывания в рабочих процессах при их обработке в главном потоке:

def init_worker():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

После создания пула процессов мы входим вбесконечный цикл, постоянно выводящий пакеты данных из очереди raw_data путем вызова функции dequeue_data, которую мы определим ниже.Пакеты данных затем передаются в рабочий пул для обработки.Функция process_data будет определена ниже.Затем мы собираем результаты и помещаем их в очередь proc_data, которая читается потоком writer.Если в пакете данных есть None, обработка прерывается, и мы ожидаем завершения потока writer.Функция dequeue_data определяется следующим образом:

def dequeue_data(data_queue, batch_size):
    items = []

    for _ in range(batch_size):
        try:
            item = data_queue.get(block=False)
        except (KeyboardInterrupt, Empty):
            break

        items.append(item)

    return items

Здесь вы видите, что она просто пытается извлечь и вернуть максимум batch_size точек данных из data_queue.Если данных нет, возвращается пустой список.Функция process_data ничего не делает, но спит 1-5 секунд:

def process_data(data):

    if data is None:
        return

    # Simulate the data processing:
    time.sleep(random.randint(1, 5))

    return data

Наконец, мы определяем функцию write, которая выполняется в потоке writer:

def write(proc_data):

    while True:
        data = proc_data.get()

        if data is None:
            break

        # Simulate the data writing:
        time.sleep(random.randint(1, 2))
        print("[writer] Wrote processed data:", data)

Бесконечный цикл останавливается, как только он получает None из очереди proc_data.Теперь мы сохраняем весь предоставленный код в одном скрипте, затем запускаем и проверяем его результаты:

[main] Fetched raw data: (0, 0.8092310624924178)
[main] Fetched raw data: (1, 0.8594148294409398)
[main] Fetched raw data: (2, 0.9059856675215566)
[main] Fetched raw data: (3, 0.5653361157057876)
[main] Fetched raw data: (4, 0.8966396309003691)
[main] Fetched raw data: (5, 0.5772344067614918)
[processor] Processed raw data: [(0, 0.8092310624924178)]
[main] Fetched raw data: (6, 0.4614411399877961)
^Cshutting down...
[writer] Wrote processed data: (0, 0.8092310624924178)
[processor] Processed raw data: [(1, 0.8594148294409398), (2, 0.9059856675215566), (3, 0.5653361157057876), (4, 0.8966396309003691)]
[writer] Wrote processed data: (1, 0.8594148294409398)
[writer] Wrote processed data: (2, 0.9059856675215566)
[processor] Processed raw data: [(5, 0.5772344067614918), (6, 0.4614411399877961), None]
joining the writer thread...
[writer] Wrote processed data: (3, 0.5653361157057876)
[writer] Wrote processed data: (4, 0.8966396309003691)
[writer] Wrote processed data: (5, 0.5772344067614918)
[writer] Wrote processed data: (6, 0.4614411399877961)

Поток main извлекает данные через фиксированные промежутки времени, в то время как processor обрабатывает данные в пакетах параллельно, а writer сохраняет результаты. Как только мы нажали Ctrl-C, поток main прекратил извлечение данных, затем поток processor завершил обработку остальных извлеченных данных и начал ждать, пока поток writer завершит запись данных на диск.

...