Потоковая передача данных вызывающей стороне в JVM - PullRequest
0 голосов
/ 28 марта 2020

У меня есть функция, которая периодически получает данные, а затем прекращает получать данные. Эта функция должна возвращать данные, которые она периодически выбирает, вызывающей функции: либо

  1. Как и когда она получает
  2. за один выстрел

2-ая - простая реализация, т.е. вы блокируете вызывающего, извлекаете все данные и затем отправляете их за один раз.

Но я хочу реализовать 1-й (я хочу избежать обратных вызовов). Есть потоки вещи, которые будут использоваться здесь? Если так, то как? Если нет, то как вернуть something, при котором вызывающая сторона может запрашивать данные, а также останавливаться, когда возвращает сигнал о том, что данных больше нет?

Примечание. Я нахожусь в экосистеме JVM, близко к уточняйте c. Я посмотрел библиотеку clojure core.async, которая решает проблему такого рода с использованием каналов. Но я думал, есть ли другой способ, который, вероятно, выглядит следующим образом (предполагая, что потоки - это то, что можно использовать).
Java фрагмент

//Function which will periodically fetch MyData until there is no data
public Stream<MyData> myFunction() {
...
}

myFunction().filter(myData -> myData.text.equals("foo"))

Ответы [ 2 ]

1 голос
/ 28 марта 2020

Может быть, вы можете просто использовать seq - что по умолчанию лениво (например, Stream), чтобы вызывающий мог решить, когда извлекать данные. А когда данных больше нет, myFunction может просто завершить последовательность. При этом вы бы также включили некоторую оптимизацию в myFunction - например, чтобы получить данные в пакетном режиме, чтобы минимизировать количество обращений. Или периодически извлекайте данные в соответствии с вашим исходным требованием.

Вот одна наивная реализация:

(defn my-function []
  (let [batch 100]
    (->> (range)
         (map #(let [from (* batch %)
                     to   (+ from batch)]
                  (db-get from to)))
         ;; take while we have data from db-get
         (take-while identity)
         ;; returns as one single seq/Stream
         (apply concat))))

;; use it as a normal seq/Stream
(->> (my-function)
     (filter odd?))

, где db-get будет выглядеть примерно так:

(defn db-get [from to]
  ;; return first 1000 records only, i.e. returns nil to signal completion
  (when (< from 1000)
    ;; returns a range of records
    (range from to)))
1 голос
/ 28 марта 2020

Возможно, вы захотите проверить https://github.com/ReactiveX/RxJava и https://github.com/ReactiveX/RxClojure (кажется, больше не поддерживается?)

...