Схема искры JSON вложенная - PullRequest
0 голосов
/ 07 марта 2020

Я пытаюсь работать с json данными из веб-сокета binance.

Сейчас у меня есть схема, похожая на эту:

val schema = new StructType()
  .add("e",StringType)
  .add("E",StringType)
  .add("s",StringType)
  .add("k",StringType)
    .add("t",IntegerType)
    .add("T",IntegerType)
    .add("s",StringType)
    .add("i",StringType)
    .add("f",StringType)
    .add("L",StringType)
    .add("o",DoubleType)
    .add("c",DoubleType)
    .add("h",DoubleType)
    .add("l",DoubleType)
    .add("v",DoubleType)
    .add("n",IntegerType)
    .add("x",StringType)
    .add("q",DoubleType)
    .add("V",DoubleType)
    .add("Q",DoubleType)
    .add("B",StringType)

И я получаю это сообщение от моего kafka topi c:

{"e":"kline","E":1583595170076,"s":"BTCUSDT","k":{"t":1583595120000,"T":1583595179999,"s":"BTCUSDT","i":"1m","f":47069029,"L":47069101,"o":"9111.22","c":"9114.90","h":"9114.91","l":"9109.65","v":"30.297","n":73,"x":false,"q":"276055.09390","V":"11.517","Q":"104946.56519","B":"0"}}

Как вы можете видеть, сообщение вложено под клавишей "k".

Мой вывод a в искре в данный момент выглядит следующим образом:

 root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)


-------------------------------------------

https://imgur.com/a/9LPu9z6 

Изображение фрейма данных, так как я не смог вставить его в форум без разрушения фрейма.

1 Ответ

0 голосов
/ 07 марта 2020

В вашей схеме вам нужно иметь "k" как StructType()

  • Заменить DoubleType на StringType как двойные данные заключены в "" в ваших данных выборки.

Example:

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

//schema for json
val schema = new StructType().
add("e",StringType).
add("E",StringType).
add("s",StringType).
add("k",new StructType().
    add("t",LongType).
    add("T",LongType).
    add("s",StringType).
    add("i",StringType).
    add("f",StringType).
    add("L",StringType).
    add("o",StringType).
    add("c",StringType).
    add("h",StringType).
    add("l",StringType).
    add("v",StringType).
    add("n",IntegerType).
    add("x",BooleanType).
    add("q",StringType).
    add("V",StringType).
    add("Q",StringType).
    add("B",StringType)
    )

//sample data
val jsn=Seq("""{"e":"kline","E":1583595170076,"s":"BTCUSDT","k":{"t":1583595120000,"T":1583595179999,"s":"BTCUSDT","i":"1m","f":47069029,"L":47069101,"o":"9111.22","c":"9114.90","h":"9114.91","l":"9109.65","v":"30.297","n":73,"x":false,"q":"276055.09390","V":"11.517","Q":"104946.56519","B":"0"}}""")


spark.read.schema(schema).json(jsn.toDS).select("*","k.*").drop("k").show()

//+-----+-------------+-------+-------------+-------------+-------+---+--------+--------+-------+-------+-------+-------+------+---+-----+------------+------+------------+---+
//|    e|            E|      s|            t|            T|      s|  i|       f|       L|      o|      c|      h|      l|     v|  n|    x|           q|     V|           Q|  B|
//+-----+-------------+-------+-------------+-------------+-------+---+--------+--------+-------+-------+-------+-------+------+---+-----+------------+------+------------+---+
//|kline|1583595170076|BTCUSDT|1583595120000|1583595179999|BTCUSDT| 1m|47069029|47069101|9111.22|9114.90|9114.91|9109.65|30.297| 73|false|276055.09390|11.517|104946.56519|  0|
//+-----+-------------+-------+-------------+-------------+-------+---+--------+--------+-------+-------+-------+-------+------+---+-----+------------+------+------------+---+

Затем вы можете разыграть все обязательные поля для float..etc типов с использованием фрейма данных Column casting.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...