Потоковое объединение потоков без предиката равенства не поддерживается - PullRequest
0 голосов
/ 04 июля 2018

Я использую Spark 2.3 и пытаюсь объединить два потока данных. Мой левый и правый потоки имеют массив. Я хочу объединить два потока только тогда, когда правый массив потоков является подмножеством левого массива потоков.

Например, мой streamA выглядит так:

StreamA:
|---|------|---------------------|-----------|
|id | dept | employeesInMeetings | DateTime  |
|---|------|---------------------|-----------|
| 1 | sales| [John]              | 7/2 14:00 |
| 2 | mktg | [Adam, Mike]        | 7/2 12:30 |
| 3 | hr   | [Rick, Jill, Andy]  | 7/2 14:00 |
|---|------|---------------------|-----------|

и мой streamB выглядит следующим образом:

StreamB:
|--------------|--------------|----------|
|employees     | confRooms    | DateTime |
|--------------|--------------|----------|
| [John, Jane] |      A       | 7/2 14:00|
| [Adam, Mike] |      C       | 7/2 12:30| 
| [Jill, Andy] |      B       | 7/2 14:00|
|--------------|--------------|----------|

Я забочусь только о сотрудниках из того же отдела, которые находятся на одном собрании. Следовательно, в результате пересечения мой результирующий поток должен выглядеть следующим образом:

|---|------|---------------------|-----------|----------|
|id | dept | employeesInMeetings | DateTime  | confRoom |
|---|------|---------------------|-----------|----------|
| 2 | mktg | [Adam, Mike]        | 7/2 12:30 |    C     |
| 3 | hr   | [Rick, Jill, Andy]  | 7/2 14:00 |    B     |
|---|------|---------------------|-----------|----------|

Я создал UDF для пересечения:

val arrayIntersect = udf((leftArr: Array[String], rightArr: Array[String]) => {
  import spark.implicits._
  if(leftArr.intersect(rightArr.toSeq).length == rightArr.size){
    true
  } else {
    false
  }
})

И попытался использовать его следующим образом:

streamA.joinWith(streamB, expr("arrayIntersect(leftArr, rightArr) AND streamA.DateTime BETWEEN streamB.DateTime and streamB.DateTime + INTERVAL 12 hours"))

Однако я получаю сообщение об ошибке:

org.apache.spark.sql.AnalysisException: Stream stream joins without equality predicate is not supported;

Кто-нибудь знает, есть ли здесь обходной путь? Любая помощь будет оценена! Спасибо!

Ответы [ 2 ]

0 голосов
/ 06 июля 2018

К сожалению, в объединениях stream-stream для этого нет обходного пути: (

Нам действительно нужен предикат равенства, потому что мы используем его для выполнения объединения с использованием алгоритма потокового симметричного хеширования - оба потока разделяются с использованием общего ключа, так что связанные записи из обоих потоков попадают в один и тот же раздел.

0 голосов
/ 04 июля 2018

Сначала преобразуйте ваш массив в строку, а затем выполните поиск правой строки массива в левой строке массива.

val arrayToString = udf{arr: Seq[String] => arr.sorted.map(_.trim.toLowerCase).mkString(",")}

streamA.withColumn("leftArrStr", arrayToString(col("leftArr"))).joinWith(
  streamB.withColumn("rightArrStr", arrayToString(col("rightArr")))
  , expr("instr(leftArrStr, rightArrStr) != 0 " +
    "AND streamA.DateTime BETWEEN streamB.DateTime and streamB.DateTime + INTERVAL 12 hours"))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...