Мой вариант использования - записать Kafka в приемник HDFS с использованием потоковой передачи Spark в Java. (Java по историческим причинам и без использования kafka-connect из-за отсутствия поддержки HDFS 3).
Код выглядит как
public static void main(String[] args) throws StreamingQueryException {
SparkSession spark = SparkSession.builder()
.appName("testApp")
.master("local")
.getOrCreate();
StreamingQuery query = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:39092")
.option("subscribe", "sample.prices")
.load()
.selectExpr("CAST(value AS STRING)")
.writeStream()
.outputMode("append")
.format("json")
.option("path", "/tmp/kafka2hdfsSink")
.option("checkpointLocation", "/tmp/kafka2hdfsCheckPoint")
.start();
query.awaitTermination();
}
Проблема в том, что файлы json имеют расширение " value "экранированная строка. Например,
{"value" : "{\"_headers\": {\"schemaId\":
Я предполагаю, что "CAST (value AS STRING)" вызывает это, если только нет проблем с совместимостью версий (я использую Java8, spark-sql_2.11 - 2.2.0, Had oop -3.2.1, Spark 2.4.5 - Локальная попытка на Ma c)
Если я не использую приведение, значение будет закодировано в base64
{"value":"eyJfaGVhZGVycy...=="}
Я не смог использовать from_ json, потому что ему нужна схема. Есть ли способ сделать это без указания схемы для json? Мне удалось извлечь значения с помощью запроса jsonpath, но он по-прежнему обрабатывает его так же, как указано выше.
get_json_object(col("_value"), "$")
Я бы не хотел использовать формат («текст»), потому что мне нужен. json расширение и, вероятно, захотите больше столбцов, таких как временная метка.