Таймеры не могут быть остановлены из другого потока (короткий пример с nidaqmx- python и обратными вызовами) - PullRequest
2 голосов
/ 03 апреля 2020

Я видел другие вопросы по теме c на этом форуме, но ни один из них не помог мне понять, как с этим справиться. Мне кажется, что большинство из них - довольно сложный и длинный код. Я считаю, что я делаю что-то довольно простое / хотел бы сделать что-то довольно простое. Я надеюсь, что кто-то может помочь! Здесь ниже обширные объяснения, а затем мой текущий код.

ПРИМЕЧАНИЕ: пожалуйста, не удаляйте этот вопрос. Я много думал об этом и тщательно изучил связанные темы, но безрезультатно. Я также считаю, что имеет смысл публиковать это, потому что это частично связано с более общим вопросом c: вопросом о том, как строить графики в режиме реального времени при наличии обратных вызовов, работающих в фоновом режиме (см. Резюме в конце), что может быть Подводя итог моей общей цели.

Настройка и цель: Модуль сбора данных National Instruments (это важно) NI cDAQ9178 с интерфейсом через nidaqmx-python, пакет, поддерживаемый NI с документацией здесь . Там вводится некоторый аналоговый сигнал, и цель состоит в том, чтобы получать его непрерывно (до тех пор, пока я не решу прекратить получение) с определенной частотой дискретизации (приблизительно 1000 Гц) при построении графика сигнала в режиме реального времени. Графики не нужно обновлять почти так часто (частота рефреза 10 Гц sh будет даже хорошей). Я использую Windows 10 с Python 3.7 в виртуальной среде conda, и редактирование выполняется в PyCharm. В идеале все должно работать как в PyCharm, так и в любом терминале.

Ситуация: nidaqmx-python предоставляет функции высокого уровня, которые позволяют регистрировать обратные вызовы (которые определяются как пожелания), которые вызываются каждый раз при определенном количестве выборок (в моем случае 100, но это не строго) заполняет буфер P C. Идея состоит в том, что обратный вызов, определенный ниже, читает буфер в этой точке и что-то делает (в моем случае некоторая фильтрация нижних частот, которую я взял для краткости, некоторая запись в глобальную переменную data и, возможно, заговор - см. ниже).

Проблема: Я дурачился с тем, чтобы в обратный вызов включались все графики данных в реальном времени, но с matplotlib это кошмар, потому что обратный вызов использует потоки, отличные от основного. один, и Matplotlib не любит вызываться из-за пределов основного потока. Я погуглил чертовски другие библиотеки, оптимизированные для построения графиков в реальном времени (и, я думал, я надеюсь, что потокобезопасен), но это не так просто: я не могу заставить работать vispy, и я не могу заставить pyqtgraph даже установить, просто чтобы приведу несколько примеров. Затем я увидел несколько сообщений от 10 * 10 людей, которые фактически управляют довольно приличными анимациями в реальном времени с помощью matplotlib, несмотря на то, что он был разработан с учетом публикации, а не этих приложений; поэтому я подумал, давайте дадим ему go.

Мое мнение: Поскольку я не мог заставить matplotlib выполнять работу из-за обратного вызова, я сделал следующее (код, который вы видите ниже): после обратного вызова и после выполнения задачи запускается с task.start() (то есть с c по nidaqmx-python), я просто создаю while l oop, который отображает глобальную переменную buffer. Я подумал, что это хороший трюк: видите, buffer обновляется (назовите это) обратным вызовом каждые 0,1 секунды или около того (не имеет значения), а сбоку while l oop строит график buffer переменная снова и снова, стирая каждый раз перед построением графика, эффективно создавая график в реальном времени.

ПРИМЕЧАНИЕ. Я прекрасно знаю, что часть прорисовки не так хороша, как могла бы быть (вероятно, мне следует использовать API-интерфейс ax для matplotlib и subplots, не говоря уже о анимации), но я делаю не волнует в данный момент. Я займусь этим позже и уточню, чтобы сделать его более эффективным.

Что я хочу: это на самом деле делает то, что я хочу ... кроме того, чтобы остановить это, я ввел операторы try: и except: вокруг while l oop, как вы видите в коде ниже. Естественно, нажатие CTRL+C нарушает l oop ... но затем он также ломает весь запущенный скрипт и оставляет мне следующую ошибку: forrtl: error (200): program aborting due to control-C event в PyCharm и следующую точность при запуске из терминала:

Image              PC                Routine            Line        Source
libifcoremd.dll    00007FFECF413B58  Unknown               Unknown  Unknown
KERNELBASE.dll     00007FFF219F60A3  Unknown               Unknown  Unknown
KERNEL32.DLL       00007FFF23847BD4  Unknown               Unknown  Unknown
ntdll.dll          00007FFF240CCED1  Unknown               Unknown  Unknown
QObject::~QObject: Timers cannot be stopped from another thread

Неудобство в том, что у меня тогда нет другого выбора, кроме как закрыть оболочку python (подумав снова PyCharm), и у меня нет доступа к моей драгоценной переменной data, содержащей ... ну, мои данные.

Угадайте: очевидно, что обратный вызов не любит, когда его останавливают в этой фазе. Задача nidaqmx_python должна быть остановлена ​​с помощью task.stop(). Я пытаюсь поставить task.stop() сразу после KeyboardInterrupt except:, но это не помогает, так как CTRL+C останавливает скрипт сверху / вместо прерывания l oop. Я считаю, что требуется более изощренный метод остановки моей задачи. Я думал об этом в течение нескольких дней, но не могу придумать, как получить обе вещи: задачу, которую я могу остановить, и в то же время в режиме реального времени. Обратите внимание, что без построения графика легко остановить задачу при ENTER нажатии клавиши: один просто пишет в конце

input('Press ENTER to stop task')
task.stop()

Но, конечно, простое выполнение вышеописанного не позволяет мне включить реальное сюжетная часть.

Резюме: Я не мог вызвать matplotlib из функции обратного вызова, которая непрерывно считывает данные, поэтому я написал while l oop для построения графика в реальном времени в отдельном блоке, но тогда я не вижу способа остановить это while l oop без получения вышеуказанной ошибки (которая жалуется, что обратный вызов был остановлен из другого потока, я думаю).

Надеюсь, я проясняюсь, и если нет, пожалуйста, спросите!

Код: Я очистил его, чтобы приблизить его как можно ближе к MWE, который показывает проблему, хотя, конечно, я понимаю, что у большинства из вас нет NI daq, чтобы поиграться и подключиться, чтобы иметь возможность выполнить это. Во всяком случае ... вот оно:

import matplotlib.pyplot as plt
import numpy as np

import nidaqmx
from nidaqmx import stream_readers
from nidaqmx import constants

sfreq = 1000
bufsize = 100

with nidaqmx.Task() as task:

    # Here we set up the task ... nevermind
    task.ai_channels.add_ai_voltage_chan("cDAQ2Mod1/ai1")
    task.timing.cfg_samp_clk_timing(rate=sfreq, sample_mode=constants.AcquisitionType.CONTINUOUS,
                                    samps_per_chan=bufsize)
    # Here we define a stream to be read continuously
    stream = stream_readers.AnalogMultiChannelReader(task.in_stream)

    data = np.zeros((1, 0))  # initializing an empty numpy array for my total data
    buffer = np.zeros((1, bufsize))  # defined so that global buffer can be written to by the callback

    # This is my callback to read data continuously
    def reading_task_callback(task_idx, event_type, num_samples, callback_data):  # bufsize is passed to num_samples when this is called
        global data
        global buffer

        buffer = np.zeros((1, num_samples))

        # This is the reading part
        stream.read_many_sample(buffer, num_samples, timeout=constants.WAIT_INFINITELY)
        data = np.append(data, buffer, axis=1)  # appends buffered data to variable data

        return 0  # Absolutely needed for this callback to be well defined (see nidaqmx doc).

    # Here is the heavy lifting I believe: the above callback is registered
    task.register_every_n_samples_acquired_into_buffer_event(bufsize, reading_task_callback)
    task.start()  # The task is started (callback called periodically)

    print('Acquiring sensor data. Press CTRL+C to stop the run.\n')  # This should work ...

    fig = plt.figure()
    try:
        while True:
            # Poor's man plot updating
            plt.clf()
            plt.plot(buffer.T)
            plt.show()
            plt.pause(0.01)  # 100 Hz refresh rate
    except KeyboardInterrupt:  # stop loop with CTRL+C ... or so I thought :-(
        plt.close(fig)
        pass

    task.stop()  # I believe I never get to this part after pressing CTRL+C ...

    # Some prints at the end ... nevermind
    print('Total number of acquired samples: ', len(data.T),'\n')
    print('Sampling frequency: ', sfreq, 'Hz\n')
    print('Buffer size: ', bufsize, '\n')
    print('Acquisition duration: ', len(data.T)/sfreq, 's\n')

Любой вклад будет оценен. Заранее спасибо, ребята!

РЕДАКТИРОВАТЬ: после принятого здесь ответа ниже, я переписал приведенный выше код и придумал следующее, которое теперь работает как задумано (извините, на этот раз я не убрал его, и некоторые строки не имеют отношения к данному вопросу):

# Stream read from a task that is set up to read continuously
import matplotlib.pyplot as plt
import numpy as np

import nidaqmx
from nidaqmx import stream_readers
from nidaqmx import constants

from scipy import signal

import threading

running = True

sfreq = 1000
bufsize = 100
bufsizeb = 100

global task

def askUser():  # it might be better to put this outside of task
    global running
    input("Press return to stop.")
    running = False

def main():
    global running

    global data
    global buffer
    global data_filt
    global buffer_filt

    global b
    global z

    print('Acquiring sensor data...')

    with nidaqmx.Task() as task:  # maybe we can use target as above

        thread = threading.Thread(target=askUser)
        thread.start()

        task.ai_channels.add_ai_voltage_chan("cDAQ2Mod1/ai1")
        task.timing.cfg_samp_clk_timing(rate=sfreq, sample_mode=constants.AcquisitionType.CONTINUOUS,
                                        samps_per_chan=bufsize)
        # unclear samps_per_chan is needed here above or why it would be different than bufsize
        stream = stream_readers.AnalogMultiChannelReader(task.in_stream)

        data = np.zeros((1, 0))  # probably not the most elegant way of initializing an empty numpy array
        buffer = np.zeros((1, bufsizeb))  # defined so that global buffer can be written in the callback
        data_filt = np.zeros((1, 0))  # probably not the most elegant way of initializing an empty numpy array
        buffer_filt = np.zeros((1, bufsizeb))  # defined so that global buffer can be written in the callback

        b = signal.firwin(150, 0.004)
        z = signal.lfilter_zi(b, 1)

        def reading_task_callback(task_idx, event_type, num_samples, callback_data):  # bufsizeb is passed to num_samples
            global data
            global buffer
            global data_filt
            global buffer_filt
            global z
            global b

            if running:
                # It may be wiser to read slightly more than num_samples here, to make sure one does not miss any sample,
                # see: https://documentation.help/NI-DAQmx-Key-Concepts/contCAcqGen.html
                buffer = np.zeros((1, num_samples))
                stream.read_many_sample(buffer, num_samples, timeout=constants.WAIT_INFINITELY)
                data = np.append(data, buffer, axis=1)  # appends buffered data to variable data

                # IIR Filtering, low-pass
                buffer_filt = np.zeros((1, num_samples))
                for i, x in enumerate(np.squeeze(buffer)):  # squeeze required for x to be just a scalar (which lfilter likes)
                    buffer_filt[0,i], z = signal.lfilter(b, 1, [x], zi=z)

                data_filt = np.append(data_filt, buffer_filt, axis=1)  # appends buffered filtered data to variable data_filt

            return 0  # Absolutely needed for this callback to be well defined (see nidaqmx doc).

        task.register_every_n_samples_acquired_into_buffer_event(bufsizeb, reading_task_callback)  # bufsizeb instead

        task.start()
        while running:  # this is perfect: it "stops" the console just like sleep in a way that the task does not stop
            plt.clf()
            plt.plot(buffer.T)
            plt.draw()
            plt.pause(0.01)  # 100 Hz refresh rate
        # plt.close(fig)  # maybe no need to close it for now

        # task.join()  # this is for threads I guess ... (seems useless to my case?)

        # Some prints at the end ...
    print('Total number of acquired samples:', len(data.T))
    print('Sampling frequency:', sfreq, 'Hz')
    print('Buffer size:', bufsize)
    print('Acquisition duration:', len(data.T)/sfreq, 's')

if __name__ == '__main__':
    main()

Обратите внимание, что мне не нужен task.stop(), потому что способ непрерывных задач сбора данных с этим пакетом заключается в том, что чтение любой строки кода после task.start(), который не является sleep или чем-то подобным, останавливает задачу (по крайней мере, это мое понимание).

1 Ответ

1 голос
/ 03 апреля 2020

Первым делом я избавился от прерывания клавиатуры l oop. Я заменил его глобальной переменной running и другим потоком, который устанавливает значение переменной False при возврате из.

def askUser():
  global running
  input("Press return to stop.")
  running = False

Затем, перед while loop, создал новый поток, который будет выполняться эта функция.

askUserThread = threading.Thread(target=askUser)
askUserThread.start()

И пока l oop, избавление от оператора try catch:

while running:
  plt.clf()
  plt.plot(buffer.T)
  plt.draw()          # Note: this got changed because .show wasn't working.
  plt.pause(0.01)

Это все еще не сработало для меня потому что мне пришлось закрыть окно сюжета, чтобы появилось новое. Таким образом, с этого ответа я изменил его с .show на .draw.

Мой конечный код был немного другим (поскольку я выбрал случайные данные), но здесь он есть.

# sampling.py
# by Preston Hager

import matplotlib.pyplot as plt
import numpy as np

import threading

sfreq = 1000
bufsize = 100

running = True

data = np.zeros((1, 0))  # initializing an empty numpy array for my total data
buffer = np.zeros((1, bufsize))  # defined so that global buffer can be written to by the callback

def askUser():
    global running

    input("Press return to stop.")
    running = False

def readingTask():
    global data
    global buffer

    while running:
        buffer = np.random.rand(1, bufsize)
        # This is the reading part
        data = np.append(data, buffer, axis=1)  # appends buffered data to variable data

def main():
    global running

    print('Acquiring sensor data.')

    thread = threading.Thread(target=askUser)
    thread.start()
    task = threading.Thread(target=readingTask)
    task.start()

    fig = plt.figure()
    while running:
        # Poor's man plot updating
        plt.clf()
        plt.plot(buffer.T)
        plt.draw()
        plt.pause(0.01)  # 100 Hz refresh rate
    plt.close(fig)

    task.join()

    # Some prints at the end ... nevermind
    print('Total number of acquired samples:', len(data.T))
    print('Sampling frequency:', sfreq, 'Hz')
    print('Buffer size:', bufsize)
    print('Acquisition duration:', len(data.T)/sfreq, 's')

if __name__ == '__main__':
    main()
...