Spark 2.3.2 Шаблон поиска Scala - PullRequest
0 голосов
/ 08 ноября 2018

У меня есть только N (количество строк в шаблоне) и два набора данных с типами:

root
 |-- ts_begin: long (nullable = true)
 |-- btype: integer (nullable = false)
 |-- disp: double (nullable = true)
 |-- log_co: double (nullable = true)

Один из них слишком большой (данные истории), а другой имеет размер = N (обычно не более 10 строк).

Пример данных (истории):

+----------+-----+-------+-------+
|  ts_begin|btype|   disp| log_co|
+----------+-----+-------+-------+
|1535536647|    1|3.44E-4| 4.4E-4|
|1535536947|    1|1.16E-4| 4.0E-4|
|1535537250|   -1|1.03E-4|-2.0E-4|
|1535537550|    1|1.15E-4| 1.4E-4|
|1535537847|    1| 1.5E-4| 1.7E-4|
|1535538148|   -1|1.27E-4|-3.8E-4|
|1535538447|    1| 8.6E-5| 8.0E-5|
|1535538747|   -1|1.13E-4|-1.6E-4|
|1535539047|   -1| 6.6E-5|-4.0E-5|
|1535539347|   -1|1.75E-4|-2.5E-4|
|1535539649|   -1|1.76E-4|-5.2E-4|

Пример шаблона:

+----------+-----+-------+-------+
|  ts_begin|btype|   disp| log_co|
+----------+-----+-------+-------+
|1635536601|    1|3.44E-4| 4.4E-4|
|1635536902|    1|1.16E-4| 4.0E-4|
|1635537203|   -1|1.03E-4|-2.0E-4|
+----------+-----+-------+-------+

Я хочу выполнить поиск по шаблону с такими условиями:

  pattern.row.btype == hist.row.btype and
  pattern.row.disp between hist.row.disp*0.8 and hist.row.disp*1.2

для всех строк шаблона.

Оба набора данных отсортированы по ts_begin asc.

значения ts_begin не сравниваются.

В результате я хочу увидеть что-то вроде этого:

+----------+-----+-------+-------+--------+
|  ts_begin|btype|   disp| log_co|comp_res|
+----------+-----+-------+-------+--------+
|1535536647|    1|3.44E-4| 4.4E-4|       0|
|1535536947|    1|1.16E-4| 4.0E-4|       0|
|1535537250|   -1|1.03E-4|-2.0E-4|       0|
|1535537550|    1|1.15E-4| 1.4E-4|       1|
|1535537847|    1| 1.5E-4| 1.7E-4|       0|
|1535538148|   -1|1.27E-4|-3.8E-4|       0|
|1535538447|    1| 8.6E-5| 8.0E-5|       0|
|1535538747|   -1|1.13E-4|-1.6E-4|       1|
|1535539047|   -1| 6.6E-5|-4.0E-5|       0|
|1535539347|   -1|1.75E-4|-2.5E-4|       0|
|1535539649|   -1|1.76E-4|-5.2E-4|       0|

Немного больше объяснений, 1 в поле comp_res означает, что последовательность строк

    |1535537550|    1|1.15E-4| 1.4E-4|
    |1535537847|    1| 1.5E-4| 1.7E-4|
    |1535538148|   -1|1.27E-4|-3.8E-4|

"равно" шаблону.

Я пытался решить это с помощью

 val lb = listBars
    .withColumn("rn", row_number() over Window.orderBy(col("ts_begin").asc) )
    .withColumn("rn", floor(($"rn"-1)/nBuckets))
    .withColumn("rnk", row_number() over Window.partitionBy("rn").orderBy(col("ts_begin").asc) )
    .groupBy("rn")
    .pivot("rnk", 1 to nBuckets)
    .agg(
      sum("ts_begin").alias("ts_begin"),
      sum("btype").alias("btype"),
      sum("disp").alias("disp"),
      sum("log_co").alias("log_co")
    )

и затем UDF с 2 строками в качестве ввода, например ...

  def udf_comp(p: Row) = udf(
    (r: Row) =>
    {

Похоже, что существует более элегантное решение без поворота.

Я надеюсь, что можно использовать функцию Window с группировкой по всему набору данных, упорядочением по ts_begin asc и с кадром данных из текущей строки и некоторых следующих:

  val windowSpec = Window
    .orderBy(col("ts_begin").asc)
    .rowsBetween(Window.currentRow, nBuckets-1)

и с UDAF, которые получают и накапливают все строки из окна данных и шаблонов данных, а также в «оценке» сравнивают равноразмерные данные.

Также может быть решение с левым соединением шаблона.

Можете ли вы предложить мне что-нибудь? спасибо.

...