Я работаю над созданием приложения для транскрипции аудио в реальном времени с использованием Python и Flask. Я использую pyaudio для захвата аудиопотока с микрофона и сокетов, чтобы отправить аудиоданные на сервер, который затем вызывает API Google речь в текст для генерации транскриптов для аудиопотока. Я хочу обновить свой код таким образом, чтобы, если в течение 60 секунд не было аудиовхода, он автоматически закрывал разъем и выходил.
До сих пор я не смог этого сделать.
import pyaudio
import socket
import requests
import select
import time
from random import randrange
import secrets
FORMAT = pyaudio.paInt16 # input audio format
CHANNELS = 1 # Number of input channels
RATE = 16000 # Sample Hertz Rate
CHUNK = 1024 # Audio Chunk Size
def get_free_tcp_address():
"""
This function finds the IP address of the machine it is being run on.
returns: IP address
"""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
ip_addr = s.getsockname()[0]
s.close()
return ip_addr
def get_free_tcp_port():
"""
This function finds the open port on the machine it is being run on.
returns: Open port number.
"""
tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcp.bind(('', 0))
addr, port = tcp.getsockname()
tcp.close()
return port
session_id = secrets.token_hex(16) #Generates a 16-digit session id.
port = get_free_tcp_port() #store the open port number in a variable.
ip_address = get_free_tcp_address() #store the IP address in a variable.
clientsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # Declare a socket-type.
clientsocket.bind((ip_address, port)) # Bind the IP address and port number to the socket.
clientsocket.listen() # Listen for socket connections.
print("I am here")
r = requests.post('http://127.0.0.1:5000/api/', data={'ip': ip_address, 'port': str(port), 'session_id': session_id})
print("Session ID : ", session_id)
def callback(in_data, frame_count, time_info, status):
"""
This is a callback function to continuously stream the audio data to the other end of the socket.
"""
for s in read_list[1:]:
s.send(in_data)
return (None, pyaudio.paContinue)
audio = pyaudio.PyAudio() # instantiate the PyAudio class
# start Recording
stream = audio.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK, stream_callback=callback)
read_list = [clientsocket] # store the clientsockt to the list.
try:
print("XYZ")
while True:
readable, writable, errored = select.select(read_list, [], [])
for s in readable:
if s is clientsocket:
(serversocket, address) = clientsocket.accept() # accept the socket connection and store the socket and address in variable.
read_list.append(serversocket) # append the serversocket to the list.
print("Connection from " + str(address))
print("recording...")
except KeyboardInterrupt:
print("interrupt")
pass
print("finished recording")
clientsocket.close()
# stop Recording
stream.stop_stream()
stream.close()
audio.terminate()