То есть параллельный подсчет размеров файлов должен быть таким простым?
Это не так:)
Я пытался решить эту проблему лучше . Я понял, что я блокирую операции ввода-вывода , поэтому pmap не выполняет эту работу.
Я подумал, может быть, было бы целесообразно дать агентам куски каталогов ( branch ) для их самостоятельной обработки. Похоже, это делает :)
Ну, я еще не тестировал.
Это работает, но могут быть некоторые проблемы с символическими ссылками в UNIX-подобных системах.
(def user-dir (clojure.java.io/file "/home/janko/projects/"))
(def root-dir (clojure.java.io/file "/"))
(def run? (atom true))
(def *max-queue-length* 1024)
(def *max-wait-time* 1000) ;; wait max 1 second then process anything left
(def *chunk-size* 64)
(def queue (java.util.concurrent.LinkedBlockingQueue. *max-queue-length* ))
(def agents (atom []))
(def size-total (atom 0))
(def a (agent []))
(defn branch-producer [node]
(if @run?
(doseq [f node]
(when (.isDirectory f)
(do (.put queue f)
(branch-producer (.listFiles f)))))))
(defn producer [node]
(future
(branch-producer node)))
(defn node-consumer [node]
(if (.isFile node)
(.length node)
0))
(defn chunk-length []
(min (.size queue) *chunk-size*))
(defn compute-sizes [a]
(doseq [i (map (fn [f] (.listFiles f)) a)]
(swap! size-total #(+ % (apply + (map node-consumer i))))))
(defn consumer []
(future
(while @run?
(when-let [size (if (zero? (chunk-length))
false
(chunk-length))] ;appropriate size of work
(binding [a (agent [])]
(dotimes [_ size] ;give us all directories to process
(when-let [item (.poll queue)]
(set! a (agent (conj @a item)))))
(swap! agents #(conj % a))
(send-off a compute-sizes))
(Thread/sleep *max-wait-time*)))))
Вы можете запустить его, набрав
(producer (list user-dir))
(consumer)
Для типа результата
@size-total
Вы можете остановить это (есть работающие фьючерсы - поправьте меня, если я ошибаюсь)
(swap! run? not)
Если вы обнаружите какие-либо ошибки / ошибки, вы можете поделиться своими идеями!