Я пишу пользовательский SerDe и буду использовать его только для десериализации.Базовые данные - это двоичный файл, каждая строка - журнал событий.Каждое событие имеет схему, к которой у меня есть доступ, но мы оборачиваем событие в другую схему, давайте назовем ее Message
перед сохранением.Причина, по которой я пишу SerDe вместо использования ThriftDeserializer , заключается в том, что, как уже упоминалось, основное событие обернуто как Сообщение.Поэтому сначала нам нужно десериализовать, используя схему Message
, а затем десериализовать данные для этого события.
SerDe работает (только), когда я делаю SELECT *
, и я могу десериализовать данные, как ожидается, новсякий раз, когда я выбираю столбец из таблицы вместо SELECT *, все строки имеют значение NULL.Возвращенный инспектор объектов - ThriftStructObjectInspector
, а возвращаемый десериализацией объект - TBase.
Что может заставить Hive возвращать NULL, когда мы выбираем столбец, но возвращать данные столбца, когда я это делаюSELECT *?
Вот класс SerDe (изменил некоторые имена классов):
public class MyThriftSerde extends AbstractSerDe {
private static final Log LOG = LogFactory.getLog(MyThriftSerde.class);
/* Abstracting away the deserialization of the underlying event which is wrapped in a message */
private static final MessageDeserializer myMessageDeserializer =
MessageDeserializer.getInstance();
/* Underlying event class which is wrapped in a Message */
private String schemaClassName;
private Class<?> schemaClass;
/* Used to read the input row */
public static List<String> inputFieldNames;
public static List<ObjectInspector> inputFieldOIs;
public static List<Integer> notSkipIDs;
public static ObjectInspector inputRowObjectInspector;
/* Output Object Inspector */
public static ObjectInspector thriftStructObjectInspector;
@Override
public void initialize(Configuration conf, Properties tbl) throws SerDeException {
try {
logHeading("INITIALIZE MyThriftSerde");
schemaClassName = tbl.getProperty(SERIALIZATION_CLASS);
schemaClass = conf.getClassByName(schemaClassName);
LOG.info(String.format("Building DDL for event: %s", schemaClass.getName()));
inputFieldNames = new ArrayList<>();
inputFieldOIs = new ArrayList<>();
notSkipIDs = new ArrayList<>();
/* Initialize the Input fields */
// The underlying data is stored in RCFile format, and only has 1 column, event_binary
// So we create a ColumnarStructBase for each row we deserialize.
// This ColumnasStruct only has 1 column: event_binary
inputFieldNames.add("event_binary");
notSkipIDs.add(0);
inputFieldOIs.add(LazyPrimitiveObjectInspectorFactory.LAZY_BINARY_OBJECT_INSPECTOR);
inputRowObjectInspector =
ObjectInspectorFactory.getColumnarStructObjectInspector(inputFieldNames, inputFieldOIs);
/* Output Object Inspector*/
// This is what the SerDe will return, it is a ThriftStructObjectInspector
thriftStructObjectInspector =
ObjectInspectorFactory.getReflectionObjectInspector(
schemaClass, ObjectInspectorFactory.ObjectInspectorOptions.THRIFT);
// Only for debugging
logHeading("THRIFT OBJECT INSPECTOR");
LOG.info("Output OI Class Name: " + thriftStructObjectInspector.getClass().getName());
LOG.info(
"OI Details: "
+ ObjectInspectorUtils.getObjectInspectorName(thriftStructObjectInspector));
} catch (Exception e) {
LOG.info("Exception while initializing SerDe", e);
}
}
@Override
public Object deserialize(Writable rowWritable) throws SerDeException {
logHeading("START DESERIALIZATION");
ColumnarStructBase inputLazyStruct =
new ColumnarStruct(inputRowObjectInspector, notSkipIDs, null);
LazyBinary eventBinary;
Message rowAsMessage;
TBase deserializedRow = null;
try {
inputLazyStruct.init((BytesRefArrayWritable) rowWritable);
eventBinary = (LazyBinary) inputLazyStruct.getField(0);
rowAsMessage =
myMessageDeserializer.fromBytes(eventBinary.getWritableObject().copyBytes(), null);
deserializedRow = rowAsMessage.getEvent();
LOG.info("deserializedRow.getClass(): " + deserializedRow.getClass());
LOG.info("deserializedRow.toString(): " + deserializedRow.toString());
} catch (Exception e) {
e.printStackTrace();
}
logHeading("END DESERIALIZATION");
return deserializedRow;
}
private void logHeading(String s) {
LOG.info(String.format("------------------- %s -------------------", s));
}
@Override
public ObjectInspector getObjectInspector() {
return thriftStructObjectInspector;
}
}
Контекст в коде:
- В базовых данныхкаждая строка содержит только 1 столбец (называемый event_binary), который хранится в двоичном виде.Двоичный файл - это Сообщение, которое содержит 2 поля: «схема» + «данные события».то есть каждая строка - это Сообщение, которое содержит схему + данные основного события.Мы используем схему из сообщения для десериализации данных.
- SerDe сначала десериализует строку как сообщение, извлекает данные события, а затем десериализует событие.
Я создаю ВНЕШНЮЮтаблица, которая указывает на данные Thrift, используя
ADD JAR hdfs://my-jar.jar;
CREATE EXTERNAL TABLE dev_db.thrift_event_data_deserialized
ROW FORMAT SERDE 'com.test.only.MyThriftSerde'
WITH SERDEPROPERTIES (
"serialization.class"="com.test.only.TestEvent"
) STORED AS RCFILE
LOCATION 'location/of/thrift/data';
MSCK REPAIR TABLE thrift_event_data_deserialized;
Тогда SELECT * FROM dev_db.thrift_event_data_deserialized LIMIT 10;
работает, как ожидалось Но, SELECT column1_name, column2_name FROM dev_db.thrift_event_data_deserialized LIMIT 10;
не работает.
Есть идеи, что мне здесь не хватает?Буду рад любой помощи в этом!