Иногда я получаю сообщение об ошибке, когда запускаю Worker of Django Channels в качестве фоновой задачи. Задача использует открытый сторонний WebSocket с библиотекой Pysher, обрабатывает эти данные и затем отправляет полученные данные через канальный уровень в группу веб-сокетов, прослушивающих наше приложение с помощью Redis (Message Broker).
Я знаю, что проблема не связана с библиотекой Pysher, так как, если я прокомментирую код для отправки сообщения через канальный уровень, будут возникать проблемы с удалением. Проблема возникает из-за отправки сообщения на канальном уровне и только иногда, много раз, когда отправка происходит успешно, а в других случаях выдает ошибку, последнее поведение заставляет меня думать, что это может быть связано с Redis или Channel Слой и его конфигурация или что-то в этом роде, возможно, Redis достигает некоторого предела.
Сообщение об ошибке
error from callback bound method Connection._on_message of Connection(Thread-2, started daemon 140614612023040):
[Errno 99] Cannot assign requested address
SO и версия Redis: Ubuntu 16 + Redis 3.2
Зависимости в пунктах
aiodns==1.1.1
aiohttp==2.3.10
aioredis==1.0.0
amqp==2.2.2
asgiref==2.1.6
asn1crypto==0.24.0
astroid==1.6.1
async-timeout==2.0.0
attrs==17.4.0
autobahn==17.10.1
Automat==0.6.0
Babel==2.5.3
billiard==3.5.0.3
biopython==1.69
cchardet==2.1.1
celery==4.1.0
certifi==2018.1.18
cffi==1.11.4
channels==2.0.2
channels-redis==2.1.0
chardet==3.0.4
click==6.7
constantly==15.1.0
cryptography==2.1.4
daphne==2.0.4
dash==0.21.0
dash-core-components==0.21.1
dash-html-components==0.9.0
dash-renderer==0.11.3
decorator==4.2.1
Django==2.0.2
django-celery-beat==1.1.1
django-celery-results==1.0.1
django-mysql==2.2.0
Flask==0.12.2
Flask-Caching==1.4.0
Flask-Compress==1.4.0
flower==0.9.2
future==0.16.0
hiredis==0.2.0
hyperlink==17.3.1
idna==2.6
idna-ssl==1.0.0
incremental==17.5.0
ipython-genutils==0.2.0
isort==4.3.4
itsdangerous==0.24
Jinja2==2.10
jsonschema==2.6.0
jupyter-core==4.4.0
kombu==4.1.0
lazy-object-proxy==1.3.1
MarkupSafe==1.0
mccabe==0.6.1
msgpack==0.5.6
multidict==4.1.0
mysqlclient==1.3.12
nbformat==4.4.0
ndg-httpsclient==0.4.4
numpy==1.13.3
oauthlib==2.0.7
pandas==0.22.0
plotly==2.5.1
pyasn1==0.4.2
pyasn1-modules==0.2.1
pycares==2.3.0
pycparser==2.18
pylint==1.8.2
pyOpenSSL==17.5.0
Pysher==0.3.0
PySocks==1.6.8
python-dateutil==2.6.1
python-telegram-bot==10.0.1
python-twitter==3.4.1
pytz==2018.3
redis==2.10.6
requests==2.18.4
requests-oauthlib==0.8.0
service-identity==17.0.0
six==1.11.0
tornado==4.5.3
traitlets==4.3.2
Twisted==17.9.0
txaio==2.8.2
urllib3==1.22
vine==1.1.4
websocket-client==0.47.0
Werkzeug==0.14.1
wrapt==1.10.11
yarl==1.1.0
zope.interface==4.4.3
Конфигурация канального уровня
CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels_redis.core.RedisChannelLayer",
"CONFIG": {
"hosts": [("localhost", 6379)]
},
},
}
Мой код
# Synconsumer Class that runs as Channels Worker Background task
class BTWorker(SyncConsumer):
# Class method
def get_data(self, event):
# Callback function that runs every time We receive a message from the thrid party Websocket
def callback(*args, **kwargs):
# Data processing
raw_dict = json.loads(args[0])
var_dict = {
'id': raw_dict['id'],
'other_key': other_value
}
var_list = [var_dict]
# Send data via Channel Layer and Redis (CODE that generate the ISSUE!!)
async_to_sync(self.channel_layer.group_send)(
"group_name",
{
"type": "some.type",
"text": json.dumps(trade_uni_list)
}
)
# Websocket Third party connection handler
def connect_handler(data):
channel = pusher.subscribe('token')
channel.bind('channel', callback)
# Websocket Third Party connection via Pysher Library
pusher = pysher.Pusher('******some_key****')
pusher.connection.bind('pusher:connection_established', connect_handler)
pusher.connect()