Рецепт Clojure для динамических каналов / pub-sub - PullRequest
0 голосов
/ 10 мая 2018

Я ищу способ динамически создавать подписчиков на публикации, используя core.async (или все, что будет работать).

Проблема: у меня есть сообщения, которые мне нужно обработать на основе :sender сообщения. Каждое сообщение будет обрабатываться одной и той же функцией, но означает, что означает, что сообщения каждого отправителя будут обрабатываться по порядку, по одной - несколько тем на основе клавиши :sender с одним потребителем каждый , Мне также нужен способ ограничить количество активных потребителей во всех подписках, чтобы снизить использование ресурсов.

Я думал, что у меня будет pub канал:

(def in-chan (chan))
(def publication (pub in-chan :sender))

Но я хочу быть в состоянии обеспечить постоянную подписку, поскольку новые отправители подключаются к сети. Я открыт для лучших возможностей, пока код остается маленьким и простым.

Вопрос : Существует ли идиоматический способ обеспечения подписчика на конкретную публикацию перед отправкой сообщения? Как согласовать всех потребителей каждой подписки для использования общего пула потоков?

РЕДАКТИРОВАТЬ : я выяснил, как координировать работу, используя пул потоков и одного потребителя по теме. Я думаю, что для проверки, существует ли подпункт, я буду использовать ссылку на карту, чтобы сохранить название темы в подпункте. Если у ссылки нет записи для подписчика, я бы ее создал и добавил на карту; Далее я зарегистрирую подписчика на публикацию и опубликую сообщение. Цель этого вопроса - выяснить, есть ли лучший способ ускорить и отслеживать подписчиков динамически создаваемых тем.

1 Ответ

0 голосов
/ 13 мая 2018

Решение, которое я придумал, заключается в использовании ref:

(def registration (ref {}))

Эта регистрация используется темой, которая пишет в канал публикации непосредственно перед записью:

(defn register
  [registration topic-name]
  (dosync
    (let [r (ensure registration)
          something-to-track :tracked] ;; In my case, I'm keeping track of a channel
      (when-not (get r topic-name)
        (alter registration assoc topic-name something-to-track)
        something-to-track))))

Всякий раз, когда нам нужно опубликовать сообщение, мы можем использовать эту функцию, чтобы «зарегистрировать» нового подписчика. Если он ранее не существовал, он вернет something-to-track. В моем случае это канал, по которому я потом буду звонить sub. Если это nil, я могу проигнорировать это. Чтобы действительно не пропустить сообщения в параллельной среде, мне нужно было бы что-то сделать в транзакции (нужно понимать, защитит ли ensure меня, предоставив эксклюзивный доступ к registration между потоками), но мой конвейер настолько мал, что я могу написать в pub однопоточную.

...