Вы записываете JSON с двойным кодированием в дочерний процесс:
message = '{"header":{"protocolVersion":1,"messageID":2,"stationID":400}, the rest of json...}}';
jsonValue = json.dumps(message)
message
уже является строкой JSON, поэтому jsonValue
является строкой JSON с двойным кодированием.
Потребитель канала помещает эту строку с двойным кодированием в очередь для сокета.Затем производитель веб-сокетов в socket_producer()
снова кодирует сообщение :
while True:
message = await incoming.get()
# ...
json_message = json.dumps(message)
await socket.send(json_message)
Так что теперь json_message
представляет собой значение JSON с тройным кодированием, документ JSON, содержащий документ JSON, содержащийдокумент JSON:
>>> import json
>>> message = '{"header":{"protocolVersion":1,"messageID":2,"stationID":400}}}' # valid JSON
>>> json_message = json.dumps(message)
>>> print(json_message) # double-encoded
"{\"header\":{\"protocolVersion\":1,\"messageID\":2,\"stationID\":400}}}"
>>> json_message = json.dumps(json_message) # encode *again*
>>> print(json_message) # triple-encoded
"\"{\\\"header\\\":{\\\"protocolVersion\\\":1,\\\"messageID\\\":2,\\\"stationID\\\":400}}}\""
Я не знаю точно, что делает с этим ваш веб-сокет, но давайте предположим, что он использует json.loads()
один раз, а затем эхом отдает декодированное сообщение.Это означает, что socket_consumer()
получает документ JSON, который закодирован только дважды.Ваш журнал FromSocket.txt
определенно подразумевает, что именно так и происходит, потому что он содержит JSON-сообщение с кодировкой double :
Это можно увидеть в журнале FromSocket.txt
:
From socket: "{\"header\":{\"protocolVersion\":1,\"messageID\":2,\"stationID\":400},\"cam\":{\"generationDeltaTime\":1,...other fields}}"
Обратите внимание на эти \"
записи, и весь документ обернут в кавычки, но в этом значении нет \\\
тройной обратной косой черты.
Тем не менее этот дополнительный уровень кодирования JSON нарушаетpipe_producer()
сопрограмма, которая ожидает, что сообщение будет декодировано в словарь, а не в другую строку (даже если эта строка содержит другой документ JSON):
message = json.loads(json_message)
type = int(message.get('header', {}).get('messageID', -1))
message
будет декодировать вместо строки, поэтому message.get
завершится с ошибкой AttributeError
, что приведет к выходу сопрограммы:
>>> json_message = "{\"header\":{\"protocolVersion\":1,\"messageID\":2,\"stationID\":400}}}" # double encoded
>>> message = json.loads(json_message)
>>> message # Back one stop, single-encoded JSON
'{"header":{"protocolVersion":1,"messageID":2,"stationID":400}}}'
>>> type(message) # it's a string with JSON, not a dictionary
<class 'str'>
>>> message.get('header')
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
AttributeError: 'str' object has no attribute 'get'
Вам необходимо убедиться, что вы не кодируете свои данные слишком много раз!Если ваш канал получает данные JSON, не кодирует данные снова при отправке в сокет.При отправке данных в канал из родительского процесса не следует дважды кодировать данные , если у вас уже есть строка JSON, нет смысла передавать ее через json.dumps()
еще раз.
Было бы также разумно добавить дополнительные функции безопасности в сопрограммы.Я не сделал JSON-декодирование достаточно надежным, поэтому давайте исправим эту часть:
async def pipe_producer(pipe_writer, incoming):
# take messages from the queue and send them to the pipe
while True:
json_message = await incoming.get()
try:
message = json.loads(json_message)
type = int(message.get('header', {}).get('messageID', -1))
except (ValueError, TypeError, AttributeError):
# failed to decode the message, or the message was not
# a dictionary, or the messageID was convertable to an integer
type = None
# 1 is DENM message, 2 is CAM message
if type in {1, 2}:
pipe_writer.write(json_message.encode('utf8') + b'\n')
await pipe_writer.drain()
Возможно, вы захотите записать, что где-то не удалось декодировать (отправлять сообщения в очередь журналов, которую отдельная задача получает дозапись в журнал).
Далее мы можем обновить функции connect_*
, чтобы не игнорировать исключения в задачах, которые завершаются:
done, pending = await asyncio.wait(
[consumer_task, producer_task], return_when=asyncio.FIRST_COMPLETED)
for task in pending:
task.cancel()
# force a result check; if there was an exception it'll be re-raised
for task in done:
task.result()
Проверка done.result()
может повторно вызватьисключение, брошенное в потребителя или производителя.Так как сопрограммы connect_*
запускаются через asyncio.gather()
, который, в свою очередь, запускается loop.run_until_complete()
, то это исключение затем распространяется до функции main()
, поэтому он выйдет из Python и вы увидитеобратная связь напечатана.Я обновил свой другой ответ, добавив в него цикл for task in done: task.result()
, так как в любом случае это хорошая практика.
С просто циклом task.result()
в моем исходном коде ответа и веб-сокетом, который простовозвращает сообщение и вводя действительный документ JSON (без двойного кодирования), я сразу вижу ошибку;родительский процесс здесь - мой терминал, поэтому я просто копирую сообщение JSON в окно моего терминала для отправки данных в канал:
$ python3.7 so52291218.py
{"header":{"protocolVersion":1,"messageID":2,"stationID":400}}
Traceback (most recent call last):
File "so52291218.py", line 140, in <module>
main()
File "so52291218.py", line 137, in main
loop.run_until_complete(asyncio.gather(socket_coro, pipe_coro))
File "/.../lib/python3.7/asyncio/base_events.py", line 568, in run_until_complete
return future.result()
File "so52291218.py", line 126, in connect_pipe
task.result()
File "so52291218.py", line 104, in pipe_producer
type = int(message.get("header", {}).get("messageID", -1))
AttributeError: 'str' object has no attribute 'get'
Когда я удаляю вызов json.dumps()
из socket_producer()
или Я изменяю свой сервер веб-сокетов, чтобы использовать json.loads()
для входящего сообщения и отправлять его как результат, затем код работает, и я получаю то же сообщение, возвращаемое обратно на мой терминал.
Обратите внимание, что вы не можете просто использовать цикл для записи в subprocess.Popen()
трубу , когда оба stdin
и stdout
являются каналами.Вы можете легко заставить дочерний процесс зависать при вводе / выводе, только записывая в цикл.Вы также должны были бы читать из канала stdout
, но поскольку дочерний процесс будет читать и писать из этих дескрипторов в фактически произвольном порядке, ваш родительский процесс также должен обрабатыватьввод / вывод для Popen()
труб асинхронно.
Вместо того, чтобы писать, как это сделать (что уже описано в других разделах Stack Overflow), я вместо этого советую вам использовать pexpect
проект , как это уже сделал все, что работает для вас (путем создания отдельного потока, который постоянно читает из канала stdout
);использование pexpect.popen_spawn.PopenSpawn()
, чтобы сохранить это значение близко к вашей первоначальной настройке, выглядело бы так:
import sys
import pexpect
test = '...'
process = pexpect.popen_spawn.PopenSpawn([sys.executable, test])
for i in range(5):
message = '{"header":{"protocolVersion":1,"messageID":2,"stationID":400}}';
jsonValueBytes = message.encode("utf-8")
process.send(jsonValueBytes + b"\n")
# echo anything coming back
while True:
index = process.expect([process.crlf, pexpect.EOF, pexpect.TIMEOUT], timeout=0.1)
if not process.before:
break
print('>>>', process.before.decode('utf8', errors='replace'), flush=True)
# send EOF to close the pipe, then terminate the process
process.sendeof()
process.kill(1)
process.wait()
Таким образом, каждый раз, когда мы отправляем полную линию в канал, мы также ищем линии, идущие в другую сторону, скороткий тайм-аут и вывод любых подобных строк.
Со всеми исправлениями на месте (избегая многократного кодирования сообщений JSON) и очень простым эхо-сервером websocket , pexpect
код выше печатает:
>>> {"header":{"protocolVersion":1,"messageID":2,"stationID":400}}
>>> {"header":{"protocolVersion":1,"messageID":2,"stationID":400}}
>>> {"header":{"protocolVersion":1,"messageID":2,"stationID":400}}
>>> {"header":{"protocolVersion":1,"messageID":2,"stationID":400}}
>>> {"header":{"protocolVersion":1,"messageID":2,"stationID":400}}
Показывает, что существует полный путь туда и обратно от родительского процесса к дочернему процессу к веб-сокету и обратно.