Clojure - способ построения сервера live-stream - с обещаниями, но верный ли это путь - PullRequest
3 голосов
/ 06 марта 2012

Я ищу обходной путь построения сервера потоковой передачи. Конкретная проблема, с которой я борюсь, - это как отправить значения от одного поставщика (веб-камеры) неопределенному количеству потоков (подключенных клиентов). Очевидно, что когда клиент подключается, его не интересует полный видеофайл с веб-камеры, в основном ему нужно отправить заголовок, а затем все пакеты, поступающие с веб-камеры в этот момент.

В прямой Java, я думаю, это будет легко. Когда клиент подключается, добавляйте соединение в массив, когда он отключается, удаляйте соединение из массива, и всякий раз, когда приходит новый пакет с веб-камеры, отправляйте его каждой записи в массиве. Заблокируйте массив так, чтобы мы либо добавляли / удаляли записи, либо проходили через него для отправки пакетов. Конечно, мы могли бы построить то же самое в ближайшем будущем, но это звучит действительно злобно.

В многопоточной архитектуре передачи сообщений это звучит одинаково просто.

Единственное решение, которое я мог придумать в ближайшем будущем, - это ленивая последовательность обещаний. Действительно, это работает, но мне было интересно, есть ли другой способ, который приводит к более чистому коду и большему количеству clojure-zen:)

Просто для иллюстрации: упрощенная задача с обещаниями и атомами:

Одна функция провайдера, генерирующая данные, одна нить, которая читает эти данные. Позже создаются некоторые другие потоки, которые хотели бы получить данные из этого первого потока, но не могут добраться до них.

(defn provider []                                                                                                                                                                                                                                                                                                             
  (lazy-seq                                                                                                                                                                                                                                                                                                                   
    (do                                                                                                                                                                                                                                                                                                                       
      (Thread/sleep 100)                                                                                                                                                                                                                                                                                                      
      (cons (rand) (provider)))))                                                                                                                                                                                                                                                                                             

(def printer (agent nil))                                                                                                                                                                                                                                                                                                     
(defn log [& line]                                                                                                                                                                                                                                                                                                            
  (send-off printer (fn [x] (apply println line))))                                                                                                                                                                                                                                                                           

(def promises (atom (repeatedly promise)))                                                                                                                                                                                                                                                                                    

(defn client-connected-thread [x input]                                                                                                                                                                                                                                                                                       
  (log "Client connection " x " is connected with the provider and just received" @(first input))                                                                                                                                                                                                                             
  (recur x (rest input)))                                                                                                                                                                                                                                                                                                     

(.start (Thread. (fn []                                                                                                                                                                                                                                                                                                       
                   (loop [stream (provider)]                                                                                                                                                                                                                                                                                  
                     (when-let [item (first stream)]                                                                                                                                                                                                                                                                          
                       (log "I received " item", will share now")                                                                                                                                                                                                                                                             
                       (deliver (first @promises) item)                                                                                                                                                                                                                                                                       
                       (swap! promises rest))                                                                                                                                                                                                                                                                                 
                       (recur (rest stream))))))                                                                                                                                                                                                                                                                              


(Thread/sleep 300)                                                                                                                                                                                                                                                                                                            
(.start (Thread. #(client-connected-thread 1 @promises)))                                                                                                                                                                                                                                                                     
(Thread/sleep 100)                                                                                                                                                                                                                                                                                                            
(.start (Thread. #(client-connected-thread 2 @promises)))                                                                                                                                                                                                                                                                     
(Thread/sleep 50)                                                                                                                                                                                                                                                                                                             
(.start (Thread. #(client-connected-thread 3 @promises)))            

Итак, в основном вопрос: это правильный способ решения этой проблемы?

Кроме того, мы говорим о сервере потокового мультимедиа, поэтому функция провайдера будет предоставлять десятки тысяч элементов в секунду, и может быть подключено 10 клиентов. Система обещания предназначена для такого интенсивного использования?

Ответы [ 2 ]

1 голос
/ 06 марта 2012

Взгляните на Алеф .Это библиотека для предоставления «асинхронных каналов», которая может помочь вам реализовать необходимый сценарий.

1 голос
/ 06 марта 2012

В Clojure есть агенты для ситуаций, когда вам нужно отправлять информацию асинхронно, что кажется подходящим для вашего варианта использования.

Вы действительно очень близки, просто застряли рабочие агенты в нескольких местах, чтобы закончить.

"В прямом Clojure , я думаю, это будет легко.Каждый раз, когда клиент подключается, добавьте соединение к вектору агентов в агенте , когда он отключится, удалите соединение из агента агентов , и при появлении нового пакета с веб-камерыотправьте его каждому агенту в агенте . "

убедитесь, что вы используете send-off вместо send, чтобы избежать опустошения пула потоков.

Это имеет ряд преимуществ по сравнению с подходом «блокировать массив»:

  • один медленный клиент не остановит вас от добавления или удаления или добавления соединений
  • клиенты в конечном итогеполучить все кадры без необходимости отслеживать каждый отдельно
  • вам не нужно беспокоиться о блокировке
  • вам не нужно вручную выделять потоки
  • вы можете использоватьчасы и такие, чтобы сообщить о производительности шбез изменения простого ядра алгоритма.

грубый контур будет выглядеть так:

user> (def connections-stub (range))
user> (def connections (agent []))
#'user/connections
user> (defn accept-connection [connection] 
    (send connections conj (agent connection)))
#'user/accept-connection
user> (map accept-connection (take 10 connections-stub))
(#<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2> #<Agent@1e31bc4b: 3>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2> #<Agent@1e31bc4b: 3> #<Agent@5340ef69: 4>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2> #<Agent@1e31bc4b: 3> #<Agent@5340ef69: 4> #<Agent@4c260132: 5>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2> #<Agent@1e31bc4b: 3> #<Agent@5340ef69: 4> #<Agent@4c260132: 5> #<Agent@5318a0ac: 6>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2> #<Agent@1e31bc4b: 3> #<Agent@5340ef69: 4> #<Agent@4c260132: 5> #<Agent@5318a0ac: 6> #<Agent@75dca6d2: 7>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2> #<Agent@1e31bc4b: 3> #<Agent@5340ef69: 4> #<Agent@4c260132: 5> #<Agent@5318a0ac: 6> #<Agent@75dca6d2: 7> #<Agent@694c6171: 8>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2> #<Agent@1e31bc4b: 3> #<Agent@5340ef69: 4> #<Agent@4c260132: 5> #<Agent@5318a0ac: 6> #<Agent@75dca6d2: 7> #<Agent@694c6171: 8> #<Agent@159177b9: 9>]> #<Agent@2a81df82: [#<Agent@3478e59b: 0> #<Agent@6d2a3e06: 1> #<Agent@704e33e7: 2> #<Agent@1e31bc4b: 3> #<Agent@5340ef69: 4> #<Agent@4c260132: 5> #<Agent@5318a0ac: 6> #<Agent@75dca6d2: 7> #<Agent@694c6171: 8> #<Agent@159177b9: 9>]>)

user> (defn send-frame [con-agent frame] 
       (send con-agent 
         (fn [connection frame] 
           (println "sending " frame " to " connection) connection) frame))
#'user/send-frame

user> (send-frame (first @connections) "hello")
sending  hello  to  0
#<Agent@da69a9c: 0>

user> (defn dispatch-frame [frame] 
        (doall (map #(send-frame % frame) @connections)))
#'user/dispatch-frame

user> (dispatch-frame "hello")
sending  hello  to  0
sending  hello  to  1
sending  hello  to  2
sending  hello  to  3
sending  hello  to  4
sending  hello  to  5
sending  hello  to  6
sending  hello  to  7
sending  hello  to  8
sending  hello  to  9
(#<Agent@da69a9c: 0> #<Agent@34f07ec4: 1> #<Agent@11ee68d1: 2> #<Agent@3b237a89: 3> #<Agent@1641d6b4: 4> #<Agent@3c76ced6: 5> #<Agent@1c05629d: 6> #<Agent@258d3fca: 7> #<Agent@5c56fa08: 8> #<Agent@52395294: 9>)
user> 
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...