У меня есть сценарий, в котором мой первый болт в топологии - десериализация сообщения из темы Kafka на основе схемы Avro, десериализация работает нормально для сообщений, которые следуют правильной схеме, и выдает исключение AvroRuntimeException, когда сообщение представляет собой просто что-то еще, чтоне может быть десериализовано, хотя я перехватываю исключение и подтверждаю кортеж, я получаю сообщение об ошибке работника, это связано с конфигурацией шторма?
@Override
public void execute(Tuple input) {
byte[] bytes = null;
try {
bytes = getBinaryFieldFromTuple(input);
if (ArrayUtils.isEmpty(bytes)) {
log.error("Received empty byte[] - skipping.");
return;
}
Message<E> message = getMessageDecoder().decode(bytes);
E payload = message.getPayload();
emit(input, getValuesToEmit(payload, message.getHeaders()));
} catch (Exception ex) {
log.error("Error while deserializing: {}", bytes, ex);
} finally {
ack(input);
}
}
Я не вижу конкретной причины длячто.