Запись json без схемы в HDFS с использованием искрового потока - PullRequest
0 голосов
/ 18 июня 2020

Мой вариант использования - записать Kafka в приемник HDFS с использованием потоковой передачи Spark в Java. (Java по историческим причинам и без использования kafka-connect из-за отсутствия поддержки HDFS 3).
Код выглядит как

public static void main(String[] args) throws StreamingQueryException {
        SparkSession spark = SparkSession.builder()
                .appName("testApp")
                .master("local")
                .getOrCreate();

        StreamingQuery query = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:39092")
                .option("subscribe", "sample.prices")
                .load()
                .selectExpr("CAST(value AS STRING)")
                .writeStream()
                .outputMode("append")
                .format("json")
                .option("path", "/tmp/kafka2hdfsSink")
                .option("checkpointLocation", "/tmp/kafka2hdfsCheckPoint")
                .start();

        query.awaitTermination();
    }

Проблема в том, что файлы json имеют расширение " value "экранированная строка. Например,

{"value" : "{\"_headers\": {\"schemaId\": 

Я предполагаю, что "CAST (value AS STRING)" вызывает это, если только нет проблем с совместимостью версий (я использую Java8, spark-sql_2.11 - 2.2.0, Had oop -3.2.1, Spark 2.4.5 - Локальная попытка на Ma c)

Если я не использую приведение, значение будет закодировано в base64

{"value":"eyJfaGVhZGVycy...=="}

Я не смог использовать from_ json, потому что ему нужна схема. Есть ли способ сделать это без указания схемы для json? Мне удалось извлечь значения с помощью запроса jsonpath, но он по-прежнему обрабатывает его так же, как указано выше.

get_json_object(col("_value"), "$")

Я бы не хотел использовать формат («текст»), потому что мне нужен. json расширение и, вероятно, захотите больше столбцов, таких как временная метка.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...