Вы можете использовать wholeTextFiles
, чтобы прочитать файл json, получить текст json и использовать его в json
API SparkSession
как
import org.apache.spark.sql.SparkSession;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
static SparkSession spark = SparkSession.builder().master("local").appName("simple").getOrCreate();
static JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
Dataset<Row> df = spark.read().json(sc.wholeTextFiles("path to json file").map(t -> t._2()));
df.show(false);
, и вы должны получить
+----------+---------------+--------+--------------------------------------------+-----+-----------+-----+-------+------+-----+
|@tenant_id|alarmUpdateTime|alarm_id|alarm_updates |aucID|inID |index|product|source|state|
+----------+---------------+--------+--------------------------------------------+-----+-----------+-----+-------+------+-----+
|XYZ |1526342400000 |AB5C9123|[[Severity,Minor,Major], [state,UPDATE,NEW]]|5af83|INC15234567|test |test |ABS |NEW |
+----------+---------------+--------+--------------------------------------------+-----+-----------+-----+-------+------+-----+
Вы можете использовать master
и appName
по своему усмотрению
Обновлено
Вы прокомментировали, что
The way you do through file , can we do it with the object . I have to convert to Ingest the data to the other T2
Для этого предположим, что у вас есть запись, прочитанная из темы T1 как строковый объект как
String t1Record = "[\n" +
" {\n" +
" \"@tenant_id\":\"XYZ\",\n" +
" \"alarmUpdateTime\":1526342400000,\n" +
" \"alarm_id\":\"AB5C9123\",\n" +
" \"alarm_updates\":[\n" +
" {\n" +
" \"alarmField\":\"Severity\",\n" +
" \"new_value\":\"Minor\",\n" +
" \"old_value\":\"Major\"\n" +
" },\n" +
" {\n" +
" \"alarmField\":\"state\",\n" +
" \"new_value\":\"UPDATE\",\n" +
" \"old_value\":\"NEW\"\n" +
" }\n" +
" ],\n" +
" \"aucID\":\"5af83\",\n" +
" \"inID\":\"INC15234567\",\n" +
" \"index\":\"test\",\n" +
" \"product\":\"test\",\n" +
" \"source\":\"ABS\",\n" +
" \"state\":\"NEW\"\n" +
" }\n" +
"]";
ивы конвертируете его в RDD
как
JavaRDD<String> t1RecordRDD = sc.parallelize(Arrays.asList(t1Record));
Затем вы можете применить json
API для преобразования в dataframe
как
Dataset<Row> df = spark.read().json(t1RecordRDD);
, что должно дать вам тот же результат, что ивыше