Spark Streaming: неструктурированные записи - PullRequest
0 голосов
/ 10 октября 2018

Мы потребляем данные из EventHub с использованием потоковой передачи.Входящий поток содержит JSON записей различных типов (около 400 различных типов)

Каждая запись будет классифицирована с использованием свойства ProductId .

пример (Входящиепоток записей):

record1 - { productId: 101, colA: "some-val", colB: "some-val" }
record2 - { productId: 104, colC: "some-val", colD: "some-val" }
record3 - { productId: 202, colA: "some-val", colD: "some-val", colF: "some-val" }
record3 - { productId: 342, colH: "some-val", colJ: "some-val", colK: "some-val" }

Количество свойств в каждой записи варьируется, но запись, имеющая аналогичный productId , будет иметь точно такое же количество свойств.

ProductId варьируется от(1 - 400), количество свойств в записи будет до 50.

Я хочу прочитать вышеупомянутый поток записи JSON и записать в различные паркетные / дельта-местоположения подобно

    Location(Delta/Parquet)             Records
    -----------------------------------------------------------------
    /mnt/product-101        Contains all records with productId - 101
    /mnt/product-104        Contains all records with productId - 104
    /mnt/product-202        Contains all records with productId - 202
    /mnt/product-342        Contains all records with productId - 342

1) Как создать DataFrame / Dataset из потока, содержащего записи разных типов?

2) Будет ли возможно использование одного искрового потока и запись в другое местоположение дельты / паркета?

1 Ответ

0 голосов
/ 14 октября 2018

Обратите внимание, что использование этого метода должно работать, однако он будет генерировать много разреженных данных.

Сначала создайте StructType со всеми объединенными столбцами.

  val schema = new StructType().add("productId", LongType)
  .add("colA", StringType).add("colB", StringType)
  .add("colC", StringType)....

Затем создайте поток, используяэта схема и функция from_json.

val df1 = df.select(from_json('value, schema).alias("tmp"))
.select("tmp.*")

Наконец, используйте partitionBy для записи разделенных файлов паркета.

val query1 = df1 
.writeStream 
.format("parquet") 
.option("path", "/mnt/product/") 
.option("checkpointLocation","/tmp/checkpoint")
.partitionBy("productId").start()

Это создаст строки со всеми столбцами.Столбцы, которых изначально не было на входе json, будут помечены как нулевые.Паркет поддерживает запись нулей.Но было бы лучше, если бы вы сначала отфильтровали их.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...