Почему результат остается пустым после слияния двух потоков в Spark 2.4? - PullRequest
0 голосов
/ 25 ноября 2018

Я использую Spark 2.4 для объединения двух потоков.Проблема в том, что результат пуст.

Я загружаю потоковые данные из папок:

data / 1

[
  {"id1": 77,"name1": "aaa","timestamp": 1532609003},
  {"id1": 77,"name1": "xxx","timestamp": 1532609005},
  {"id1": 78,"name1": "xxx","timestamp": 1532609005}
]

data/ 2

[
  {"id2": 77,"name2": "yyy", "timestamp2": 1532609000}
]

Мой код:

schema1 = StructType([
    StructField("id1", IntegerType()),
    StructField("name1", StringType()),
    StructField("timestamp1", TimestampType()))
])

schema2 = StructType([
    StructField("id2", IntegerType()),
    StructField("name2", StringType()),
    StructField("timestamp2", TimestampType()))
])

ds1 = spark \
    .readStream \
    .format("json") \
    .schema(schema1) \
    .load("data/1") \
    .withWatermark("timestamp1", "2 minutes")

ds2 = spark \
    .readStream \
    .format("json") \
    .schema(schema2) \
    .load("data/2") \
    .withWatermark("timestamp2", "2 minutes")

ds_joined = ds1.join(
    ds2,
    func.expr("""
    id1 = id2 AND
    timestamp1 >= timestamp2 AND
    timestamp1 <= timestamp2 + interval 2 minutes
    """),
    "leftOuter"
).fillna(0)

query = ds_joined \
    .writeStream \
    .format('console') \
    .start()

query.awaitTermination()

Как видно, я использовал водяной знак 2 минуты.Поэтому я не понимаю, что я получаю пустой объединенный набор данных.

Ожидаемый результат:

id1  id2  name1  name2  timestamp1  timestamp2
77   77   aaa    yyy    1532609003  1532609000
77   77   xxx    yyy    1532609005  1532609000
78   0    xxx    0      1532609005  0     
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...