Я использую Spark 2.4 для объединения двух потоков.Проблема в том, что результат пуст.
Я загружаю потоковые данные из папок:
data / 1
[
{"id1": 77,"name1": "aaa","timestamp": 1532609003},
{"id1": 77,"name1": "xxx","timestamp": 1532609005},
{"id1": 78,"name1": "xxx","timestamp": 1532609005}
]
data/ 2
[
{"id2": 77,"name2": "yyy", "timestamp2": 1532609000}
]
Мой код:
schema1 = StructType([
StructField("id1", IntegerType()),
StructField("name1", StringType()),
StructField("timestamp1", TimestampType()))
])
schema2 = StructType([
StructField("id2", IntegerType()),
StructField("name2", StringType()),
StructField("timestamp2", TimestampType()))
])
ds1 = spark \
.readStream \
.format("json") \
.schema(schema1) \
.load("data/1") \
.withWatermark("timestamp1", "2 minutes")
ds2 = spark \
.readStream \
.format("json") \
.schema(schema2) \
.load("data/2") \
.withWatermark("timestamp2", "2 minutes")
ds_joined = ds1.join(
ds2,
func.expr("""
id1 = id2 AND
timestamp1 >= timestamp2 AND
timestamp1 <= timestamp2 + interval 2 minutes
"""),
"leftOuter"
).fillna(0)
query = ds_joined \
.writeStream \
.format('console') \
.start()
query.awaitTermination()
Как видно, я использовал водяной знак 2 минуты.Поэтому я не понимаю, что я получаю пустой объединенный набор данных.
Ожидаемый результат:
id1 id2 name1 name2 timestamp1 timestamp2
77 77 aaa yyy 1532609003 1532609000
77 77 xxx yyy 1532609005 1532609000
78 0 xxx 0 1532609005 0