Java-код для чтения сообщений Kafka avro в потоковой структуре Spark 2.1.1 - PullRequest
0 голосов
/ 24 января 2019

Я ищу, как читать авро-сообщения, которые имеют сложную структуру, из Kafka, используя потоковую структуру Spark

Затем я хочу проанализировать эти сообщения и сравнить их со ссылочными значениями hbase, а затем сохранить результат в hdfs илидругой стол.

Я начал с примера кода ниже: https://github.com/Neuw84/spark-continuous-streaming/blob/master/src/main/java/es/aconde/structured/StructuredDemo.java

Схема сообщений Avro:

struct[mTimeSeries:
  struct[cName:string,
         eIpAddr:string,
         pIpAddr:string,
         pTime:string,
         mtrcs:array[struct[mName:string,
                            xValues:array[bigint],
                            yValues:array[string],
                            rName:string]]]]

Я пытаюсь создать строку, используя RowFactory.create для этой схемы.Так что мне нужно перебирать поля массива?Я понимаю, что мы можем использовать функции разнесения в наборе данных, чтобы денормализовать или получить доступ к внутренним полям массива структуры, как только мы создадим набор данных с этой структурой, как я делаю это в Hive.Поэтому я хотел бы создать строку как есть, то есть как выглядит сообщение avro, а затем использовать функции sql для дальнейшего преобразования.

    sparkSession.udf().register("deserialize", (byte[] data) -> {
        GenericRecord record = recordInjection.invert(data).get();
        return ***RowFactory.create(record.get("machine").toString(), record.get("sensor").toString(), record.get("data"), record.get("eventTime"));***
    }, DataTypes.createStructType(type.fields())
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...