I'm writing a JSON sender that writes to a socket, and a separate Spark program that reads from it. I send JSON to my socket like this:
{"id":66,"firstName":"rifedander@hotmail.co.uk","lastName":
"sithprays@live.com","email":"crankfrock@hotmail.co.uk"}
In Spark Streaming I receive the JSON string and parse it according to my StructType schema. When I try to show the output in the console, I only get an empty result, although I know the data is sent correctly and is not null. My output:
20/04/09 15:29:25 INFO CheckpointFileManager: Renamed temp file file:/C:/Users/Zubrynovich_M/AppData/Local/temporary-9d17b025-d19e-421a-bdd2-d34c0ce2ccec/offsets/.0.b177b949-0149-45e9-91c3-7a1c08f41e50.tmp to file:/C:/Users/Zubrynovich_M/AppData/Local/temporary-9d17b025-d19e-421a-bdd2-d34c0ce2ccec/offsets/0
-------------------------------------------
Batch: 0
-------------------------------------------
+---+---------+--------+-----+
| id|firstName|lastName|email|
+---+---------+--------+-----+
+---+---------+--------+-----+
20/04/09 15:29:25 INFO CheckpointFileManager: Writing atomically to file:/C:/Users/Zubrynovich_M/AppData/Local/temporary-9d17b025-d19e-421a-bdd2-d34c0ce2ccec/commits/0 using temp file file:/C:/Users/Zubrynovich_M/AppData/Local/temporary-9d17b025-d19e-421a-bdd2-d34c0ce2ccec/commits/.0.8b02991c-8667-4a6a-8918-de514837356f.tmp
20/04/09 15:29:25 INFO CheckpointFileManager: Renamed temp file file:/C:/Users/Zubrynovich_M/AppData/Local/temporary-9d17b025-d19e-421a-bdd2-d34c0ce2ccec/commits/.0.8b02991c-8667-4a6a-8918-de514837356f.tmp to file:/C:/Users/Zubrynovich_M/AppData/Local/temporary-9d17b025-d19e-421a-bdd2-d34c0ce2ccec/commits/0
20/04/09 15:29:25 INFO CheckpointFileManager: Writing atomically to file:/C:/Users/Zubrynovich_M/AppData/Local/temporary-9d17b025-d19e-421a-bdd2-d34c0ce2ccec/offsets/1 using temp file file:/C:/Users/Zubrynovich_M/AppData/Local/temporary-9d17b025-d19e-421a-bdd2-d34c0ce2ccec/offsets/.1.c72b5bb7-ceb4-48e4-b157-559ccfb42c8f.tmp
20/04/09 15:29:25 INFO CheckpointFileManager: Renamed temp file file:/C:/Users/Zubrynovich_M/AppData/Local/temporary-9d17b025-d19e-421a-bdd2-d34c0ce2ccec/offsets/.1.c72b5bb7-ceb4-48e4-b157-559ccfb42c8f.tmp to file:/C:/Users/Zubrynovich_M/AppData/Local/temporary-9d17b025-d19e-421a-bdd2-d34c0ce2ccec/offsets/1
-------------------------------------------
Batch: 1
-------------------------------------------
+---+---------+--------+-----+
| id|firstName|lastName|email|
+---+---------+--------+-----+
+---+---------+--------+-----+
20/04/09 15:29:25 INFO CheckpointFileManager: Writing atomically to file:/C:/Users/Zubrynovich_M/AppData/Local/temporary-9d17b025-d19e-421a-bdd2-d34c0ce2ccec/commits/1 using temp file file:/C:/Users/Zubrynovich_M/AppData/Local/temporary-9d17b025-d19e-421a-bdd2-d34c0ce2ccec/commits/.1.ffbd7b09-b8f7-4cd2-9e81-f68f80c9c32e.tmp
20/04/09 15:29:25 INFO CheckpointFileManager: Renamed temp file file:/C:/Users/Zubrynovich_M/AppData/Local/temporary-9d17b025-d19e-421a-bdd2-d34c0ce2ccec/commits/.1.ffbd7b09-b8f7-4cd2-9e81-f68f80c9c32e.tmp to file:/C:/Users/Zubrynovich_M/AppData/Local/temporary-9d17b025-d19e-421a-bdd2-d34c0ce2ccec/commits/1
20/04/09 15:29:25 WARN ProcfsMetricsGetter: Exception when trying to compute pagesize, as a result reporting of ProcessTree metrics is stopped
20/04/09 15:29:27 INFO ContinuousExecution: New epoch 3 is starting.
20/04/09 15:29:27 INFO CheckpointFileManager: Writing atomically to file:/C:/Users/Zubrynovich_M/AppData/Local/temporary-9d17b025-d19e-421a-bdd2-d34c0ce2ccec/offsets/2 using temp file file:/C:/Users/Zubrynovich_M/AppData/Local/temporary-9d17b025-d19e-421a-bdd2-d34c0ce2ccec/offsets/.2.6a4b0e87-b92e-4ce7-8b66-38353b4713e6.tmp
20/04/09 15:29:27 INFO CheckpointFileManager: Renamed temp file file:/C:/Users/Zubrynovich_M/AppData/Local/temporary-9d17b025-d19e-421a-bdd2-d34c0ce2ccec/offsets/.2.6a4b0e87-b92e-4ce7-8b66-38353b4713e6.tmp to file:/C:/Users/Zubrynovich_M/AppData/Local/temporary-9d17b025-d19e-421a-bdd2-d34c0ce2ccec/offsets/2
I don't understand why the batches are empty. Maybe I'm using the wrong batch interval, or something else is wrong? Please help, I don't know what is wrong.
This is the Java code of my Spark program:
public void streaming(ArrayList<String> dataToCompare, Integer totalPrice) throws IOException, ClassNotFoundException, TimeoutException, StreamingQueryException {
    StructType structType = new StructType()
            .add("id", DataTypes.IntegerType)
            .add("firstName", DataTypes.StringType)
            .add("lastName", DataTypes.StringType)
            .add("email", DataTypes.StringType);
    SparkConf sparkConf = new SparkConf().setMaster("spark://192.168.56.1:7077").setAppName("SparkClusterApp");
    StreamingContext streamingContext = new StreamingContext(sparkConf, Durations.seconds(5));
    SparkSession sparkSession = SparkSession.builder()
            .getOrCreate();
    ServerSocket serverSocket = new ServerSocket(7777);
    System.out.println("Await connection with client");
    Socket socket = serverSocket.accept();
    Dataset<Row> stream = sparkSession
            .readStream()
            .format("socket")
            .option("host", "localhost")
            .option("port", "7777")
            .option("includeTimestamp", true)
            .load()
            .select(functions.from_json(functions.col("value").cast("string"), structType).alias("data"))
            .select("data.*");
    while (stream.isStreaming()) {
        stream.writeStream()
                .format("console")
                .trigger(Trigger.Continuous("3 second"))
                .start().awaitTermination();
    }
}
}
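The sender itself is not shown above. For reference, Spark's socket source connects to the given host and port as a client and reads UTF-8 text, treating each newline-terminated line as one record. The following is only a minimal sketch of a sender written against that behaviour, not the actual sender from this question: the class name `JsonLineSender` and the helper `writeRecords` are hypothetical, and it assumes the sender is the listening side on port 7777 and sends each JSON object on a single line.

```java
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical sender sketch: serves newline-delimited JSON to one client.
public class JsonLineSender {

    // Writes each record as exactly one line. Raw line breaks inside a
    // record are removed so that one JSON object never spans two lines.
    static void writeRecords(Writer out, String... records) throws IOException {
        for (String record : records) {
            out.write(record.replaceAll("\\R", ""));
            out.write('\n');
        }
        out.flush();
    }

    public static void main(String[] args) throws IOException {
        // Sample record taken from the question.
        String json = "{\"id\":66,\"firstName\":\"rifedander@hotmail.co.uk\","
                + "\"lastName\":\"sithprays@live.com\","
                + "\"email\":\"crankfrock@hotmail.co.uk\"}";

        // Assumption: the sender listens on 7777 and the reader connects to it.
        try (ServerSocket server = new ServerSocket(7777);
             Socket client = server.accept();
             Writer out = new OutputStreamWriter(
                     client.getOutputStream(), StandardCharsets.UTF_8)) {
            writeRecords(out, json);
        }
    }
}
```

In this sketch the port 7777 matches the `port` option in the question's code. `main` blocks in `accept()` until a reader connects, then sends the single sample record and closes the connection.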