Конвертировать XML в набор данных <Row> - PullRequest
0 голосов
/ 16 февраля 2020

Я получаю XML от kafka и использую Spark kafka API, используя приведенный ниже код

public class XMLSparkStreamEntry {

    public static void registerPrintValue(SparkSession spark) {

        spark.udf().register("registerPrintValue", new UDF1<String, List<Row>>() {

            private static final long serialVersionUID = 1L;

            List<Row> rows = new ArrayList<Row>();

            @Override
            public List<Row> call(String t1) throws Exception {

                JAXBContext jaxbContext = JAXBContext.newInstance(FileWrapper.class);
                Unmarshaller unmarshaller = jaxbContext.createUnmarshaller();

                StringReader reader = new StringReader(t1);
                FileWrapper person = (FileWrapper) unmarshaller.unmarshal(reader);

                List<Employee> emp = new ArrayList<Employee>(person.getEmployees());

                for (Employee e : emp) {    
                    rows.add(RowFactory.create(e.getFirstname(), e.getLastname(), e.getTitle(), e.getId(),
                            e.getDivision(), e.getSupervisor(), e.getTitle()));
                } 
                return rows;
            }
        }, DataTypes.StringType);
    }

    public static void main(String[] args) throws StreamingQueryException {

        SparkConf conf = new SparkConf();
        SparkSession spark = SparkSession.builder().config(conf).appName("Spark Program").master("local[*]")
                .getOrCreate();

        Dataset<Row> ds1 = spark.readStream().format("kafka").option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "Kafkademo").load();

        Dataset<Row> stringTypeDS = ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

        XMLSparkStreamEntry.registerPrintValue(spark);
        Dataset<Row> ss = stringTypeDS.select(callUDF("registerPrintValue", stringTypeDS.col("value")));

Я не понимаю, как поступить дальше. Я создал одну функцию UDF с именем registerPrintValue, где я передаю xml строку. В XML могут присутствовать несколько тегов или экземпляров Employee.

В функции UDF третьим параметром является generic c тип возвращаемого набора данных. Я дал DataTypes.StringType, но я думаю, что это неправильно, и никакая другая опция сейчас недоступна.

Как я могу преобразовать мой XML с несколькими тегами Employee в Dataset<Row>? Я думаю, что я поступаю неправильно.

Обновлен код

public class XMLSparkStreamEntry {

    static StructType structType = new StructType();

    static {
        structType = structType.add("FirstName", DataTypes.StringType, false);
        structType = structType.add("LastName", DataTypes.StringType, false);
        structType = structType.add("Title", DataTypes.StringType, false);
        structType = structType.add("ID", DataTypes.StringType, false);
        structType = structType.add("Division", DataTypes.StringType, false);
        structType = structType.add("Supervisor", DataTypes.StringType, false);

    }

    static ExpressionEncoder<Row> encoder = RowEncoder.apply(structType);

    public static void main(String[] args) throws StreamingQueryException {

        SparkConf conf = new SparkConf();
        SparkSession spark = SparkSession.builder().config(conf).appName("Spark Program").master("local[*]")
                .getOrCreate();

        Dataset<Row> ds1 = spark.readStream().format("kafka").option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "Kafkademo").load();

        Dataset<Row> ss = ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

        Dataset<Row> finalOP = ss.flatMap(new FlatMapFunction<Row, Row>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Iterator<Row> call(Row t) throws Exception {

                JAXBContext jaxbContext = JAXBContext.newInstance(FileWrapper.class);
                Unmarshaller unmarshaller = jaxbContext.createUnmarshaller();

                StringReader reader = new StringReader(t.getAs("value"));
                FileWrapper person = (FileWrapper) unmarshaller.unmarshal(reader);

                List<Employee> emp = new ArrayList<Employee>(person.getEmployees());
                List<Row> rows = new ArrayList<Row>();
                for (Employee e : emp) {

                    rows.add(RowFactory.create(e.getFirstname(), e.getLastname(), e.getTitle(), e.getId(),
                            e.getDivision(), e.getSupervisor()));

                }
                return rows.iterator();
            }
        }, encoder);


        Dataset<Row> wordCounts = finalOP.groupBy("FirstName").count();

        StreamingQuery query = wordCounts.writeStream().outputMode("complete").format("console").start();
        System.out.println("SHOW SCHEMA");
        query.awaitTermination();

    }

}

Вывод

Вывод получен

+---------+-----+
|FirstName|count|
+---------+-----+
+---------+-----+
...