Что у нас есть
stream.foreachRDD(rdd->{
JavaRDD<String> javaRDD=rdd.map(elem -> elem.value());
Dataset ds = //Any transformations
sc.read().schema(csvSchema).csv(ds).write();
});
Какие плохие идеи:
1) Плохо, потому что используется .collect ()
sc.sqlContext().createDataset(javaRDD.collect(), Encoders.STRING())
2) Плохо, потому что StringType нельзя привести кStructType '
sc.sqlContext().createDataFrame(javaRDD,String.class)
3) Без схемы:
Плохо из-за низкой гибкости, читаемости кода (более 30 полей, массивный конструктор), неизменного порядка столбцов - полей в алфавитном порядке (запись в паркет для импалы)
JavaRDD<String> javaRDD=rdd.map(elem -> new Model(elem.value()));
sc.sqlContext().createDataFrame(javaRDD,Model.class);