Дата присоединения и ранга в течение периода времени - PullRequest
0 голосов
/ 08 июля 2019

Я пытаюсь получить звание и присоединиться в течение 10 дней, используя либо Scala Spark, либо SQL.

У меня есть таблица, в которой содержатся попытки пользователей, и еще одна, в которой есть контракты. У них есть идентификаторы, по которым я могу присоединиться к ним, но помимо этого идентификатора мне нужно учитывать определенные временные рамки. Для упрощения примера скажем, что все мои записи имеют одинаковый идентификатор:

Tries:
try, try_day
Try 1, 2018-08-01 

Try 2, 2018-09-01
Try 3, 2018-10-01
Try 4, 2018-10-02

Contracts:
contract, contract_day
Contract 1, 2018-08-01
Contract 2, 2018-09-02
Contract 3, 2018-10-01

Я хочу присоединиться к ним только в том случае, если 1) разница между попытками превышает 10 дней и 2) максимальная разница в 2 дня между датами двух таблиц. В итоге я получаю:

try, try_day, contract, contract_day, (explanation)

Try 1, 2018-08-01, Contract 1, 2018-08-01 , (same date and more than 10 days between try 1 and try 2)

Try 2, 2018-09-01, Contract 2, 2018-09-02, (difference of less than 2 days, and more than 10 days between try 2 and try 3)
Try 3, 2018-10-01, null, null (there is less than 10 days difference between try 3 and try 4 so contract should match with try 4 only)
Try 4, 2018-10-02, Contract 3, 2018-10-01

Я подумал, что, возможно, захочу ранжировать даты попыток, а затем выполнить объединение только на том месте, которое занимает первое место. Но тогда мне нужно оценивать только в 10-дневном окне.

SELECT *, dense_rank() OVER (PARTITION BY id ORDER BY try_day DESC) as rank
FROM tries

К сожалению, это оценивает их всех от 1 до 4, но я хочу получить звание

try, try_day, rank
Try 1, 2018-08-01, 1 

Try 2, 2018-09-01, 1
Try 3, 2018-10-01, 2
Try 4, 2018-10-02, 1

, а затем присоединиться, где рейтинг равен 1, а данные в течение 2 дней.

ЕСЛИ у кого-то есть более логичное представление о том, как этого добиться, то это тоже приветствуется. Спасибо

1 Ответ

1 голос
/ 08 июля 2019

Вот один из подходов, использующих unix_timestamp и оконную функцию lead для вычисления rank на основе условия re: try_day s между последовательными строками и left-join -ing двух DataFrames для условия re: try_day и contract_day:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._

val dfTries = Seq(
  ("Try 1", "2018-08-01"),
  ("Try 2", "2018-09-01"),
  ("Try 3", "2018-10-01"),
  ("Try 4", "2018-10-02")
).toDF("try", "try_day")

val dfContracts = Seq(
  ("contract 1", "2018-08-01"),
  ("contract 2", "2018-09-02"),
  ("contract 3", "2018-10-01"),
).toDF("contract", "contract_day")

dfTries.
  withColumn("try_ts", unix_timestamp($"try_day", "yyyy-MM-dd")).
  withColumn("prev_try_ts", lead($"try_ts", 1).over(Window.orderBy($"try"))).
  withColumn("rank", when(
      $"prev_try_ts".isNull || abs($"try_ts" - $"prev_try_ts") > 10 * 24 * 3600,
      1
    ).otherwise(2)
  ).
  join(
    dfContracts,
    $"rank" === 1 && abs($"try_ts" - unix_timestamp($"contract_day", "yyyy-MM-dd")) <= 2 * 24 * 3600,
    "left_outer").
  show
// +-----+----------+----------+-----------+----+----------+------------+
// |  try|   try_day|    try_ts|prev_try_ts|rank|  contract|contract_day|
// +-----+----------+----------+-----------+----+----------+------------+
// |Try 1|2018-08-01|1533106800| 1535785200|   1|contract 1|  2018-08-01|
// |Try 2|2018-09-01|1535785200| 1538377200|   1|contract 2|  2018-09-02|
// |Try 3|2018-10-01|1538377200| 1538463600|   2|      null|        null|
// |Try 4|2018-10-02|1538463600|       null|   1|contract 3|  2018-10-01|
// +-----+----------+----------+-----------+----+----------+------------+

Стоит отметить, что использование функции Window без partitionBy будет плохо масштабироваться.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...