Я использую Spark 2.3 и пытаюсь объединить два потока данных. Мой левый и правый потоки имеют массив. Я хочу объединить два потока только тогда, когда правый массив потоков является подмножеством левого массива потоков.
Например, мой streamA выглядит так:
StreamA:
|---|------|---------------------|-----------|
|id | dept | employeesInMeetings | DateTime |
|---|------|---------------------|-----------|
| 1 | sales| [John] | 7/2 14:00 |
| 2 | mktg | [Adam, Mike] | 7/2 12:30 |
| 3 | hr | [Rick, Jill, Andy] | 7/2 14:00 |
|---|------|---------------------|-----------|
и мой streamB выглядит следующим образом:
StreamB:
|--------------|--------------|----------|
|employees | confRooms | DateTime |
|--------------|--------------|----------|
| [John, Jane] | A | 7/2 14:00|
| [Adam, Mike] | C | 7/2 12:30|
| [Jill, Andy] | B | 7/2 14:00|
|--------------|--------------|----------|
Я забочусь только о сотрудниках из того же отдела, которые находятся на одном собрании. Следовательно, в результате пересечения мой результирующий поток должен выглядеть следующим образом:
|---|------|---------------------|-----------|----------|
|id | dept | employeesInMeetings | DateTime | confRoom |
|---|------|---------------------|-----------|----------|
| 2 | mktg | [Adam, Mike] | 7/2 12:30 | C |
| 3 | hr | [Rick, Jill, Andy] | 7/2 14:00 | B |
|---|------|---------------------|-----------|----------|
Я создал UDF для пересечения:
val arrayIntersect = udf((leftArr: Array[String], rightArr: Array[String]) => {
import spark.implicits._
if(leftArr.intersect(rightArr.toSeq).length == rightArr.size){
true
} else {
false
}
})
И попытался использовать его следующим образом:
streamA.joinWith(streamB, expr("arrayIntersect(leftArr, rightArr) AND streamA.DateTime BETWEEN streamB.DateTime and streamB.DateTime + INTERVAL 12 hours"))
Однако я получаю сообщение об ошибке:
org.apache.spark.sql.AnalysisException: Stream stream joins without equality predicate is not supported;
Кто-нибудь знает, есть ли здесь обходной путь? Любая помощь будет оценена! Спасибо!