Я прочитал документацию Google и посмотрел их примеры , однако мне не удалось заставить это работать правильно в моем конкретном случае использования. Проблема заключается в том, что пакеты аудиопотока разбиты на более мелкие порции (размер кадра), кодированные и отправленные с помощью MQTT с помощью base64, что означает, что подход генератора, скорее всего, прекратит часть пути, несмотря на то, что отправитель не завершил его полностью. Мой компонент MicrophoneSender отправит заключительную часть сообщения с сегментным ключом = -1, так что это флаг того, что полное сообщение было отправлено и что полный / окончательный процесс потока может быть завершен. До этого момента в буфере может не быть всего всего потока, поэтому сложно получить либо a) генератор, чтобы прекратить давать б) google, чтобы вернуть частичную транскрипцию. Частичная транскрипция требуется один раз каждые 10 или около того кадров.
Чтобы проиллюстрировать это лучше, вот мой код.
внутри приемника:
STREAMFRAMETHRESHOLD = 10
def mqttMsgCallback(self, client, userData, msg):
if msg.topic.startswith("MicSender/stream"):
msgDict = json.loads(msg.payload)
streamBytes = b64decode(msgDict['audio_data'].encode('utf-8'))
frameNum = int(msgDict['segment_num'])
if frameNum == 0:
self.asr_time_start = time.time()
self.asr.endOfStream = False
if frameNum >= 0:
self.asr.store_stream_bytes(streamBytes)
self.asr.endOfStream = False
if frameNum % STREAMFRAMETHRESHOLD == 0:
self.asr.get_intermediate_and_print()
else:
#FINAL, recieved -1
trans = self.asr.finish_stream()
self.send_message(trans)
self.frameCount=0
внутри реализации Google Speech Class:
class GoogleASR(ASR):
def __init__(self, name):
super().__init__(name)
# STREAMING
self.stream_buf = queue.Queue()
self.stream_gen = self.getGenerator(self.stream_buf)
self.endOfStream = True
self.requests = (types.StreamingRecognizeRequest(audio_content=chunk) for chunk in self.stream_gen)
self.streaming_config = types.StreamingRecognitionConfig(config=self.config)
self.current_transcript = ''
self.numCharsPrinted = 0
def getGenerator(self, buff):
while not self.endOfStream:
# Use a blocking get() to ensure there's at least one chunk of
# data, and stop iteration if the chunk is None, indicating the
# end of the audio stream.
chunk = buff.get()
if chunk is None:
return
data = [chunk]
# Now consume whatever other data's still buffered.
while True:
try:
chunk = buff.get(block=False)
data.append(chunk)
except queue.Empty:
self.endOfStream = True
yield b''.join(data)
break
yield b''.join(data)
def store_stream_bytes(self, bytes):
self.stream_buf.put(bytes)
def get_intermediate_and_print(self):
self.get_intermediate()
def get_intermediate(self):
if self.stream_buf.qsize() > 1:
print("stream buf size: {}".format(self.stream_buf.qsize()))
responses = self.client.streaming_recognize(self.streaming_config, self.requests)
# print(responses)
try:
# Now, put the transcription responses to use.
if not self.numCharsPrinted:
self.numCharsPrinted = 0
for response in responses:
if not response.results:
continue
# The `results` list is consecutive. For streaming, we only care about
# the first result being considered, since once it's `is_final`, it
# moves on to considering the next utterance.
result = response.results[0]
if not result.alternatives:
continue
# Display the transcription of the top alternative.
self.current_transcript = result.alternatives[0].transcript
# Display interim results, but with a carriage return at the end of the
# line, so subsequent lines will overwrite them.
#
# If the previous result was longer than this one, we need to print
# some extra spaces to overwrite the previous result
overwrite_chars = ' ' * (self.numCharsPrinted - len(self.current_transcript))
sys.stdout.write(self.current_transcript + overwrite_chars + '\r')
sys.stdout.flush()
self.numCharsPrinted = len(self.current_transcript)
def finish_stream(self):
self.endOfStream = False
self.get_intermediate()
self.endOfStream = True
final_result = self.current_transcript
self.stream_buf= queue.Queue()
self.allBytes = bytearray()
self.current_transcript = ''
self.requests = (types.StreamingRecognizeRequest(audio_content=chunk) for chunk in self.stream_gen)
self.streaming_config = types.StreamingRecognitionConfig(config=self.config)
return final_result
В настоящее время это ничего не выводит со стороны транскрипции.
stream buf size: 21
stream buf size: 41
stream buf size: 61
stream buf size: 81
stream buf size: 101
stream buf size: 121
stream buf size: 141
stream buf size: 159
Но ответ / стенограмма пуста. Если я ставлю точку останова на ответ for в ответах внутри функции get_intermediate, то она никогда не запускается, что означает, что по какой-то причине она пуста (не перенастраивается от Google). Однако, если я установлю точку останова на генератор и займу слишком много времени (> 5 секунд), чтобы продолжить выдачу данных, он (Google) скажет мне, что данные, вероятно, отправляются на сервер слишком медленно. google.api_core.exceptions.OutOfRange: 400 Audio data is being streamed too slow. Please stream audio data approximately at real time.
Может быть, кто-то может заметить здесь очевидное ...