Я читаю файл паркета и получаю данные как RDD, используя Flambo api.мы применяем zipmap имен столбцов и создаем хэш-карту / карту Clojure
, скажем, моя карта имеет следующие значения
[{:a 1 :b2}
{:a 2 :b 2}]
(:require [flambo.api :as f])
core.clj
Я использую
(f/map rdd-records (f/fn[each-rdd]
(perform-calcs each-red)))
в функции execute-calcs на основе входных данных с карты мы делаем дополнительные вычисления, что-то вроде
cals.clj
(defn perform-calcs
[r]
(merge r {:c (+ (:a r) (:b r))}))
у нас было новое требование:выполнить другой расчет на основе другого DataFrame из другого файла.мы не хотим загружать файл для каждой записи, поэтому сохранили код для загрузки DataFrame вне калькулятора и определили его в файле commons.этот DataFrame загружается как часть приложения и может быть доступен через приложение.
commons.clj
(def another-csv-df
(load-file->df "file-name"))
calcs.clj
(defn df-lookup
[r df]
{:d (->
df (.filter (format "a = %d and b = %d" (:a r) (:b r) )
(.select (into [] (map #(Column. %) ["d"] )))
(first)
(.getString(0))})
, включая это в выполнениеФункция -calcs изменится следующим образом.
(defn perform-calcs
[r]
(-> r
(merge {:c (+ (:a r) (:b r))})
(df-lookup commons/another-csv-df))
в действительности я вижу значения во фрейме данных ... код работает, как и ожидалось, без этого внешнего вызова DF с этим DF.работает долго ... и никогда не завершает процесс