Я читаю данные из сокета и показываю их на консоль, но в моем выводе нет строк. Мой код:
public void streaming(ArrayList<String> dataToCompare, Integer totalPrice) throws IOException, ClassNotFoundException, TimeoutException, StreamingQueryException {
Logger.getLogger("org").setLevel(Level.OFF);
Logger.getLogger("akka").setLevel(Level.OFF);
StructType structType = new StructType()
.add("id", DataTypes.IntegerType)
.add("firstName", DataTypes.StringType)
.add("lastName", DataTypes.StringType)
.add("email", DataTypes.StringType)
.add("timestamp", DataTypes.TimestampType);
SparkSession sparkSession = SparkSession.builder()
.master("local[4]")
.appName("CLUSTER")
.getOrCreate();
sparkSession.sql("SET spark.sql.streaming.metricsEnabled=true");
ServerSocket serverSocket = new ServerSocket(7777);
System.out.println("Await connection with client");
Socket socket = serverSocket.accept();
ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
while (objectInputStream.readObject() != null) {
if (objectInputStream.readObject() != null) {
System.out.println(objectInputStream.readObject().toString());
Dataset<Row> stream = sparkSession
.readStream()
.format("socket")
.option("host", "localhost")
.option("port", "7777")
.option("includeTimestamp", true)
.load()
.select(functions.from_json(functions.col("value").cast("string"), structType).as("data"))
.select("data.*");
if(stream.isStreaming()){
stream.select("firstName").writeStream().format("console").start().awaitTermination();
}
} else {
System.out.println("not connect");
}
}
}
В моем выводе у меня есть только структура моего фрейма данных: --------------------------- ----------------
Batch: 0
-------------------------------------------
+---+---------+--------+-----+---------+
| id|firstName|lastName|email|timestamp|
+---+---------+--------+-----+---------+
+---+---------+--------+-----+---------+
Это все мой вывод, и я не понимаю, почему. Если я добавляю непрерывный триггер для writeStream, у меня есть больше пакетов, но все пусто. Может быть, моя проблема в том, что спарк не показывает выход с меньшим объемом, чем какой-либо стандарт? Пожалуйста, помогите, я прочитал всю документацию искры, но ничего не нашел