Не удается прочитать файлы JSON: структурированная потоковая передача Spark с использованием Java - PullRequest
1 голос
/ 04 мая 2019

У меня есть скрипт на python, который каждую минуту получает данные о запасах (как показано ниже) из NYSE в новом файле (одной строкой). Содержит данные о 4 акциях - MSFT, ADBE, GOOGL и FB, в формате json ниже

[{"symbol": "MSFT", "timestamp": "2019-05-02 15:59:00", "priceData": {"open": "126.0800", "high": "126.1000", "low": "126.0500", "close": "126.0750", "volume": "57081"}}, {"symbol": "ADBE", "timestamp": "2019-05-02 15:59:00", "priceData": {"open": "279.2900", "high": "279.3400", "low": "279.2600", "close": "279.3050", "volume": "12711"}}, {"symbol": "GOOGL", "timestamp": "2019-05-02 15:59:00", "priceData": {"open": "1166.4100", "high": "1166.7400", "low": "1166.2900", "close": "1166.7400", "volume": "8803"}}, {"symbol": "FB", "timestamp": "2019-05-02 15:59:00", "priceData": {"open": "192.4200", "high": "192.5000", "low": "192.3600", "close": "192.4800", "volume": "33490"}}]

Я пытаюсь прочитать этот поток файлов в кадре данных Spark Streaming. Но я не могу определить правильную схему для него. Заглянул в инет и пока проделал следующее

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;



public class Driver1 {

    public static void main(String args[]) throws InterruptedException, StreamingQueryException {


        SparkSession session = SparkSession.builder().appName("Spark_Streaming").master("local[2]").getOrCreate();
        Logger.getLogger("org").setLevel(Level.ERROR);


        StructType priceData = new StructType()
                .add("open", DataTypes.DoubleType)
                .add("high", DataTypes.DoubleType)
                .add("low", DataTypes.DoubleType)
                .add("close", DataTypes.DoubleType)
                .add("volume", DataTypes.LongType);

        StructType schema = new StructType()
                .add("symbol", DataTypes.StringType)
                .add("timestamp", DataTypes.StringType)
                .add("stock", priceData);


        Dataset<Row> rawData = session.readStream().format("json").schema(schema).json("/home/abhinavrawat/streamingData/data/*");
        rawData.printSchema();
        rawData.writeStream().format("console").start().awaitTermination();
        session.close();        

    }

}

Вывод, который я получаю, это-

root
 |-- symbol: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- stock: struct (nullable = true)
 |    |-- open: double (nullable = true)
 |    |-- high: double (nullable = true)
 |    |-- low: double (nullable = true)
 |    |-- close: double (nullable = true)
 |    |-- volume: long (nullable = true)

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-------------------+-----+
|symbol|          timestamp|stock|
+------+-------------------+-----+
|  MSFT|2019-05-02 15:59:00| null|
|  ADBE|2019-05-02 15:59:00| null|
| GOOGL|2019-05-02 15:59:00| null|
|    FB|2019-05-02 15:59:00| null|
|  MSFT|2019-05-02 15:59:00| null|
|  ADBE|2019-05-02 15:59:00| null|
| GOOGL|2019-05-02 15:59:00| null|
|    FB|2019-05-02 15:59:00| null|
|  MSFT|2019-05-02 15:59:00| null|
|  ADBE|2019-05-02 15:59:00| null|
| GOOGL|2019-05-02 15:59:00| null|
|    FB|2019-05-02 15:59:00| null|
|  MSFT|2019-05-02 15:59:00| null|
|  ADBE|2019-05-02 15:59:00| null|
| GOOGL|2019-05-02 15:59:00| null|
|    FB|2019-05-02 15:59:00| null|
|  MSFT|2019-05-02 15:59:00| null|
|  ADBE|2019-05-02 15:59:00| null|
| GOOGL|2019-05-02 15:59:00| null|
|    FB|2019-05-02 15:59:00| null|
+------+-------------------+-----+

Я даже пытался сначала прочитать строку json в виде текстового файла, а затем применить схему (как это делается с Kafka-Streaming) ...

  Dataset<Row> rawData = session.readStream().format("text").load("/home/abhinavrawat/streamingData/data/*");
    Dataset<Row> raw2 = rawData.select(org.apache.spark.sql.functions.from_json(rawData.col("value"),schema)); 
raw2.writeStream().format("console").start().awaitTermination();

Получение ниже вывода, в этом случае, rawData фрейм данных в качестве данных json в строке fromat,

+--------------------+
|jsontostructs(value)|
+--------------------+
|                null|
|                null|
|                null|
|                null|
|                null|

Пожалуйста, помогите мне разобраться.

1 Ответ

1 голос
/ 04 мая 2019

Просто разобрался, имейте в виду следующие две вещи-

  1. При определении схемы убедитесь, что имя и порядок поля точно такие же, как в вашем файле json.

  2. Первоначально, используйте только StringType для всех своих полей, вы можете применить преобразование, чтобы изменить его обратно на какой-то определенный тип данных.

Вот что у меня сработало-

    StructType priceData = new StructType()
            .add("open", DataTypes.StringType)
            .add("high", DataTypes.StringType)
            .add("low", DataTypes.StringType)
            .add("close", DataTypes.StringType)
            .add("volume", DataTypes.StringType);

    StructType schema = new StructType()
            .add("symbol", DataTypes.StringType)
            .add("timestamp", DataTypes.StringType)
            .add("priceData", priceData);


    Dataset<Row> rawData = session.readStream().format("json").schema(schema).json("/home/abhinavrawat/streamingData/data/*");
    rawData.writeStream().format("console").start().awaitTermination();
    session.close();

См. Вывод-

+------+-------------------+--------------------+
|symbol|          timestamp|           priceData|
+------+-------------------+--------------------+
|  MSFT|2019-05-02 15:59:00|[126.0800, 126.10...|
|  ADBE|2019-05-02 15:59:00|[279.2900, 279.34...|
| GOOGL|2019-05-02 15:59:00|[1166.4100, 1166....|
|    FB|2019-05-02 15:59:00|[192.4200, 192.50...|
|  MSFT|2019-05-02 15:59:00|[126.0800, 126.10...|
|  ADBE|2019-05-02 15:59:00|[279.2900, 279.34...|
| GOOGL|2019-05-02 15:59:00|[1166.4100, 1166....|
|    FB|2019-05-02 15:59:00|[192.4200, 192.50...|
|  MSFT|2019-05-02 15:59:00|[126.0800, 126.10...|
|  ADBE|2019-05-02 15:59:00|[279.2900, 279.34...|
| GOOGL|2019-05-02 15:59:00|[1166.4100, 1166....|
|    FB|2019-05-02 15:59:00|[192.4200, 192.50...|
|  MSFT|2019-05-02 15:59:00|[126.0800, 126.10...|
|  ADBE|2019-05-02 15:59:00|[279.2900, 279.34...|
| GOOGL|2019-05-02 15:59:00|[1166.4100, 1166....|
|    FB|2019-05-02 15:59:00|[192.4200, 192.50...|
|  MSFT|2019-05-02 15:59:00|[126.0800, 126.10...|
|  ADBE|2019-05-02 15:59:00|[279.2900, 279.34...|
| GOOGL|2019-05-02 15:59:00|[1166.4100, 1166....|
|    FB|2019-05-02 15:59:00|[192.4200, 192.50...|
+------+-------------------+--------------------+

Теперь вы можете сгладить столбец priceData, используя priceData.open, priceData.close и т. Д.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...