Обмен данными между процессами Python через базу данных - PullRequest
0 голосов
/ 23 февраля 2019

У меня есть малиновый пи с MCC118 daqhat .Цель состоит из трех частей:

  1. Получение и запись данных в базу данных
  2. Потоковая передача данных через частную локальную сеть в браузер как можно ближе к реальному времени (скорость сети GET / POST)ограничено)
  3. Анализировать эти данные и активировать устройства (RPi.GPIO) на основе анализа

Моя программа привязана как к процессору, так и к вводу / выводу, так как программа должна ожидать данныеввод от daqhat (связанный с вводом / выводом), и, как только данные прочитаны, они должны быть записаны и проанализированы (связанные с процессором).Для части, связанной с процессором, я использую многопроцессорность для создания базы данных.Дисплей браузера будет обновлен (через колбу) путем запроса базы данных.Чтобы сделать это эффективно, я разработал следующий пример кода в качестве основы для записи и отображения аспекта моего проекта:

#!/usr/bin/env python
#  -*- coding: utf-8 -*-

import sys
import random
import psycopg2 as sql
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
from multiprocessing import Process

def fill_table():
    conn_fill = sql.connect('dbname=mp_example user=pi')
    cf = conn_fill.cursor()
    while True:
        cf.execute('INSERT INTO ran_num (ran) VALUES (%s);', (random.random(),))
        conn_fill.commit()

def read_table():
    conn_read = sql.connect('dbname=mp_example user=pi')
    cr = conn_read.cursor()
    while True:
        cr.execute('SELECT * FROM ran_num WHERE id=(SELECT MAX(id) FROM ran_num);')
        val = cr.fetchone()
        if val:
            print('\r' + str(val[0]) + ' - ' + str(round(float(val[1]), 3)), end='')
        sys.stdout.flush()
    print('\n')

def main():
    conn_main = sql.connect('dbname=mp_example user=pi')
    cm = conn_main.cursor()
    cm.execute('DROP TABLE IF EXISTS ran_num;')
    cm.execute('CREATE TABLE ran_num (id SERIAL PRIMARY KEY, ran VARCHAR);')
    conn_main.commit()
    cm.close()
    conn_main.close()
    print('Starting program, press CTRL + C to quit... \n')

if __name__ == '__main__':
    main()
    try:
        p1 = Process(target=fill_table)
        p1.start()
        p2 = Process(target=read_table)
        p2.start()
    except KeyboardInterrupt:
        p1.join()
        p2.join()
        print('Processes have closed without zombies...\n')

Я только что познакомился с Postgres (я использовал sqlite3) впопытка получить параллелизм в написании и запросе данных, что, на мой взгляд, является «безопасным для данных» способом выполнения действий.Однако у меня следующие вопросы:

  1. Есть ли более эффективный для памяти и / или безопасный для данных способ сделать это (например, каналы)?
  2. Есть ли лучший способ выйтикогда выдается исключение KeyboardInterrupt?(Мой оператор печати исключений никогда не печатает, указывая, что процессы тоже покидают зомби процессов.)

ПРИМЕЧАНИЕ. Сначала я просто считывал одно значение данных из daqhat каждый раз, когда поступал запрос GET.LAN, однако, это означало, что частота дискретизации зависела от скорости сети.Для обеспечения целостности и согласованности данных я хочу использовать многопроцессорность, чтобы сбор данных выполнялся с постоянной частотой выборки, когда данные считывались из базы данных на основе сетевых запросов.

...