Проблема с потоковой передачей звука в Python с микрофона через MQTT в Google Streaming с использованием генераторов - PullRequest
0 голосов
/ 13 сентября 2018

Я прочитал документацию Google и посмотрел их примеры , однако мне не удалось заставить это работать правильно в моем конкретном случае использования. Проблема заключается в том, что пакеты аудиопотока разбиты на более мелкие порции (размер кадра), кодированные и отправленные с помощью MQTT с помощью base64, что означает, что подход генератора, скорее всего, прекратит часть пути, несмотря на то, что отправитель не завершил его полностью. Мой компонент MicrophoneSender отправит заключительную часть сообщения с сегментным ключом = -1, так что это флаг того, что полное сообщение было отправлено и что полный / окончательный процесс потока может быть завершен. До этого момента в буфере может не быть всего всего потока, поэтому сложно получить либо a) генератор, чтобы прекратить давать б) google, чтобы вернуть частичную транскрипцию. Частичная транскрипция требуется один раз каждые 10 или около того кадров.

Чтобы проиллюстрировать это лучше, вот мой код.

внутри приемника:

    STREAMFRAMETHRESHOLD = 10
    def mqttMsgCallback(self, client, userData, msg):
         if msg.topic.startswith("MicSender/stream"):
                msgDict = json.loads(msg.payload)
                streamBytes = b64decode(msgDict['audio_data'].encode('utf-8'))
                frameNum = int(msgDict['segment_num'])

                if frameNum == 0:
                    self.asr_time_start = time.time()
                    self.asr.endOfStream = False

                if frameNum >= 0:
                    self.asr.store_stream_bytes(streamBytes)
                    self.asr.endOfStream = False

                    if frameNum % STREAMFRAMETHRESHOLD == 0:
                        self.asr.get_intermediate_and_print()

                else:
                    #FINAL, recieved -1
                    trans = self.asr.finish_stream()
                    self.send_message(trans)
                    self.frameCount=0

внутри реализации Google Speech Class:

class GoogleASR(ASR):

    def __init__(self, name):
        super().__init__(name)    

        # STREAMING
        self.stream_buf = queue.Queue()
        self.stream_gen = self.getGenerator(self.stream_buf)
        self.endOfStream = True
        self.requests = (types.StreamingRecognizeRequest(audio_content=chunk) for chunk in self.stream_gen)
        self.streaming_config = types.StreamingRecognitionConfig(config=self.config)
        self.current_transcript = ''
        self.numCharsPrinted = 0

    def getGenerator(self, buff):
        while not self.endOfStream:
            # Use a blocking get() to ensure there's at least one chunk of
            # data, and stop iteration if the chunk is None, indicating the
            # end of the audio stream.
            chunk = buff.get()
            if chunk is None:
                return
            data = [chunk]

            # Now consume whatever other data's still buffered.
            while True:
                try:
                    chunk = buff.get(block=False)
                    data.append(chunk)

                except queue.Empty:
                    self.endOfStream = True
                    yield b''.join(data)
                    break

            yield b''.join(data)


    def store_stream_bytes(self, bytes):
        self.stream_buf.put(bytes)

    def get_intermediate_and_print(self):
        self.get_intermediate()

    def get_intermediate(self):

        if self.stream_buf.qsize() > 1:
            print("stream buf size: {}".format(self.stream_buf.qsize()))
            responses = self.client.streaming_recognize(self.streaming_config, self.requests)
            # print(responses)

            try:
                # Now, put the transcription responses to use.
                if not self.numCharsPrinted:
                    self.numCharsPrinted = 0

                for response in responses:

                    if not response.results:
                        continue

                    # The `results` list is consecutive. For streaming, we only care about
                    # the first result being considered, since once it's `is_final`, it
                    # moves on to considering the next utterance.
                    result = response.results[0]
                    if not result.alternatives:
                        continue

                    # Display the transcription of the top alternative.
                    self.current_transcript = result.alternatives[0].transcript

                    # Display interim results, but with a carriage return at the end of the
                    # line, so subsequent lines will overwrite them.
                    #
                    # If the previous result was longer than this one, we need to print
                    # some extra spaces to overwrite the previous result
                    overwrite_chars = ' ' * (self.numCharsPrinted - len(self.current_transcript))
                    sys.stdout.write(self.current_transcript + overwrite_chars + '\r')
                    sys.stdout.flush()
                    self.numCharsPrinted = len(self.current_transcript)

    def finish_stream(self):
        self.endOfStream = False
        self.get_intermediate()
        self.endOfStream = True

        final_result = self.current_transcript

        self.stream_buf= queue.Queue()
        self.allBytes = bytearray()
        self.current_transcript = ''
        self.requests = (types.StreamingRecognizeRequest(audio_content=chunk) for chunk in self.stream_gen)
        self.streaming_config = types.StreamingRecognitionConfig(config=self.config)

        return final_result

В настоящее время это ничего не выводит со стороны транскрипции.

stream buf size: 21
stream buf size: 41
stream buf size: 61
stream buf size: 81
stream buf size: 101
stream buf size: 121
stream buf size: 141
stream buf size: 159

Но ответ / стенограмма пуста. Если я ставлю точку останова на ответ for в ответах внутри функции get_intermediate, то она никогда не запускается, что означает, что по какой-то причине она пуста (не перенастраивается от Google). Однако, если я установлю точку останова на генератор и займу слишком много времени (> 5 секунд), чтобы продолжить выдачу данных, он (Google) скажет мне, что данные, вероятно, отправляются на сервер слишком медленно. google.api_core.exceptions.OutOfRange: 400 Audio data is being streamed too slow. Please stream audio data approximately at real time.

Может быть, кто-то может заметить здесь очевидное ...

1 Ответ

0 голосов
/ 13 сентября 2018

То, как вы организовали свой код, генератор, который вы предоставляете API Google, инициализируется ровно один раз - в строке 10, используя выражение генератора: self.requests = (...). Как построено, этот генератор также будет работать ровно один раз и станет «истощенным». То же самое относится и к функции генератора, которую сам генератор (for ...) вызывает (self.getGeneerator()). Он будет запущен только один раз и остановится, когда получит 10 блоков данных (которые, как я вижу, очень малы). Затем внешний генератор (то, что вы назначили для self.requests) также остановится навсегда - предоставив ASR только короткий бит данных (10 × 20 байт, глядя на вывод отладочной информации). Скорее всего, в этом нет ничего узнаваемого.

Кстати, обратите внимание, у вас есть избыточный yield b''.join(data) в вашей функции, данные будут отправлены дважды.

Вам нужно будет переделать (внешний) генератор, чтобы он не возвращался, пока не будут получены все данные. Если вы хотите использовать другой генератор так же, как вы собираете каждый больший кусок для «внешнего» генератора, из которого читает API Google, вам придется пересоздавать его каждый раз, когда вы начинаете с ним новый цикл.

...