Я не могу записать данные в файл паркета с кафки. Я пытаюсь преобразовать данные из кафки в набор данных (r1). Схема возвращается правильно, но все данные являются нулевыми. Я не могу понять, где ошибка. Пример данных: {"время": 100000000000, "количество": 3000, "orderNo": 16545, "realOrderNo": 19022241346, "clientcodeid": 9411494, "entryTime": 95003618770, "цена": 0.4904, "фирма": "0000500000", "boardid": "00000F00", "seccode": "00000F00", "trdaccid": "00000F00", "BuySell": "B", "маркетмейкер" ложь "initialQuantity": 3000, "tradedate ":" 17.12.2019 "} Не могу понять, почему данные не возвращаются. Помоги мне, пожалуйста.
StructType schema = new StructType()
.add("time","int")
.add("quantity","long")
.add("orderNo","long")
.add("realOrderNo","long")
.add("clientcodeid","long")
.add("entryTime","long")
.add("price","long")
.add("firm","String")
.add("boardid","String")
.add("seccode","String")
.add("trdaccid","String")
.add("buysell","String")
.add("marketMaker","boolean")
.add("initialQuantity","long")
.add("tradedate","String")
;
r1 = sqlContext
.read()
.format("kafka")
.option("kafka.bootstrap.servers", IKafkaConstants.KAFKA_BROKERS)
.option("subscribe", IKafkaConstants.TOPIC_NAME)
.option("header", "true")
.option("checkpointLocation", "/tmp/checkpoint/1")
.load()
.selectExpr("CAST(value AS STRING)")
.select(functions.from_json(col("value"), schema).as("data"))
;
System.out.println("r1");
r1.printSchema();
r1.show();
Результат:
root
|-- data: struct (nullable = true)
| |-- time: integer (nullable = true)
| |-- quantity: long (nullable = true)
| |-- orderNo: long (nullable = true)
| |-- realOrderNo: long (nullable = true)
| |-- clientcodeid: long (nullable = true)
| |-- entryTime: long (nullable = true)
| |-- price: long (nullable = true)
| |-- firm: string (nullable = true)
| |-- boardid: string (nullable = true)
| |-- seccode: string (nullable = true)
| |-- trdaccid: string (nullable = true)
| |-- buysell: string (nullable = true)
| |-- marketMaker: boolean (nullable = true)
| |-- initialQuantity: long (nullable = true)
| |-- tradedate: string (nullable = true)
+----+
|data|
+----+
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
+----+