Как декодировать байт [] списка <Objects>в набор данных <Row>в искре? - PullRequest
0 голосов
/ 07 февраля 2020

Я использую spark- sql -2.3.1v, kafka с java8 в моем проекте. Я пытаюсь преобразовать полученный topi c байт [] в набор данных на стороне потребителя kafka.

Вот подробности

У меня есть

class Company{
    String companyName;
    Integer companyId;
}

Который я определил как

public static final StructType companySchema = new StructType(
              .add("companyName", DataTypes.StringType)
              .add("companyId", DataTypes.IntegerType);

Но сообщение определено как

class Message{
    private List<Company> companyList;
    private String messageId;
}

Я попытался определить как

StructType messageSchema = new StructType()
            .add("companyList", DataTypes.createArrayType(companySchema , false),false)
            .add("messageId", DataTypes.StringType);

Я отправил сообщение kafka topi c как байт [ ] с использованием сериализации.

Я успешно получил байт сообщения [] у потребителя. Который я пытаюсь преобразовать как набор данных ?? как это сделать?

   Dataset<Row> messagesDs = kafkaReceivedStreamDs.select(from_json(col("value").cast("string"), messageSchema ).as("messages")).select("messages.*");

  messagesDs.printSchema();

  root
         |-- companyList: array (nullable = true)
         |    |-- element: struct (containsNull = true)
         |    |    |-- companyName: string (nullable = true)
         |    |    |-- companyId: integer (nullable = true)
         |-- messageId: string (nullable = true)    

Dataset<Row> comapanyListDs = messagesDs.select(explode_outer(col("companyList")));

comapanyListDs.printSchema();

root
 |-- col: struct (nullable = true)
 |    |-- companyName: string (nullable = true)
 |    |-- companyId: integer (nullable = true)



Dataset<Company> comapanyDs = comapanyListDs.as(Encoders.bean(Company.class));

Получение ошибки:

Исключение в потоке "главная" организация. apache .spark. sql .AnalysisException: не может разрешить 'companyName' в данных входных столбцах: [col];

Как получить записи набора данных, как его получить?

1 Ответ

1 голос
/ 07 февраля 2020

Ваша структура была названа с помощью "col" при взрыве.

Поскольку ваш класс Бина не имеет атрибута "col", он завершается с указанной ошибкой.

Исключение в потоке "main" org. apache .spark. sql .AnalysisException: не удается разрешить указанные столбцы ввода 'companyName': [col];

Чтобы получить соответствующие столбцы, можно выполнить следующие действия. как простой столбец: Примерно так:

    Dataset<Row> comapanyListDs = messagesDs.select(explode_outer(col("companyList"))).
select(col("col.companyName").as("companyName"),col("col.companyId").as("companyId"));

Я не тестировал синтаксис, но должен выполнить следующий шаг, как только вы получите простые столбцы из структуры для каждой строки.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...