Интеграция источника данных как рабочего в Django Channels 2.x - PullRequest
0 голосов
/ 28 июня 2018

Я занимаюсь разработкой приложения, в котором данные в реальном времени, которые будут передаваться клиентам, будут поступать из внешнего API. Простую версию этого можно представить как трекер валютной валюты. Пользователь будет указывать, какие валюты он хочет отслеживать (доллары США, евро, фунты и т. Д.) И получать обновления в режиме реального времени. Данные о валюте будут поступать из внешнего API через длительный опрос. У меня вопрос, как интегрировать этого производителя данных в каналы?

Во всех примерах каналов, которые я обнаружил, работа работника запускается событием, но в моем случае она будет начинаться с самого начала, работать непрерывно, и вместо получения событий она будет просто выдвигать новые значения на уровень канала, чтобы подписчики могли получать уведомления. Так что я не уверен, что потребительская модель - правильная. Подводя итог моим вопросам:

  • Должен ли я использовать потребителя для этой задачи и как его настроить? Учитывая, что к API будет обращаться длинный асинхронный опрос или потребитель синхронизации? Начать опрос внешнего API в методе connect или просто отправить одноразовое событие для этого? Откуда и когда отправить это «начало работы» событие?

  • Я также хочу использовать redis для хранения значений для предоставления начального значения валют пользователю. Они начнут прислушиваться к обновлениям при подключении, но, возможно, обновление придет много секунд спустя. Могу ли я получить доступ к экземпляру подключения Redis, используемому канальным уровнем, или мне нужно для этого открыть другое подключение к моему Redis?

Другим вариантом для производителя данных может быть сохранение его полностью за пределами каналов Django, как описано здесь , и просто передача данных на канальный уровень, но я не уверен во время развертывания, что может быть проблематично с Дафной. Я имею в виду, как я могу убедиться, что он работает и правильно делится ресурсами с каналами?

Спасибо.

Ответы [ 2 ]

0 голосов
/ 23 ноября 2018

(чтобы ответить на комментарий Насира и для последующих посетителей, вот мои полные настройки)

Каналы и их работники действительно были хорошим выбором для моего проекта, и у меня есть кое-что, что хорошо работает. Он еще не работает, но работает нормально, а код хорошо структурирован, с ним легко работать и т. Д.

Прежде всего нам нужно настроить работника и заставить его работать. Давайте предположим, что наш рабочий класс - ExternalData, мы собираемся настроить отдельный канал для рабочего:

# routing.py
application = ProtocolTypeRouter({
    # ...
    'channel': ChannelNameRouter({
        "external-data": ExternalData,
    })
})

# asgi.py  
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
# ...
# add this to the end of the file
channel_layer = get_channel_layer()
logger.info("Sending start signal to ExternalData")
async_to_sync(channel_layer.send)( "external-data", { "type": "external_data.start" })

# external_data.py   worker's code

# used as a singleton object
class DataStore(object):

    @classmethod
    async def create(cls, owner):
        self = DataStore() 
        self.currencies = {}
        self.owner = owner
        # ...
        return self

class ExternalData(AsyncConsumer):

    started = False

    # triggered from asgi.py via daphne start
    async def external_data_start(self, event):

        if ExternalData.started:
            if settings.DEBUG:
                raise RuntimeError("ExternalData already working.")
            else:
                logger.warning("ExternalData already working.")
                return
        else:
            # do your initialization work here and let the data producer start listening and saving external data 
            ExternalData.started = True
            self.store = await DataStore.create(owner=self)

DataStore в приведенном выше коде, разумеется, не является необходимым, но если вы собираетесь делать что-то сложное, может быть лучше использовать ExternalData только для каналов, связанных с вещами, и делать другие вещи в другом классе. При такой настройке вам нужно сначала запустить рабочий:

python manage.py runworker external-data 

и затем запустите daphne (т. Е. В другом терминале, чтобы увидеть вывод их обоих):

daphne -b 0.0.0.0 -p 8000 YOUR_PROJECT.asgi:application

В производственном процессе, когда вам нужно написать сервис или аналогичную Дафну, нужно запустить чуть позже (например, спать 2-3 секунды), чтобы убедиться, что рабочий файл обрабатывается Python и работает. Вы также можете многократно пробовать код asgi.py (то есть в цикле с некоторым сном), пока рабочий не установит какой-либо флаг среды.

Теперь наш поставщик данных запущен, но как насчет клиентов? Нам нужен клиент, который в основном будет выступать в качестве посредника между нашим поставщиком данных и клиентами. Для моего проекта требования по передаче данных покрывали большинство случаев:

  • A: когда клиент подключается, отправьте некоторые начальные данные
  • B: клиент может посетить страницу и ему нужно получить дополнительные данные, связанные со страницей
  • C: клиент находится на странице, где вам нужно отправлять данные в реальном времени и обновлять страницу
  • D: пришли новые данные, и вам нужно сообщить клиенту

Наше одностраничное приложение, поэтому нам все это нужно. Вот фрагмент кода, в котором описано, как я справлялся со всеми этими случаями:

# consumer.py

class FeedsConsumer(AsyncJsonWebsocketConsumer):
    groups = ["broadcast"]   # for requirement D

    # triggered from client
    async def connect(self):
        await self.accept()
        self.listening = set()  # for requirement C
        logger.info(f"New client connected: {self.channel_name}")
        # for requirement A
        await self.channel_layer.send("external-data",
           { "type": "external.new_client", 'client_channel': self.channel_name })

    # triggered from client
    async def receive_json(self, data):        
            # for requirement B
            if data["type"] == "get_currency":
                payload["type"] = "external.send_currency"
                payload["client_channel"] = self.channel_name
                payload["currency"] = data["currency"]
                self.listen(data["currency"])  # for requirement C
                await self.channel_layer.send("external-data", payload)

    # for requirement C, you possibly need a counterpart unlisten to remove channel_name from the group and update self.listening set
    async def listen(self, item_id):
            if item_id not in self.listening:
                await self.channel_layer.group_add(item_id, self.channel_name )
                self.listening.add(item_id)    

    # below are triggered from the worker. A and B as responses. C and D as server generated messages 

    # for requirement A
    async def init_data(self, payload):
        await self.send_json(payload)

    # for requirement B
    async def send_currency(self, payload):
        await self.send_json(payload) 

    # for requirement C
    async def new_value(self, payload):
        await self.send_json(payload)  

    # for requirement D
    async def new_currency(self, payload):
        await self.send_json(payload) 

# external_data.py   worker's code

class ExternalData(AsyncConsumer):

    # for requirement A. triggered from consumer.
    async def external_new_client(self, payload):
        data_to_send = list(self.store.currencies.keys())
        # prepare your data above and then send it like below
        await self.channel_layer.send(payload["client_channel"],  # new client
          { 'type': 'init_data',
            'data': data_to_send,
          })

    # for requirement B. triggered from consumer.
    async def external_send_currency(self, payload):
        data_to_send = self.store.currencies[payload["currency"]]
        # prepare your data above and then send it like below
        await self.channel_layer.send(payload["client_channel"],  # only the client who requested data
          { 'type': 'send_currency',
            'data': data_to_send,
          })


    async def new_data_arrived(self, currency, value):
         if currency not in self.store.currencies:
             self.store.currencies[currency] = value
             # requirement D. suppose this is new data so we need to notify all connected users of its availability
             await self.channel_layer.group_send("broadcast",  # all clients are in this group
               { 'type': 'new_currency',
                 'data': currency,
               })
         else:
             # requirement C, notify listeners.
             self.store.currencies[currency] = value
             await self.channel_layer.group_send(currency,  # all clients listening to this currency
               { 'type': 'new_value',
                 'currency': currency,
                 'value': value,
               })

Надеюсь, я не испортил код, и он не слишком сложен (мне было лень вставлять / редактировать отдельный код для каждого требования). Пожалуйста, задавайте любые вопросы в комментариях.

0 голосов
/ 30 июня 2018

Рабочие подходят для вашего случая использования. Они предназначены для длительного использования, и не существует нового экземпляра для каждого запроса. Если вы хотите сделать своих потребителей асинхронными, вы должны быть абсолютно уверены, что вы ничего не делаете. Все запросы БД должны быть помещены в database_sync_to_async, даже если вызов БД происходит на 5 уровней ниже стека вызовов. Вы можете использовать API-интерфейс кеширования Django для подключения к Redis, но вам лучше работать вне его, чтобы сохранить все асинхронным. Использовать каналы библиотеки redis использует напрямую, поскольку она имеет асинхронные методы для работы с redis в качестве кэша.

...