Пустой выходной набор структурированных потоковых данных в файл или консоль - PullRequest
0 голосов
/ 20 апреля 2020

Я пытаюсь записать свой поток чтения данных json в каталог, но у меня есть эта проблема с logOffset. И я не понимаю эту ошибку. Я отправляю свои json данные из java приложения вот так

Spark версия - 3.0.0 preview2 И это мой pom. xml со свечным ядром, sql и потоковыми версиями

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>spark</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <properties>
        <scala.version>2.12</scala.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>com.googlecode.json-simple</groupId>
            <artifactId>json-simple</artifactId>
            <version>1.1.1</version>
        </dependency>

        <!-- Spark -->
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>2.4.5</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>2.4.5</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>2.4.5</version>
            <scope>provided</scope>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

public static void main(String[] args) throws IOException, InterruptedException {
        JsonGenerator jsonGenerator = new JsonGenerator();
        Jsons jsons = new Jsons();
        Socket socket = new Socket("localhost", 7777);
        System.out.println("connected!");
        DataOutputStream dataOutputStream = new DataOutputStream(socket.getOutputStream());
        OutputStream outputStream = socket.getOutputStream();
        ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream);
        int i=0;
        for (;;){
            dataOutputStream.writeUTF(jsons.generate().toJSONString());
            i++;
            System.out.println(jsons.generate());
            System.out.println(jsons.generate().toJSONString());
            System.out.println(i);
        }
    }

Класс, который генерирует json:

public class Jsons {
    public JSONObject generate(){
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("id", new Random().nextInt(1000));
        jsonObject.put("firstName", "FIRST");
        jsonObject.put("lastName", "LAST");
        jsonObject.put("email", "dadadada");
        jsonObject.put("timestamp", new Timestamp(System.currentTimeMillis()));

        return jsonObject;
    }
}

И я прочитал эти данные, отправленные в сокет, используя схему Spark Structured Streaming и StructType моей json publi c класса, реализующей SparkJsonAnalyzer Serializable {

    public void streaming() throws IOException, StreamingQueryException {

        StructType structType = new StructType()
                .add("firstName", DataTypes.StringType, false)
                .add("lastName", DataTypes.StringType, false)
                .add("id", DataTypes.IntegerType, false)
                .add("email", DataTypes.StringType, false)
                .add("timestamp", DataTypes.TimestampType, false);

        SparkConf sparkConf = new SparkConf().setMaster("local[4]").setAppName("SparkClusterApp");
        SparkSession sparkSession = SparkSession
                .builder().config(sparkConf)
                .getOrCreate();

        sparkSession.sql("SET spark.sql.streaming.metricsEnabled=true");

        ServerSocket serverSocket = new ServerSocket(7777);
        System.out.println("Await connection with client");
        Socket socket = serverSocket.accept();

        Dataset<Row> stream = sparkSession
                .readStream()
                .format("socket")
                .option("host", "localhost")
                .option("port", socket.getPort())
                .option("startingOffsets", "earliest")
                .load()
                .selectExpr("CAST(value AS STRING) as jsonString")
                .select(functions.from_json(functions.col("jsonString"), structType).as("json"))
                .select("json.*");


            StreamingQuery query = stream.writeStream()
                    .queryName("data")
                    .format("json")
                    .option("checkpointLocation", "D:\\checkpoints")
                    .option("path", "D:\\json")
                    .start();

            query.awaitTermination();
    }
}

И когда я запускаю свое приложение spark, подключаюсь к сокету и начинаю получать данные для spark, я вижу этот вывод в консоли, в первый раз у меня есть некоторая информация о ходе потокового запроса. но я этого не понимаю Может кто-нибудь сказал мне, что это значит?

20/04/20 14:45:26 INFO MicroBatchExecution: Streaming query made progress: {
  "id" : "aaf1da3c-298b-4d90-ae1d-abdc1f7c0cea",
  "runId" : "2ee97afd-362b-4cad-8d47-7b9d0a343142",
  "name" : "data",
  "timestamp" : "2020-04-20T11:45:24.630Z",
  "batchId" : 0,
  "numInputRows" : 0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "addBatch" : 1441,
    "getBatch" : 2,
    "getEndOffset" : 0,
    "queryPlanning" : 95,
    "setOffsetRange" : 0,
    "triggerExecution" : 2100,
    "walCommit" : 303
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "TextSocketV2[host: localhost, port: 7777]",
    "startOffset" : null,
    "endOffset" : -1,
    "numInputRows" : 0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "FileSink[D:\\json]"
  }
}

И каждый раз, когда при потоковом запросе выполнялся параметр numInputRows, 0, и у меня нет строк в выходном файле, и программа-искра не останавливается, когда я отменить получение данных. И после того, как в первый раз потоковый запрос достиг прогресса, у меня в журналах только 0 параметров, например:

20/04/20 14:45:26 INFO MicroBatchExecution: Streaming query made progress: {
  "id" : "aaf1da3c-298b-4d90-ae1d-abdc1f7c0cea",
  "runId" : "2ee97afd-362b-4cad-8d47-7b9d0a343142",
  "name" : "data",
  "timestamp" : "2020-04-20T11:45:26.748Z",
  "batchId" : 1,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "getEndOffset" : 0,
    "setOffsetRange" : 0,
    "triggerExecution" : 1
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "TextSocketV2[host: localhost, port: 7777]",
    "startOffset" : -1,
    "endOffset" : -1,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "FileSink[D:\\json]"
  }
}
20/04/20 14:45:36 INFO MicroBatchExecution: Streaming query made progress: {
  "id" : "aaf1da3c-298b-4d90-ae1d-abdc1f7c0cea",
  "runId" : "2ee97afd-362b-4cad-8d47-7b9d0a343142",
  "name" : "data",
  "timestamp" : "2020-04-20T11:45:36.757Z",
  "batchId" : 1,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "durationMs" : {
    "getEndOffset" : 0,
    "setOffsetRange" : 0,
    "triggerExecution" : 0
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "TextSocketV2[host: localhost, port: 7777]",
    "startOffset" : -1,
    "endOffset" : -1,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "FileSink[D:\\json]"
  }
}

Пожалуйста, помогите мне понять, что означает этот вывод и почему у меня нет строк в выводе файл

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...