Я пытаюсь обработать потоковые avro-данные из kafka, используя структурированную потоковую обработку в формате spark (версия-2.3.1), поэтому я попытался на этом примере десериализовать.
Это работает, только если часть value
содержит темы StringType
, но в моем случае схема содержит long and integers
, как показано ниже:
public static final String USER_SCHEMA = "{"
+ "\"type\":\"record\","
+ "\"name\":\"variables\","
+ "\"fields\":["
+ " { \"name\":\"time\", \"type\":\"long\" },"
+ " { \"name\":\"thnigId\", \"type\":\"string\" },"
+ " { \"name\":\"controller\", \"type\":\"int\" },"
+ " { \"name\":\"module\", \"type\":\"int\" }"
+ "]}";
Так что это дает исключение на
sparkSession.udf().register("deserialize", (byte[] data) -> {
GenericRecord record = recordInjection.invert(data).get(); //throws error at invert method.
return RowFactory.create(record.get("time"), record.get("thingId").toString(), record.get("controller"), record.get("module"));
}, DataTypes.createStructType(type.fields()));
говоря
Failed to invert: [B@22a45e7
Caused by java.io.IOException: Invalid int encoding.
потому что у меня time, controller and module
в схеме long and int
типов.
Полагаю, это какие-то ошибки форматирования кодирования и декодирования массива байтов byte[] data
.