Я пытаюсь записать свой поток чтения данных json в каталог, но у меня есть эта проблема с logOffset. И я не понимаю эту ошибку. Я отправляю свои json данные из java приложения вот так
Spark версия - 3.0.0 preview2 И это мой pom. xml со свечным ядром, sql и потоковыми версиями
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>spark</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
<scala.version>2.12</scala.version>
</properties>
<dependencies>
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
<version>1.1.1</version>
</dependency>
<!-- Spark -->
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>2.4.5</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>2.4.5</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>2.4.5</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
public static void main(String[] args) throws IOException, InterruptedException {
JsonGenerator jsonGenerator = new JsonGenerator();
Jsons jsons = new Jsons();
Socket socket = new Socket("localhost", 7777);
System.out.println("connected!");
DataOutputStream dataOutputStream = new DataOutputStream(socket.getOutputStream());
OutputStream outputStream = socket.getOutputStream();
ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream);
int i=0;
for (;;){
dataOutputStream.writeUTF(jsons.generate().toJSONString());
i++;
System.out.println(jsons.generate());
System.out.println(jsons.generate().toJSONString());
System.out.println(i);
}
}
Класс, который генерирует json:
public class Jsons {
public JSONObject generate(){
JSONObject jsonObject = new JSONObject();
jsonObject.put("id", new Random().nextInt(1000));
jsonObject.put("firstName", "FIRST");
jsonObject.put("lastName", "LAST");
jsonObject.put("email", "dadadada");
jsonObject.put("timestamp", new Timestamp(System.currentTimeMillis()));
return jsonObject;
}
}
И я прочитал эти данные, отправленные в сокет, используя схему Spark Structured Streaming и StructType моей json publi c класса, реализующей SparkJsonAnalyzer Serializable {
public void streaming() throws IOException, StreamingQueryException {
StructType structType = new StructType()
.add("firstName", DataTypes.StringType, false)
.add("lastName", DataTypes.StringType, false)
.add("id", DataTypes.IntegerType, false)
.add("email", DataTypes.StringType, false)
.add("timestamp", DataTypes.TimestampType, false);
SparkConf sparkConf = new SparkConf().setMaster("local[4]").setAppName("SparkClusterApp");
SparkSession sparkSession = SparkSession
.builder().config(sparkConf)
.getOrCreate();
sparkSession.sql("SET spark.sql.streaming.metricsEnabled=true");
ServerSocket serverSocket = new ServerSocket(7777);
System.out.println("Await connection with client");
Socket socket = serverSocket.accept();
Dataset<Row> stream = sparkSession
.readStream()
.format("socket")
.option("host", "localhost")
.option("port", socket.getPort())
.option("startingOffsets", "earliest")
.load()
.selectExpr("CAST(value AS STRING) as jsonString")
.select(functions.from_json(functions.col("jsonString"), structType).as("json"))
.select("json.*");
StreamingQuery query = stream.writeStream()
.queryName("data")
.format("json")
.option("checkpointLocation", "D:\\checkpoints")
.option("path", "D:\\json")
.start();
query.awaitTermination();
}
}
И когда я запускаю свое приложение spark, подключаюсь к сокету и начинаю получать данные для spark, я вижу этот вывод в консоли, в первый раз у меня есть некоторая информация о ходе потокового запроса. но я этого не понимаю Может кто-нибудь сказал мне, что это значит?
20/04/20 14:45:26 INFO MicroBatchExecution: Streaming query made progress: {
"id" : "aaf1da3c-298b-4d90-ae1d-abdc1f7c0cea",
"runId" : "2ee97afd-362b-4cad-8d47-7b9d0a343142",
"name" : "data",
"timestamp" : "2020-04-20T11:45:24.630Z",
"batchId" : 0,
"numInputRows" : 0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"addBatch" : 1441,
"getBatch" : 2,
"getEndOffset" : 0,
"queryPlanning" : 95,
"setOffsetRange" : 0,
"triggerExecution" : 2100,
"walCommit" : 303
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "TextSocketV2[host: localhost, port: 7777]",
"startOffset" : null,
"endOffset" : -1,
"numInputRows" : 0,
"processedRowsPerSecond" : 0.0
} ],
"sink" : {
"description" : "FileSink[D:\\json]"
}
}
И каждый раз, когда при потоковом запросе выполнялся параметр numInputRows, 0, и у меня нет строк в выходном файле, и программа-искра не останавливается, когда я отменить получение данных. И после того, как в первый раз потоковый запрос достиг прогресса, у меня в журналах только 0 параметров, например:
20/04/20 14:45:26 INFO MicroBatchExecution: Streaming query made progress: {
"id" : "aaf1da3c-298b-4d90-ae1d-abdc1f7c0cea",
"runId" : "2ee97afd-362b-4cad-8d47-7b9d0a343142",
"name" : "data",
"timestamp" : "2020-04-20T11:45:26.748Z",
"batchId" : 1,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"getEndOffset" : 0,
"setOffsetRange" : 0,
"triggerExecution" : 1
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "TextSocketV2[host: localhost, port: 7777]",
"startOffset" : -1,
"endOffset" : -1,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"processedRowsPerSecond" : 0.0
} ],
"sink" : {
"description" : "FileSink[D:\\json]"
}
}
20/04/20 14:45:36 INFO MicroBatchExecution: Streaming query made progress: {
"id" : "aaf1da3c-298b-4d90-ae1d-abdc1f7c0cea",
"runId" : "2ee97afd-362b-4cad-8d47-7b9d0a343142",
"name" : "data",
"timestamp" : "2020-04-20T11:45:36.757Z",
"batchId" : 1,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0,
"durationMs" : {
"getEndOffset" : 0,
"setOffsetRange" : 0,
"triggerExecution" : 0
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "TextSocketV2[host: localhost, port: 7777]",
"startOffset" : -1,
"endOffset" : -1,
"numInputRows" : 0,
"inputRowsPerSecond" : 0.0
} ],
"sink" : {
"description" : "FileSink[D:\\json]"
}
}
Пожалуйста, помогите мне понять, что означает этот вывод и почему у меня нет строк в выводе файл