У меня есть только N (количество строк в шаблоне) и два набора данных с типами:
root
|-- ts_begin: long (nullable = true)
|-- btype: integer (nullable = false)
|-- disp: double (nullable = true)
|-- log_co: double (nullable = true)
Один из них слишком большой (данные истории), а другой имеет размер = N (обычно не более 10 строк).
Пример данных (истории):
+----------+-----+-------+-------+
| ts_begin|btype| disp| log_co|
+----------+-----+-------+-------+
|1535536647| 1|3.44E-4| 4.4E-4|
|1535536947| 1|1.16E-4| 4.0E-4|
|1535537250| -1|1.03E-4|-2.0E-4|
|1535537550| 1|1.15E-4| 1.4E-4|
|1535537847| 1| 1.5E-4| 1.7E-4|
|1535538148| -1|1.27E-4|-3.8E-4|
|1535538447| 1| 8.6E-5| 8.0E-5|
|1535538747| -1|1.13E-4|-1.6E-4|
|1535539047| -1| 6.6E-5|-4.0E-5|
|1535539347| -1|1.75E-4|-2.5E-4|
|1535539649| -1|1.76E-4|-5.2E-4|
Пример шаблона:
+----------+-----+-------+-------+
| ts_begin|btype| disp| log_co|
+----------+-----+-------+-------+
|1635536601| 1|3.44E-4| 4.4E-4|
|1635536902| 1|1.16E-4| 4.0E-4|
|1635537203| -1|1.03E-4|-2.0E-4|
+----------+-----+-------+-------+
Я хочу выполнить поиск по шаблону с такими условиями:
pattern.row.btype == hist.row.btype and
pattern.row.disp between hist.row.disp*0.8 and hist.row.disp*1.2
для всех строк шаблона.
Оба набора данных отсортированы по ts_begin asc.
значения ts_begin не сравниваются.
В результате я хочу увидеть что-то вроде этого:
+----------+-----+-------+-------+--------+
| ts_begin|btype| disp| log_co|comp_res|
+----------+-----+-------+-------+--------+
|1535536647| 1|3.44E-4| 4.4E-4| 0|
|1535536947| 1|1.16E-4| 4.0E-4| 0|
|1535537250| -1|1.03E-4|-2.0E-4| 0|
|1535537550| 1|1.15E-4| 1.4E-4| 1|
|1535537847| 1| 1.5E-4| 1.7E-4| 0|
|1535538148| -1|1.27E-4|-3.8E-4| 0|
|1535538447| 1| 8.6E-5| 8.0E-5| 0|
|1535538747| -1|1.13E-4|-1.6E-4| 1|
|1535539047| -1| 6.6E-5|-4.0E-5| 0|
|1535539347| -1|1.75E-4|-2.5E-4| 0|
|1535539649| -1|1.76E-4|-5.2E-4| 0|
Немного больше объяснений, 1 в поле comp_res означает, что последовательность строк
|1535537550| 1|1.15E-4| 1.4E-4|
|1535537847| 1| 1.5E-4| 1.7E-4|
|1535538148| -1|1.27E-4|-3.8E-4|
"равно" шаблону.
Я пытался решить это с помощью
val lb = listBars
.withColumn("rn", row_number() over Window.orderBy(col("ts_begin").asc) )
.withColumn("rn", floor(($"rn"-1)/nBuckets))
.withColumn("rnk", row_number() over Window.partitionBy("rn").orderBy(col("ts_begin").asc) )
.groupBy("rn")
.pivot("rnk", 1 to nBuckets)
.agg(
sum("ts_begin").alias("ts_begin"),
sum("btype").alias("btype"),
sum("disp").alias("disp"),
sum("log_co").alias("log_co")
)
и затем UDF с 2 строками в качестве ввода, например ...
def udf_comp(p: Row) = udf(
(r: Row) =>
{
Похоже, что существует более элегантное решение без поворота.
Я надеюсь, что можно использовать функцию Window с группировкой по всему набору данных, упорядочением по ts_begin asc и с кадром данных из текущей строки и некоторых следующих:
val windowSpec = Window
.orderBy(col("ts_begin").asc)
.rowsBetween(Window.currentRow, nBuckets-1)
и с UDAF, которые получают и накапливают все строки из окна данных и шаблонов данных, а также в «оценке» сравнивают равноразмерные данные.
Также может быть решение с левым соединением шаблона.
Можете ли вы предложить мне что-нибудь? спасибо.