Кафка читать MyClacc - PullRequest
       21

Кафка читать MyClacc

0 голосов
/ 14 апреля 2020

Я не могу записать данные в файл паркета с кафки. Я пытаюсь преобразовать данные из кафки в набор данных (r1). Схема возвращается правильно, но все данные являются нулевыми. Я не могу понять, где ошибка. Пример данных: {"время": 100000000000, "количество": 3000, "orderNo": 16545, "realOrderNo": 19022241346, "clientcodeid": 9411494, "entryTime": 95003618770, "цена": 0.4904, "фирма": "0000500000", "boardid": "00000F00", "seccode": "00000F00", "trdaccid": "00000F00", "BuySell": "B", "маркетмейкер" ложь "initialQuantity": 3000, "tradedate ":" 17.12.2019 "} Не могу понять, почему данные не возвращаются. Помоги мне, пожалуйста.

    StructType schema = new StructType()
            .add("time","int")
            .add("quantity","long")
            .add("orderNo","long")
            .add("realOrderNo","long")
            .add("clientcodeid","long")
            .add("entryTime","long")
            .add("price","long")
            .add("firm","String")
            .add("boardid","String")
            .add("seccode","String")
            .add("trdaccid","String")
            .add("buysell","String")
            .add("marketMaker","boolean")
            .add("initialQuantity","long")
            .add("tradedate","String")
            ;

    r1 =  sqlContext
            .read()
            .format("kafka")
            .option("kafka.bootstrap.servers", IKafkaConstants.KAFKA_BROKERS)
            .option("subscribe", IKafkaConstants.TOPIC_NAME)
            .option("header", "true")
            .option("checkpointLocation", "/tmp/checkpoint/1")
            .load()
            .selectExpr("CAST(value AS STRING)")
            .select(functions.from_json(col("value"), schema).as("data"))

    ;


    System.out.println("r1");
    r1.printSchema();
    r1.show();

Результат:

root
 |-- data: struct (nullable = true)
 |    |-- time: integer (nullable = true)
 |    |-- quantity: long (nullable = true)
 |    |-- orderNo: long (nullable = true)
 |    |-- realOrderNo: long (nullable = true)
 |    |-- clientcodeid: long (nullable = true)
 |    |-- entryTime: long (nullable = true)
 |    |-- price: long (nullable = true)
 |    |-- firm: string (nullable = true)
 |    |-- boardid: string (nullable = true)
 |    |-- seccode: string (nullable = true)
 |    |-- trdaccid: string (nullable = true)
 |    |-- buysell: string (nullable = true)
 |    |-- marketMaker: boolean (nullable = true)
 |    |-- initialQuantity: long (nullable = true)
 |    |-- tradedate: string (nullable = true)

+----+
|data|
+----+
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
+----+

1 Ответ

0 голосов
/ 14 апреля 2020

В схеме два типа полей не соответствуют значениям JSON, должны использоваться следующие типы:

.add("time","long")
.add("price","double")
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...