PiZero W подключен к двум периферийным устройствам (GPIO и USB): как непрерывно считывать данные с обоих одновременно? - PullRequest
2 голосов
/ 21 октября 2019

У меня есть Raspberry Pizero W, который подключен через контакты GPIO к расходомеру, а USB - к сканеру штрих-кода. У меня есть сценарий Python, который использует функцию обратного вызова для оповещения при обнаружении ввода GPIO. Этот сценарий python должен постоянно работать на pizero, чтобы распознавать, когда расходомер активирован, и обрабатывать ввод.

Проблема в том, что у меня также есть сканер штрих-кода, подключенный через USB к pizero. Я бы хотел, чтобы пиццерия также распознавала, когда штрих-код сканируется, и обрабатывает этот ввод.

Затем пиццер должен отправить сообщение, которое включает в себя как информацию от расходомера, так и информацию от сканера штрих-кода.

Есть ли способ сделать это в том же скрипте Python? Как можно одновременно прослушивать и обрабатывать pizero с двух входов? Будет ли легче реализовать это разделение на два разных сценария, и если да, могу ли я запустить их одновременно и каким-то образом объединить информацию, которую они предоставляют в 3-м непрерывно работающем сценарии?

Спасибо!

Некоторые пояснения к комментариям (спасибо за ввод):

  • входной вывод от расходомера GPIO 17, который является соединением SPI
  • также имеет напряжение 5 Ви заземляющий контакт подключен.

Сценарий должен быть запущен при запуске системы. Я посмотрю на systemctl, поскольку я не слышал об этом, пока он не был упомянут.

Pi обычно распознает сканируемый штрих-код как ввод с клавиатуры (то есть последовательность цифр, за которой следует символ новой строки), когдарасходомер не подключен.

Когда я отправляю сообщение, содержащее информацию о расходомере и штрих-коде, мне нужно отправить объект JSON из python, который включает в себя как фрагменты информации, так и отметку времени, когда информация была получена.

Этот объект JSON будет отправлен по Wi-Fi на сервер raspberry pi со статическим ip в той же домашней сети, что и pizero. Сервер raspberry pi имеет доступ к базе данных Django, которая должна включать информацию об объекте JSON в базу данных.

Ответы [ 2 ]

1 голос
/ 23 октября 2019

Другим, возможно, более простым вариантом может быть использование Redis . Redis - это высокопроизводительный сервер со структурой данных в памяти. Его легко установить на Raspberry Pi, Mac, Linux Windows или другой компьютер. Он позволяет вам обмениваться атомарными целыми числами, строками, списками, хэшами, очередями, наборами и упорядоченными наборами между любым количеством клиентов в сети.

Таким образом, концепция может заключаться в том, чтобы иметь отдельную программу, контролирующую расходомер и заполнениетекущее чтение в Redis так часто, как вам нравится. Затем другая отдельная программа, считывающая штрих-коды и вставляющая их в Redis так часто, как вам нравится. И, наконец, у вас есть управляющая программа, возможно, где-то еще в вашей сети, которая может захватывать оба значения так часто, как ей хочется.

Обратите внимание, что вы можете запустить сервер Redis на вашем Raspberry Piили любая другая машина.

Итак, вот программа расходомера - просто измените host на IP-адрес машины, на которой работает Redis:

#!/usr/bin/env python3

import redis
import time

host='localhost'
port=6379

# Connect to Redis
r = redis.Redis(host,port)

reading = 0
while True:
   # Generate synthetic reading that just increases every 500ms
   reading +=1 
   # Stuff reading into Redis as "fmReading"
   r.set("fmReading",reading)
   time.sleep(0.5)

Вот программа считывания штрих-кода:

#!/usr/bin/env python3

import redis
import time
from random import random, seed

host='localhost'
port=6379

# Connect to local Redis server
r = redis.Redis(host,port)

# Generate repeatable random numbers
seed(42)

while True:
   # Synthesize barcode and change every 2 seconds
   barcode = "BC" + str(int((random()*1000)))
   # Stuff barcode into Redis as "barcode"
   r.set("barcode",barcode)
   time.sleep(2)

А вот главная управляющая программа:

#!/usr/bin/env python3

import redis
import time

host='localhost'
port=6379

# Connect to Redis server
r = redis.Redis(host,port)

while True:
   # Grab latest flowmeter reading and barcode
   fmReading = r.get("fmReading")
   barcode   = r.get("barcode")
   print(f"Main: fmReading={fmReading}, barcode={barcode}")
   time.sleep(1)

Пример вывода

Main: fmReading=b'10', barcode=b'BC676'
Main: fmReading=b'12', barcode=b'BC892'
Main: fmReading=b'14', barcode=b'BC892'
Main: fmReading=b'16', barcode=b'BC86'
Main: fmReading=b'18', barcode=b'BC86'
Main: fmReading=b'20', barcode=b'BC421'
Main: fmReading=b'22', barcode=b'BC421'
Main: fmReading=b'24', barcode=b'BC29'

Обратите внимание, что для отладки этого,Вы также можете получить любые показания с помощью интерфейса командной строки для Redis с любого компьютера в вашей сети, например, в терминале:

redis-cli get barcode
"BC775"

Если вы хотите отобразить значения в Интернете-браузер, написанный на PHP, вы можете использовать привязки PHP к Redis, чтобы получать значения тоже - очень удобно!

Конечно, вы можете настроить программу так, чтобы отправлять временные метки показаний с каждым - возможно,используя Redis хэш , а не простой ключ. Вы также можете реализовать очередь для отправки сообщений между программами, используя Redis LPUSH и BRPOP .

Ключевые слова : Redis, список, очередь, хэш, Raspberry Pi

1 голос
/ 22 октября 2019

Обновленный ответ

Я добавил код для считывателя штрих-кодов. Я сделал так, чтобы считыватель штрих-кода занимал различное количество времени, до 5 секунд, чтобы выполнить считывание, а расходомер - 0,5 секунды, чтобы вы могли видеть, что разные потоки развиваются с разной скоростью независимо друг от друга.

#!/usr/bin/env python3

from threading import Lock
import threading
import time
from random import seed
from random import random

# Dummy function to read SPI as I don't have anything attached
def readSPI():
    # Take 0.5s to read
    time.sleep(0.5)
    readSPI.static += 1
    return readSPI.static
readSPI.static=0

class FlowMeter(threading.Thread):
    def __init__(self):
        super(FlowMeter, self).__init__()
        # Create a mutex
        self.mutex = Lock()
        self.currentReading = 0

    def run(self):
        # Continuously read flowmeter and safely update self.currentReading
        while True:
            value = readSPI()
            self.mutex.acquire()
            self.currentReading = value
            self.mutex.release()

    def read(self):
        # Main calls this to get latest reading, we just grab it from internal variable
        self.mutex.acquire()
        value = self.currentReading
        self.mutex.release()
        return value

# Dummy function to read Barcode as I don't have anything attached
def readBarcode():
    # Take variable time, 0..5 seconds, to read
    time.sleep(random()*5)
    result = "BC" + str(int(random()*1000))
    return result

class BarcodeReader(threading.Thread):
    def __init__(self):
        super(BarcodeReader, self).__init__()
        # Create a mutex
        self.mutex = Lock()
        self.currentReading = 0

    def run(self):
        # Continuously read barcode and safely update self.currentReading
        while True:
            value = readBarcode()
            self.mutex.acquire()
            self.currentReading = value
            self.mutex.release()

    def read(self):
        # Main calls this to get latest reading, we just grab it from internal variable
        self.mutex.acquire()
        value = self.currentReading
        self.mutex.release()
        return value

if __name__ == '__main__':

    # Generate repeatable random numbers
    seed(42)

    # Instantiate and start flow meter manager thread
    fmThread = FlowMeter()
    fmThread.daemon = True
    fmThread.start()

    # Instantiate and start barcode reader thread
    bcThread = BarcodeReader()
    bcThread.daemon = True
    bcThread.start()

    # Now you can do other things in main, but always get access to latest readings
    for i in range(20):
        fmReading = fmThread.read()
        bcReading = bcThread.read()
        print(f"Main: i = {i} FlowMeter reading = {fmReading}, Barcode={bcReading}")
        time.sleep(1)

Пример вывода

Main: i = 0 FlowMeter reading = 0, Barcode=0
Main: i = 1 FlowMeter reading = 1, Barcode=0
Main: i = 2 FlowMeter reading = 3, Barcode=0
Main: i = 3 FlowMeter reading = 5, Barcode=0
Main: i = 4 FlowMeter reading = 7, Barcode=BC25
Main: i = 5 FlowMeter reading = 9, Barcode=BC223
Main: i = 6 FlowMeter reading = 11, Barcode=BC223
Main: i = 7 FlowMeter reading = 13, Barcode=BC223
Main: i = 8 FlowMeter reading = 15, Barcode=BC223
Main: i = 9 FlowMeter reading = 17, Barcode=BC676
Main: i = 10 FlowMeter reading = 19, Barcode=BC676
Main: i = 11 FlowMeter reading = 21, Barcode=BC676
Main: i = 12 FlowMeter reading = 23, Barcode=BC676
Main: i = 13 FlowMeter reading = 25, Barcode=BC86
Main: i = 14 FlowMeter reading = 27, Barcode=BC86
Main: i = 15 FlowMeter reading = 29, Barcode=BC29
Main: i = 16 FlowMeter reading = 31, Barcode=BC505
Main: i = 17 FlowMeter reading = 33, Barcode=BC198
Main: i = 18 FlowMeter reading = 35, Barcode=BC198
Main: i = 19 FlowMeter reading = 37, Barcode=BC198

Оригинальный ответ

Я бы посоветовал вам взглянуть на systemd и systemctl чтобы ваше приложение запускалось при каждом запуске системы - пример здесь .

Что касается мониторинга двух вещей одновременно, я бы предложил вам использовать модуль Python threading . Вот краткий пример, я создаю объект подкласса из threading, который управляет вашим расходомером, постоянно читая его и сохраняя текущее значение в переменной, которую основная программа может прочитать в любое время. Вы можете запустить другой аналогичный, который управляет вашим считывателем штрих-кода и запустить их параллельно. Я не хотел этого делать и запутал вас двойным кодом.

#!/usr/bin/env python3

from threading import Lock
import threading
import time

# Dummy function to read SPI as I don't have anything attached
def readSPI():
    readSPI.static += 1
    return readSPI.static
readSPI.static=0

class FlowMeter(threading.Thread):
    def __init__(self):
        super(FlowMeter, self).__init__()
        # Create a mutex
        self.mutex = Lock()
        self.currentReading = 0

    def run(self):
        # Continuously read flowmeter and safely update self.currentReading
        while True:
            value = readSPI()
            self.mutex.acquire()
            self.currentReading = value
            self.mutex.release()
            time.sleep(0.01)

    def read(self):
        # Main calls this to get latest reading, we just grab it from internal variable
        self.mutex.acquire()
        value = self.currentReading
        self.mutex.release()
        return value

if __name__ == '__main__':

    # Instantiate and start flow meter manager thread
    fmThread = FlowMeter()
    fmThread.start()

    # Now you can do other things in main, but always get access to latest reading
    for i in range(100000):
        fmReading = fmThread.read()
        print(f"Main: i = {i} FlowMeter reading = {fmReading}")
        time.sleep(1)

Вы можете посмотреть, как использовать logging для координации и унификации ваших сообщений отладки и регистрации - см. здесь .

Вы можете посмотреть на events, чтобы другие потоки знали, что что-то нужно делать, когда что-то достигает критического уровня - пример здесь .

...