(чтобы ответить на комментарий Насира и для последующих посетителей, вот мои полные настройки)
Каналы и их работники действительно были хорошим выбором для моего проекта, и у меня есть кое-что, что хорошо работает. Он еще не работает, но работает нормально, а код хорошо структурирован, с ним легко работать и т. Д.
Прежде всего нам нужно настроить работника и заставить его работать. Давайте предположим, что наш рабочий класс - ExternalData, мы собираемся настроить отдельный канал для рабочего:
# routing.py
application = ProtocolTypeRouter({
# ...
'channel': ChannelNameRouter({
"external-data": ExternalData,
})
})
# asgi.py
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
# ...
# add this to the end of the file
channel_layer = get_channel_layer()
logger.info("Sending start signal to ExternalData")
async_to_sync(channel_layer.send)( "external-data", { "type": "external_data.start" })
# external_data.py worker's code
# used as a singleton object
class DataStore(object):
@classmethod
async def create(cls, owner):
self = DataStore()
self.currencies = {}
self.owner = owner
# ...
return self
class ExternalData(AsyncConsumer):
started = False
# triggered from asgi.py via daphne start
async def external_data_start(self, event):
if ExternalData.started:
if settings.DEBUG:
raise RuntimeError("ExternalData already working.")
else:
logger.warning("ExternalData already working.")
return
else:
# do your initialization work here and let the data producer start listening and saving external data
ExternalData.started = True
self.store = await DataStore.create(owner=self)
DataStore в приведенном выше коде, разумеется, не является необходимым, но если вы собираетесь делать что-то сложное, может быть лучше использовать ExternalData только для каналов, связанных с вещами, и делать другие вещи в другом классе. При такой настройке вам нужно сначала запустить рабочий:
python manage.py runworker external-data
и затем запустите daphne (т. Е. В другом терминале, чтобы увидеть вывод их обоих):
daphne -b 0.0.0.0 -p 8000 YOUR_PROJECT.asgi:application
В производственном процессе, когда вам нужно написать сервис или аналогичную Дафну, нужно запустить чуть позже (например, спать 2-3 секунды), чтобы убедиться, что рабочий файл обрабатывается Python и работает. Вы также можете многократно пробовать код asgi.py (то есть в цикле с некоторым сном), пока рабочий не установит какой-либо флаг среды.
Теперь наш поставщик данных запущен, но как насчет клиентов? Нам нужен клиент, который в основном будет выступать в качестве посредника между нашим поставщиком данных и клиентами. Для моего проекта требования по передаче данных покрывали большинство случаев:
- A: когда клиент подключается, отправьте некоторые начальные данные
- B: клиент может посетить страницу и ему нужно получить дополнительные данные, связанные со страницей
- C: клиент находится на странице, где вам нужно отправлять данные в реальном времени и обновлять страницу
- D: пришли новые данные, и вам нужно сообщить клиенту
Наше одностраничное приложение, поэтому нам все это нужно. Вот фрагмент кода, в котором описано, как я справлялся со всеми этими случаями:
# consumer.py
class FeedsConsumer(AsyncJsonWebsocketConsumer):
groups = ["broadcast"] # for requirement D
# triggered from client
async def connect(self):
await self.accept()
self.listening = set() # for requirement C
logger.info(f"New client connected: {self.channel_name}")
# for requirement A
await self.channel_layer.send("external-data",
{ "type": "external.new_client", 'client_channel': self.channel_name })
# triggered from client
async def receive_json(self, data):
# for requirement B
if data["type"] == "get_currency":
payload["type"] = "external.send_currency"
payload["client_channel"] = self.channel_name
payload["currency"] = data["currency"]
self.listen(data["currency"]) # for requirement C
await self.channel_layer.send("external-data", payload)
# for requirement C, you possibly need a counterpart unlisten to remove channel_name from the group and update self.listening set
async def listen(self, item_id):
if item_id not in self.listening:
await self.channel_layer.group_add(item_id, self.channel_name )
self.listening.add(item_id)
# below are triggered from the worker. A and B as responses. C and D as server generated messages
# for requirement A
async def init_data(self, payload):
await self.send_json(payload)
# for requirement B
async def send_currency(self, payload):
await self.send_json(payload)
# for requirement C
async def new_value(self, payload):
await self.send_json(payload)
# for requirement D
async def new_currency(self, payload):
await self.send_json(payload)
# external_data.py worker's code
class ExternalData(AsyncConsumer):
# for requirement A. triggered from consumer.
async def external_new_client(self, payload):
data_to_send = list(self.store.currencies.keys())
# prepare your data above and then send it like below
await self.channel_layer.send(payload["client_channel"], # new client
{ 'type': 'init_data',
'data': data_to_send,
})
# for requirement B. triggered from consumer.
async def external_send_currency(self, payload):
data_to_send = self.store.currencies[payload["currency"]]
# prepare your data above and then send it like below
await self.channel_layer.send(payload["client_channel"], # only the client who requested data
{ 'type': 'send_currency',
'data': data_to_send,
})
async def new_data_arrived(self, currency, value):
if currency not in self.store.currencies:
self.store.currencies[currency] = value
# requirement D. suppose this is new data so we need to notify all connected users of its availability
await self.channel_layer.group_send("broadcast", # all clients are in this group
{ 'type': 'new_currency',
'data': currency,
})
else:
# requirement C, notify listeners.
self.store.currencies[currency] = value
await self.channel_layer.group_send(currency, # all clients listening to this currency
{ 'type': 'new_value',
'currency': currency,
'value': value,
})
Надеюсь, я не испортил код, и он не слишком сложен (мне было лень вставлять / редактировать отдельный код для каждого требования). Пожалуйста, задавайте любые вопросы в комментариях.