Я только что быстро попробовал это в IntelliJ с моей локальной установкой Kafka.
Если вы ссылаетесь на три точки в конце поля метки времени как усечение (как в выходных данных ниже):
Batch: 1
-------------------------------------------
+-----+----+--------+--------------------+
|topic| key| value| timestamp|
+-----+----+--------+--------------------+
| test|null|test-123|2018-10-07 03:10:...|
| test|null|test-234|2018-10-07 03:10:...|
+-----+----+--------+--------------------+
Затем вам просто нужно добавить следующую строку:
.option("truncate", false)
в вашей writeStream()
части, например:
Dataset<Row> df = sparkSession
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "test")
.option("includeTimestamp", "true")
.load()
.selectExpr("CAST(topic AS STRING)", "CAST(key AS STRING)", "CAST(value AS STRING)", "CAST(timestamp as STRING)");
try {
df.writeStream()
.outputMode("append")
.format("console")
.option("truncate", false)
.start()
.awaitTermination();
} catch (StreamingQueryException e) {
e.printStackTrace();
}
Это изменение дало мне полную метку времени в выходных данных:
Batch: 1
-------------------------------------------
+-----+----+--------+-----------------------+
|topic|key |value |timestamp |
+-----+----+--------+-----------------------+
|test |null|test-123|2018-10-07 03:19:50.677|
|test |null|test-234|2018-10-07 03:19:52.673|
+-----+----+--------+-----------------------+
Надеюсь, это поможет.