Clojure / Java: наиболее эффективный метод минимизации использования полосы пропускания при выполнении сложных операций с потоком данных Amazon S3 - PullRequest
4 голосов
/ 27 августа 2010

Я выполняю потоковое чтение объекта с помощью BufferedReader.

Мне нужно сделать две вещи с этим объектом:

  1. Передайте его в CSV-ридер SuperCSV
  2. Получить необработанные строки и хранить их в (Clojure) ленивой последовательности

В настоящее время мне приходится использовать два разных BufferedReaders: один в качестве аргумента для класса чтения CSV SuperCSV и один для инициализации отложенной последовательности необработанных строк. Я эффективно загружаю объект S3 дважды, что дорого ($) и медленно.

Один из моих коллег отметил, что я ищу что-то похожее на Unix-команду "tee". Было бы полезно использовать BufferedReader, который каким-то образом можно «разделить», загрузить порцию данных и передать копию как в ленивую последовательность, так и в функции чтения csv.

В настоящее время я также изучаю, возможно ли обернуть ленивую последовательность в BufferedReader и передать , что , в супер csv. У меня были некоторые проблемы с кучей Java при передаче очень больших ленивых последовательностей нескольким потребителям, поэтому я немного беспокоюсь об использовании этого решения.

Другое решение - просто загрузить файл локально, а затем открыть два потока для этого файла. Это устраняет первоначальную мотивацию потоковой передачи: позволяет начинать обработку файла, как только начинают поступать данные.

Окончательное решение, которое я бы рассмотрел только в том случае, если ничего не работает, заключается в реализации моего собственного считывателя CSV, который возвращает как проанализированный CSV, так и исходную непарсированную строку. Если вы использовали очень надежный CSV-ридер, который может возвращать как Java-хэш проанализированных CSV-данных, так и исходную неразобранную строку, пожалуйста, дайте мне знать!

Спасибо!

Ответы [ 2 ]

2 голосов
/ 27 августа 2010

Я был бы склонен пойти на создание последовательности строк из сети, а затем передать это сколь угодно многим процессам, которые должны работать на этой последовательности; постоянные структуры данных - это круто. В случае необходимости превратить последовательность строк в Reader, который вы можете передать API SuperCSV, это работает:

(import '[java.io Reader StringReader])

(defn concat-reader
  "Returns a Reader that reads from a sequence of strings."
  [lines]
  (let [srs (atom (map #(StringReader. %) lines))]
    (proxy [Reader] []
      (read 
        ([] 
          (let [c (.read (first @srs))]
            (if (and (neg? c) (swap! srs next))
              (.read this)
              c)))
        ([cbuf] 
          (.read this cbuf 0 (count cbuf)))
        ([cbuf off len]
          (let [actual (.read (first @srs) cbuf off len)]
            (if (and (neg? actual) (swap! srs next))
              (.read this cbuf off len)
              actual))))
      (close [] ))))

Е.Г.

user=> (def r (concat-reader ["foo" "bar"]))
#'user/r
user=> (def cbuf (char-array 2))
#'user/cbuf
user=> (.read r cbuf)
2
user=> (seq cbuf)
(\f \o)
user=> (char (.read r))
\o
user=> (char (.read r))
\b
0 голосов
/ 28 октября 2010

Решение состояло в том, чтобы использовать один BufferedReader для всех обращений, а затем сбрасывать () его каждый раз, когда он передается функциональности, которую необходимо прочитать с самого начала.

...