Неправильный выход при структурированной искровой струе - PullRequest
0 голосов
/ 09 апреля 2020

Я пишу json отправитель в сокет и другую искровую программу. Я посылаю в свой сокет json вот так

{"id":66,"firstName":"rifedander@hotmail.co.uk","lastName":
  "sithprays@live.com","email":"crankfrock@hotmail.co.uk"}

В Spark Stream у меня есть строка json, я разделяю ее, как моя схема StructType, и когда я пытаюсь показать вывод в консоли, у меня есть только пустой результат, но я знаю, что данные отправляются правильно, а не ноль. Мой вывод

20/04/09 15:29:25 INFO CheckpointFileManager: Renamed temp file file:/C:/Users/Zubrynovich_M/AppData/Local/temporary-9d17b025-d19e-421a-bdd2-d34c0ce2ccec/offsets/.0.b177b949-0149-45e9-91c3-7a1c08f41e50.tmp to file:/C:/Users/Zubrynovich_M/AppData/Local/temporary-9d17b025-d19e-421a-bdd2-d34c0ce2ccec/offsets/0
-------------------------------------------
Batch: 0
-------------------------------------------
+---+---------+--------+-----+
| id|firstName|lastName|email|
+---+---------+--------+-----+
+---+---------+--------+-----+

20/04/09 15:29:25 INFO CheckpointFileManager: Writing atomically to file:/C:/Users/Zubrynovich_M/AppData/Local/temporary-9d17b025-d19e-421a-bdd2-d34c0ce2ccec/commits/0 using temp file file:/C:/Users/Zubrynovich_M/AppData/Local/temporary-9d17b025-d19e-421a-bdd2-d34c0ce2ccec/commits/.0.8b02991c-8667-4a6a-8918-de514837356f.tmp
20/04/09 15:29:25 INFO CheckpointFileManager: Renamed temp file file:/C:/Users/Zubrynovich_M/AppData/Local/temporary-9d17b025-d19e-421a-bdd2-d34c0ce2ccec/commits/.0.8b02991c-8667-4a6a-8918-de514837356f.tmp to file:/C:/Users/Zubrynovich_M/AppData/Local/temporary-9d17b025-d19e-421a-bdd2-d34c0ce2ccec/commits/0
20/04/09 15:29:25 INFO CheckpointFileManager: Writing atomically to file:/C:/Users/Zubrynovich_M/AppData/Local/temporary-9d17b025-d19e-421a-bdd2-d34c0ce2ccec/offsets/1 using temp file file:/C:/Users/Zubrynovich_M/AppData/Local/temporary-9d17b025-d19e-421a-bdd2-d34c0ce2ccec/offsets/.1.c72b5bb7-ceb4-48e4-b157-559ccfb42c8f.tmp
20/04/09 15:29:25 INFO CheckpointFileManager: Renamed temp file file:/C:/Users/Zubrynovich_M/AppData/Local/temporary-9d17b025-d19e-421a-bdd2-d34c0ce2ccec/offsets/.1.c72b5bb7-ceb4-48e4-b157-559ccfb42c8f.tmp to file:/C:/Users/Zubrynovich_M/AppData/Local/temporary-9d17b025-d19e-421a-bdd2-d34c0ce2ccec/offsets/1
-------------------------------------------
Batch: 1
-------------------------------------------
+---+---------+--------+-----+
| id|firstName|lastName|email|
+---+---------+--------+-----+
+---+---------+--------+-----+

20/04/09 15:29:25 INFO CheckpointFileManager: Writing atomically to file:/C:/Users/Zubrynovich_M/AppData/Local/temporary-9d17b025-d19e-421a-bdd2-d34c0ce2ccec/commits/1 using temp file file:/C:/Users/Zubrynovich_M/AppData/Local/temporary-9d17b025-d19e-421a-bdd2-d34c0ce2ccec/commits/.1.ffbd7b09-b8f7-4cd2-9e81-f68f80c9c32e.tmp
20/04/09 15:29:25 INFO CheckpointFileManager: Renamed temp file file:/C:/Users/Zubrynovich_M/AppData/Local/temporary-9d17b025-d19e-421a-bdd2-d34c0ce2ccec/commits/.1.ffbd7b09-b8f7-4cd2-9e81-f68f80c9c32e.tmp to file:/C:/Users/Zubrynovich_M/AppData/Local/temporary-9d17b025-d19e-421a-bdd2-d34c0ce2ccec/commits/1
20/04/09 15:29:25 WARN ProcfsMetricsGetter: Exception when trying to compute pagesize, as a result reporting of ProcessTree metrics is stopped
20/04/09 15:29:27 INFO ContinuousExecution: New epoch 3 is starting.
20/04/09 15:29:27 INFO CheckpointFileManager: Writing atomically to file:/C:/Users/Zubrynovich_M/AppData/Local/temporary-9d17b025-d19e-421a-bdd2-d34c0ce2ccec/offsets/2 using temp file file:/C:/Users/Zubrynovich_M/AppData/Local/temporary-9d17b025-d19e-421a-bdd2-d34c0ce2ccec/offsets/.2.6a4b0e87-b92e-4ce7-8b66-38353b4713e6.tmp
20/04/09 15:29:27 INFO CheckpointFileManager: Renamed temp file file:/C:/Users/Zubrynovich_M/AppData/Local/temporary-9d17b025-d19e-421a-bdd2-d34c0ce2ccec/offsets/.2.6a4b0e87-b92e-4ce7-8b66-38353b4713e6.tmp to file:/C:/Users/Zubrynovich_M/AppData/Local/temporary-9d17b025-d19e-421a-bdd2-d34c0ce2ccec/offsets/2

Я не понимаю, почему они пустые. Может быть, я использую не правильный интервал партии или все остальные? Пожалуйста, помогите, я не знаю, что не так

Это мой java код программы Spark

public void streaming(ArrayList<String> dataToCompare, Integer totalPrice) throws IOException, ClassNotFoundException, TimeoutException, StreamingQueryException {

        StructType structType = new StructType()
                .add("id", DataTypes.IntegerType)
                .add("firstName", DataTypes.StringType)
                .add("lastName", DataTypes.StringType)
                .add("email", DataTypes.StringType);


        SparkConf sparkConf = new SparkConf().setMaster("spark://192.168.56.1:7077").setAppName("SparkClusterApp");
        StreamingContext streamingContext = new StreamingContext(sparkConf, Durations.seconds(5));
        SparkSession sparkSession = SparkSession.builder()
                .getOrCreate();
        ServerSocket serverSocket = new ServerSocket(7777);
        System.out.println("Await connection with client");
        Socket socket = serverSocket.accept();

        Dataset<Row> stream = sparkSession
                .readStream()
                .format("socket")
                .option("host", "localhost")
                .option("port", "7777")
                .option("includeTimestamp", true)
                .load()
                .select(functions.from_json(functions.col("value").cast("string"), structType).alias("data"))
                .select("data.*");

        while (stream.isStreaming()) {

            stream.writeStream()
                    .format("console")
                    .trigger(Trigger.Continuous("3 second"))
                    .start().awaitTermination();

        }

    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...