Как объединить два структурированных потока Spark? - PullRequest
0 голосов
/ 23 ноября 2018

Возможно ли объединить два структурированных потока Spark в Spark 2.2.1?Я обнаружил много проблем с выполнением очень простых манипуляций в Spark Structured Streaming.Документация и количество примеров кажутся мне очень ограниченными.У меня есть два источника потоковых данных:

Person.json:

[
  {"building_id": 70, "id": 21, "latitude": 41.20, "longitude": 2.2, "timestamp": 1532609003},
  {"building_id": 70, "id": 15, "latitude": 41.24, "longitude": 2.3, "timestamp": 1532609005},
  {"building_id": 71, "id": 11, "latitude": 41.28, "longitude": 2.1, "timestamp": 1532609005}
]

machines.json

[
  {"building_id": 70, "mid": 222, "latitude": 42.1, "longitude": 2.11}
]

Цель состоит в том, чтобы получить объединенный DataFrame с широтой идолгота людей и машин.Мне это нужно для того, чтобы оценить расстояние между ними в режиме реального времени:

building_id   id   mid   latitude  longitude  latitude_machine  longitude_machine
70            21   222   41.20     2.2        42.1              2.11
# ...

Если невозможно объединить два потока, я был бы очень признателен за рекомендацию возможного обходного пути.

Код:

spark = SparkSession \
    .builder \
    .appName("Test") \
    .master("local[2]") \
    .getOrCreate()

schema_persons = StructType([
    StructField("building_id", IntegerType()),
    StructField("id", IntegerType()),
    StructField("latitude", DoubleType()),
    StructField("longitude", DoubleType()),
    StructField("timestamp", LongType())
])

schema_machines = StructType([
    StructField("building_id", IntegerType()),
    StructField("mid", IntegerType()),
    StructField("latitude", DoubleType()),
    StructField("longitude", DoubleType())
])

df_persons = spark \
    .readStream \
    .format("json") \
    .schema(schema_persons) \
    .load("data/persons")

df_machines = spark \
    .readStream \
    .format("json") \
    .schema(schema_machines) \
    .load("data/machines") \
    .withColumnRenamed("latitude", "latitude_machine") \
    .withColumnRenamed("longitude", "longitude_machine")

df_joined = df_persons\
              .join(df_machines, ["building_id"], "left")

query_persons = df_persons \
    .writeStream \
    .format('console') \
    .start()

query_machines = df_machines \
    .writeStream \
    .format('console') \
    .start()

query_persons.awaitTermination()
query_machines.awaitTermination()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...