Как реализовать меры «Последний не пустой» с помощью Apache Spark и Sparkube? - PullRequest
0 голосов
/ 11 мая 2018

Мы используем комбинацию Apache Spark и Sparkube для создания оперативных аналитических сред. Данные подготовлены в Spark и представлены в виде многомерного куба со Sparkube. Sparkube автоматически публикует простые агрегации (SUM, MIN, MAX, AVG, STD ...), но как мы можем поддерживать агрегацию «последний непустой»?

Возьмем, к примеру, этот набор данных, где периодически регистрируется количество запасов различных продуктов. Запас на 2018 год должен быть не суммой записей запасов за этот год, а самой последней за этот год.

Time,Product,Stock
2017-11-01, Oranges, 40000
2017-11-01, Apples, 120000
2017-12-01, Oranges, 42000
2017-12-01, Apples, 110000
2018-01-01, Oranges, 50000
2018-01-01, Apples, 100000
2018-02-01, Oranges, 48000
2018-02-01, Apples, 130000
2018-03-01, Oranges, 46000
2018-03-01, Apples, 120000

1 Ответ

0 голосов
/ 11 мая 2018

Проблема заключается в том, что записи количества товара (или позиции в целом) не агрегируют с течением времени. Но изменения количества делают. Поэтому вы можете использовать Spark для вычисления изменений количества запаса, которые будут последовательно суммироваться в кубе, а затем из внешнего интерфейса OLAP создать вычисленную меру, которая будет (повторно) вычислять фактический запас для любого местоположения в кубе.

Загрузите набор данных в Spark и рассчитайте изменение запаса для продукта

import org.apache.spark.sql.expressions.Window

var ds = spark.read
.format("csv")
.option("header","true")
.option("inferSchema","true")
.load("/path/to/stock.csv")

val ws = Window.partitionBy("Product").orderBy("Time")

val ds2 = ds
  .withColumn("PreviousStockTmp", lag(col("Stock"), 1).over(ws))
  .withColumn("PreviousStock", when($"PreviousStockTmp".isNull, 0).otherwise($"PreviousStockTmp"))
  .drop("PreviousStockTmp")
  .withColumn("StockVariation", col("Stock").minus(col("PreviousStock"))).orderBy("Time")

Добавьте обычные столбцы «год», «месяц», «день» и т. Д., Которые будут полезны для онлайн-анализа

val ds3 = ds2
  .withColumn("Year", year(col("Time")))
  .withColumn("Month", month(col("Time")))
  .withColumn("Day", dayofmonth(col("Time")))

Наконец, опубликуйте набор данных в виде куба со Sparkube

import com.activeviam.sparkube._
new Sparkube().fromDataset(ds3).expose()

Теперь вы можете просматривать куб в Excel, Tableau или ActiveUI, вы можете использовать меру «StockVariation.SUM» в диаграммах и сводных таблицах. В MDX вы можете создать вычисляемый показатель для расчета запаса из вариантов:

WITH
 Member [Measures].[Stock] AS (
  (
    [Measures].[Stock],
    [Year].CurrentMember.PrevMember
  ) + [Measures].[StockVariation.SUM]
) 
SELECT
  NON EMPTY Crossjoin(
    [Year].[Year].[Year].Members,
    {
      [Measures].[Stock]
    }
  ) ON COLUMNS,
  NON EMPTY [Product].[Product].[Product].Members ON ROWS
  FROM [_sparkube_1]

enter image description here

...