Запись данных на dis c в отдельном потоке (параллельно) - PullRequest
2 голосов
/ 29 мая 2020

Я хотел бы запускать функцию несколько раз в al oop, которая каждый раз получает изображение с камеры и записывает изображение на DIS c без ожидания l oop завершения этого процесса sh . Таким образом, каждый раз, когда эта функция вызывается, она запускается параллельно с l oop, запустившим функцию, так что я могу тем временем продолжать делать другие чувствительные ко времени вещи.

Я сделал этот пример, в котором первое «выполнение» функции выполняется параллельно с l oop, а затем происходит сбой во второй раз, поскольку я не могу выполнить .start () дважды. Можно ли этого достичь другими способами?

Пример (исходное сообщение - обновлено ниже)

import numpy as np
import threading
import time

def imacq():
    print('acquiring image...')
    time.sleep(1.8)
    print('saved image...')
    return

# Start image acqusition and writing to disc thread
imacq_thread = threading.Thread(target=imacq)

starttime = time.time()
sig_arr = np.zeros(100)
tim_arr = np.zeros(100)
image_cycles = 5
running = True
flag = True
for cycles in range(1,20):
    print(cycles)
    if cycles%image_cycles == 0:
        if flag is True:
            imacq_thread.start() # this works well the first time as intended
            # imacq() # this does not work as everything is paused until imacp() returns
            flag = False
    else:
        flag = True
    time.sleep(0.4)

РЕДАКТИРОВАТЬ: После обратной связи от Сильвауса: Я сделал две разные версии для запуска функции, которая в конечном итоге будет использоваться для получения и сохранения изображения на диске параллельно с основным скриптом, который определяет время для отправки триггера / выполнения функции. Одна версия основана на ответе Сильвауса (многопоточность), а другая - на многопроцессорности.

Пример, основанный на ответе Сильвауса (Threading):

import matplotlib.pyplot as plt
import numpy as np
import time
from concurrent.futures import ThreadPoolExecutor


def imacq():
    print('taking image')
    n = 10000
    np.ones((n, n))*np.ones((n, n))  # calculations taking time
    print('saving image')
    return


sig_arr = np.zeros(100)
tim_arr = np.zeros(100)
image_cycles = 20
max_cycles = 100
freq = 10
cycles = 1
sigSign = 1

running = True
flag = True
timeinc = []
tic = time.time()
tic2 = tic
timeinc = np.zeros(max_cycles)
starttime = time.time()
with ThreadPoolExecutor() as executor:
    while running:
        t = time.time()-starttime
        tim_arr[:-1] = tim_arr[1:]
        tim_arr[-1] = t
        signal = np.sin(freq*t*(2.0*np.pi))
        sig_arr[:-1] = sig_arr[1:]
        sig_arr[-1] = signal

        time.sleep(0.00001)
        # Calculate cycle number
        sigSignOld = sigSign
        sigSign = np.sign(sig_arr[-1]-sig_arr[-2])
        if sigSign == 1 and sigSignOld != sigSign:
            timeinc[cycles] = time.time()-tic
            cycles += 1
            print('cycles: ', cycles, ' time inc.: ', str(timeinc[cycles-1]))
            tic = time.time()

        if cycles%image_cycles == 0:
            if flag is True:
                # The function is submitted and will be processed by a
                # a thread as soon as one is available
                executor.submit(imacq)
                flag = False
        else:
            flag = True
        if cycles >= max_cycles:
            running = False

print('total time: ', time.time()-tic2)

fig = plt.figure()
ax = plt.axes()
plt.plot(timeinc)

Пример, основанный на многопроцессорности:

import matplotlib.pyplot as plt
import numpy as np
import time
from multiprocessing import Process, Value, Lock


def trig_resp(running, trigger, p_count, pt, lock):
    while running.value == 1:  # note ".value" on each sharedctype variable
        time.sleep(0.0001)  # sleeping in order not to load CPU too excessively
        if trigger.value == 1:
            with lock:  # lock "global" variable before wrtting to it
                trigger.value = 0  # reset trigger
            tic = time.time()
            # Do a calculation that takes a significant time
            n = 10000; np.ones((n, n))*np.ones((n, n))
            with lock:
                pt.value = time.time() - tic  # calculate process time
                p_count.value += 1  # count number of finished processes
    return


if __name__ == "__main__":
    # initialize shared values (global accross processes/sharedctype).
    # Type 'i': integer, type 'd': double.
    trigger = Value('i', 0)  # used to trigger execution placed in trig_resp()
    running = Value('i', 1)  # A way to break the loop in trig_resp()
    p_count = Value('i', 0)  # process counter and flag that process is done
    pt = Value('d', 0.0)  # process time of latest finished process
    lock = Lock() # lock object used to avoid raise conditions when changing "global" values.
    p_count_old = p_count.value
    p1 = Process(target=trig_resp, args=(running, trigger, p_count, pt, lock))
    p1.start()  # Start process

    # A "simulated" sinusiodal signal
    array_len = 50
    sig_arr = np.zeros(array_len)  # Signal array
    tim_arr = np.zeros(array_len)  # Correpsonding time array
    freq = 10  # frequency of signal

    # trigger settings
    im_int = 20  # cycle interval for triggering (acquiring images)
    max_cycles = 100  # max number of cycles before stopping main

    # initializing counters etc.
    cycles = 1  # number of cycles counted
    sigSign = 1  # sign of signal gradient
    flag = 1  # used to only set trigger once for the current cycle count
    trigger_count = 0  # counts how many times a trigger has been set

    tic = time.time()
    tic2 = tic
    timeinc = np.zeros(max_cycles) # Array to keep track of time used for each main loop run
    starttime = time.time()
    while running.value == 1:
        time.sleep(0.00001)  # mimics sample time (real world signal)
        t = time.time()-starttime  # local time
        signal = np.sin(freq*t*(2.0*np.pi))  # simulated signal
        # Keeping the latest array_len values (FIFO) of t and signal.
        tim_arr[:-1] = tim_arr[1:]
        tim_arr[-1] = t
        sig_arr[:-1] = sig_arr[1:]
        sig_arr[-1] = signal

        if p_count.value == p_count_old + 1:  # process have finished
            print('Process counter: ', p_count.value,  'process_time: ', pt.value)
            p_count_old = p_count.value

        # Calculate cycle number by monotoring sign of the gradient
        sigSignOld = sigSign  # Keeping track of previous signal gradient sign
        sigSign = np.sign(sig_arr[-1]-sig_arr[-2])  # current gradient sign
        if sigSign == 1 and sigSignOld == -1:  # a local minimum just happened
            timeinc[cycles] = time.time()-tic
            cycles += 1
            print('cycles: ', cycles, ' time inc.: ', str(timeinc[cycles-1]))
            tic = time.time()
            flag = 1

        if cycles % im_int == 0 and flag == 1:
            if cycles > 0:
                if trigger_count > p_count.value:
                    print('WARNING: Process: ', p_count.value,
                          'did not finish yet. Reduce freq or increase im_int')
                trigger.value = 1
                trigger_count += 1
                print('Trigger number: ', trigger_count)
                flag = 0

        if cycles >= max_cycles:
            running.value = 0

    print('total cycle time: ', time.time()-tic2)

    # Print the process time of the last run
    if p_count.value < max_cycles//im_int:
        if p_count.value == p_count_old + 1:
            print('process counter: ', p_count.value,  'process_time: ', pt.value)
            p_count_old = p_count.value

    print('total process time: ', time.time()-tic2)

    fig = plt.figure()
    ax = plt.axes()
    plt.plot(timeinc)

Я использую ноутбук windows 10, поэтому время (приращение времени в каждом l oop основного, а l oop «во время работы ...:») зависит от того, что еще происходит на моем компьютере, но версия, основанная на многопроцессорности, кажется менее чувствительной к этому, чем один основан на многопоточности. Однако метод, основанный на многопроцессорности, не очень элегантен, и я подозреваю, что возможно более разумное решение (более простое и менее легкое для ошибки), которое может достичь того же или лучшего (постоянное приращение времени с меньшей нагрузкой на ЦП).

Я приложил графики временных приращений, которые я получил здесь для примеров многопроцессорности и потоковой передачи, соответственно здесь: Multiprocess example Threading example

Любые отзывы об улучшении двух решений приветствуются.

Ответы [ 2 ]

0 голосов
/ 06 июня 2020

Детали ваших устройств сбора данных, скорости передачи данных и объемов кажутся не очень ясными, но у меня сложилось впечатление, что проблема в том, что вы хотите получить один сигнал как можно быстрее и хотите получить снимок и записывается на диск как можно скорее, когда этот сигнал "интересен" , но без задержки следующего получения сигнала.

Таким образом, кажется, что между основным сигналом необходим минимальный обмен данными процесс получения и процесс захвата изображения. IMHO, это предполагает многопроцессорность (следовательно, без GIL) и использование очереди (без больших объемов данных для обработки) для связи между двумя процессами.

Итак, я бы посмотрел на этот тип настройки:

#!/usr/bin/env python3

from multiprocessing import Process, Queue, freeze_support

def ImageCapture(queue):
    while True:
        # Wait till told to capture image - message could contain event reference number
        item = queue.get()
        if item == -1:
           break
        # Capture image and save to disk

def main():
    # Create queue to send image capture requests on
    queue = Queue(8)

    # Start image acquisition process
    p = Process(target=ImageCapture, args=(queue,))
    p.start()

    # do forever
    #    acquire from DAQ
    #    if interesting
    #       queue.put(event reference number or filename)

    # Stop image acquisition process
    queue.put(-1)
    p.join()

if __name__ == "__main__":

    # Some Windows thing
    freeze_support()
    main()

Если процесс ImageCapture() не успевает, запустите два или более.

На моем Ma c я измерил среднее время доставки сообщения в очереди 32 микросекунд и максимальная задержка 120 микросекунд для 1 миллиона сообщений.

0 голосов
/ 29 мая 2020

Вы можете использовать Executor . Таким образом, вы можете просто отправить свои задачи, и они будут обрабатываться в зависимости от типа Executor, который вы используете.

Я не знаю что находится в вашем imacq, поэтому вам, возможно, придется попробовать ThreadPoolExecutor и ProcessPoolExecutor, чтобы найти, какой из них наиболее подходит для вашего приложения.

Пример:

import numpy as np
import time
from concurrent.futures import ThreadPoolExecutor

def imacq():
    print('acquiring image...')
    time.sleep(1.8)
    print('saved image...')
    return

starttime = time.time()
sig_arr = np.zeros(100)
tim_arr = np.zeros(100)
image_cycles = 5
running = True
flag = True

with ThreadPoolExecutor() as executor:
    for cycles in range(1,20):
        print(cycles)
        if cycles%image_cycles == 0:
            if flag is True:
                # The function is submitted and will be processed by a 
                # a thread as soon as one is available
                executor.submit(imacq)
                flag = False
        else:
            flag = True
        time.sleep(0.4)
...