Как преобразовать массив JSON в строки перед записью в Elasticsearch? - PullRequest
0 голосов
/ 23 ноября 2018

Продолжение до этого вопроса

У меня есть потоковые данные JSON в формате, аналогичном приведенному ниже

|  A    | B                                        |
|-------|------------------------------------------|
|  ABC  |  [{C:1, D:1}, {C:2, D:4}]                | 
|  XYZ  |  [{C:3, D :6}, {C:9, D:11}, {C:5, D:12}] |

Мне нужно преобразовать его в формат ниже

|   A   |  C  |  D   |
|-------|-----|------|
|  ABC  |  1  |  1   |
|  ABC  |  2  |  4   |
|  XYZ  |  3  |  6   |
|  XYZ  |  9  |  11  |
|  XYZ  |  5  |  12  | 

Чтобы добиться этого, выполнили преобразования, как предлагалось в предыдущем вопросе.

val df1 = df0.select($"A", explode($"B")).toDF("A", "Bn")

val df2 = df1.withColumn("SeqNum", monotonically_increasing_id()).toDF("A", "Bn", "SeqNum") 

val df3 = df2.select($"A", explode($"Bn"), $"SeqNum").toDF("A", "B", "C", "SeqNum")

val df4 = df3.withColumn("dummy", concat( $"SeqNum", lit("||"), $"A"))

val df5 = df4.select($"dummy", $"B", $"C").groupBy("dummy").pivot("B").agg(first($"C")) 

val df6 = df5.withColumn("A", substring_index(col("dummy"), "||", -1)).drop("dummy")

Теперь мне нужно сохранить данные в ElasticSearch.

 df6.writeStream
  .outputMode("complete")
  .format("es")
  .option("es.resource", "index/type")
  .option("es.nodes", "localhost")
  .option("es.port", 9200)
  .start()
  .awaitTermination()

Я получаю сообщение об ошибке, что ElasticSearch не поддерживает режим вывода Append.В режиме Append происходит сбой записи в writeStream, при этом агрегация в режиме Append невозможна.Я был в состоянии написать на консоль в завершенном режиме.Как я могу записать данные в ElasticSearch сейчас

Любая помощь будет оценена.

1 Ответ

0 голосов
/ 23 ноября 2018

Нет необходимости в pivot или агрегировании.Если столбец B действительно равен Array[Map[String, String]] (array<map<string, string>> в типах SQL), все, что вам нужно, это простой select или withColumn:

df
  .withColumn("B", explode($"B"))
  .select($"A", $"B"("C") as "C", $"B"("D") as "D")
...