Структурированная потоковая передача сокращает временные метки Кафки до секунд - PullRequest
0 голосов
/ 05 октября 2018

Я читаю данные из Kafka с помощью Spark Structured Streaming и хочу включить метку времени Kafka в сообщение:

sparkSession.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka-broker:10000")
  .option("subscribe", "topicname")
  .option("includeTimestamp", true)
  .load()
  .selectExpr("CAST(topic AS STRING)", "CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
  .as[(String, String, String, Long)]

Когда я смотрю на метку времени, она усекается от миллисекунд до секунд.Можно ли как-нибудь вернуть точность в миллисекундах после чтения?

Ответы [ 2 ]

0 голосов
/ 08 октября 2018

Усечение происходит, когда метки времени считываются как значение Long.Это происходит в последней строке:

sparkSession.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka-broker:10000")
  .option("subscribe", "topicname")
  .option("includeTimestamp", true)
  .load()
  .selectExpr("CAST(topic AS STRING)", "CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
  .as[(String, String, String, Long)]

Он не усекается при изменении последней строки на:

.as[(String, String, String, Timestamp)]
0 голосов
/ 07 октября 2018

Я только что быстро попробовал это в IntelliJ с моей локальной установкой Kafka.

Если вы ссылаетесь на три точки в конце поля метки времени как усечение (как в выходных данных ниже):

Batch: 1
-------------------------------------------
+-----+----+--------+--------------------+
|topic| key|   value|           timestamp|
+-----+----+--------+--------------------+
| test|null|test-123|2018-10-07 03:10:...|
| test|null|test-234|2018-10-07 03:10:...|
+-----+----+--------+--------------------+

Затем вам просто нужно добавить следующую строку:

.option("truncate", false)

в вашей writeStream() части, например:

Dataset<Row> df = sparkSession
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "test")
                .option("includeTimestamp", "true")
                .load()
                .selectExpr("CAST(topic AS STRING)", "CAST(key AS STRING)", "CAST(value AS STRING)", "CAST(timestamp as STRING)");

try {
    df.writeStream()
          .outputMode("append")
          .format("console")
          .option("truncate", false)
          .start()
          .awaitTermination();
} catch (StreamingQueryException e) {
    e.printStackTrace();
}

Это изменение дало мне полную метку времени в выходных данных:

Batch: 1
-------------------------------------------
+-----+----+--------+-----------------------+
|topic|key |value   |timestamp              |
+-----+----+--------+-----------------------+
|test |null|test-123|2018-10-07 03:19:50.677|
|test |null|test-234|2018-10-07 03:19:52.673|
+-----+----+--------+-----------------------+

Надеюсь, это поможет.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...