Потоковый процессор Wso2: Произошла ошибка при обработке eventByteBufferQueue - PullRequest
0 голосов
/ 27 февраля 2019

У меня есть два узла аналитического сервера wso2-am (2.6.0), который является процессором Wso2 Stream.Я вижу следующую ошибку на пассивном узле кластера.Активный узел в порядке, и я не вижу ошибок.Результат аналитики не влияет на пользователей, просматривающих данные в API Publisher или Store.однако в пассивном узле произошла ошибка.

сообщите, пожалуйста, что является причиной следующей проблемы.

2019-02-26 17: 06: 09,513] ОШИБКА {org.wso2.carbon.stream.processor. core.ha .tcp.EventSyncServer} - Ошибка при обработке eventByteBufferQueue null java.nio.BufferUnderflowException

1 Ответ

0 голосов
/ 29 марта 2019

Просто встретите ту же проблему, вот моя проблема и решение.1) Использование развертывания WSO2 SP HA.2) Когда событие приходит в активный узел и в соответствии с исходным отображением потоковой передачи некоторые поля имеют значение NULL. 3) Активный узел хотел бы синхронизировать это событие с пассивным узлом. 4) Пассивный узел выбирает данные события из события «EventByteBufferQueue», чтобы удовлетворитьмеханизм перехода в режим ожидания 5) пассивный узел не может проанализировать данные от активного узла и сообщает об ошибке ошибки.

основной причиной является SP по умолчанию только поддержка строки NULL, когда NULL с LONG, INTEGER .. произошла ошибка,но для меня длинные поля имеют значение NULL в нормальном случае, вы можете изменить тип данных на строку.

вот мое решение: org.wso2.carbon.stream.processor.core_2.0.478.jar Добавить логику вподдержка NULL BinaryMessageConverterUtil.java для отправки данных о событиях с активного узла

public final class BinaryMessageConverterUtil {

public static int getSize(Object data) {
    if (data instanceof String) {
        return 4 + ((String) data).length();
    } else if (data instanceof Integer) {
        return 4;
    } else if (data instanceof Long) {
        return 8;
    } else if (data instanceof Float) {
        return 4;
    } else if (data instanceof Double) {
        return 8;
    } else if (data instanceof Boolean) {
        return 1;
    } else if (data == null) {
        return 0;
    }else {
        //TODO
        return 4;
    }
}

public static EventDataMetaInfo getEventMetaInfo(Object data) {
    int eventSize;
    Attribute.Type attributeType;
    if (data instanceof String) {
        attributeType = Attribute.Type.STRING;
        eventSize = 4 + ((String) data).length();
    } else if (data instanceof Integer) {
        attributeType = Attribute.Type.INT;
        eventSize = 4;
    } else if (data instanceof Long) {
        attributeType = Attribute.Type.LONG;
        eventSize = 8;
    } else if (data instanceof Float) {
        attributeType = Attribute.Type.FLOAT;
        eventSize = 4;
    } else if (data instanceof Double) {
        attributeType = Attribute.Type.DOUBLE;
        eventSize = 8;
    } else if (data instanceof Boolean) {
        attributeType = Attribute.Type.BOOL;
        eventSize = 1;
    } else if (data == null){
        attributeType = Attribute.Type.OBJECT;
        eventSize = 0; //'no content between the HA nodes for NULL fields'
    } else {
        //TODO
        attributeType = Attribute.Type.OBJECT;
        eventSize = 1;
    }
    return new EventDataMetaInfo(eventSize, attributeType);
}

public static void assignData(Object data, ByteBuffer eventDataBuffer) throws IOException {
    if (data instanceof String) {
        eventDataBuffer.putInt(((String) data).length());
        eventDataBuffer.put((((String) data).getBytes(Charset.defaultCharset())));
    } else if (data instanceof Integer) {
        eventDataBuffer.putInt((Integer) data);
    } else if (data instanceof Long) {
        eventDataBuffer.putLong((Long) data);
    } else if (data instanceof Float) {
        eventDataBuffer.putFloat((Float) data);
    } else if (data instanceof Double) {
        eventDataBuffer.putDouble((Double) data);
    } else if (data instanceof Boolean) {
        eventDataBuffer.put((byte) (((Boolean) data) ? 1 : 0));
    } else if (data == null){
        //put nothing into he Buffer
    } else {
        eventDataBuffer.putInt(0);
    }
}

public static String getString(ByteBuf byteBuf, int size) throws UnsupportedEncodingException {
    byte[] bytes = new byte[size];
    byteBuf.readBytes(bytes);
    return new String(bytes, Charset.defaultCharset());
}

public static String getString(ByteBuffer byteBuf, int size) throws UnsupportedEncodingException {
    byte[] bytes = new byte[size];
    byteBuf.get(bytes);
    return new String(bytes, Charset.defaultCharset());
}

}

SiddhiEventConverter.java для обработки данных о событиях на пассивном узле

static Object[] toObjectArray(ByteBuffer byteBuffer,
                              String[] attributeTypeOrder) throws UnsupportedEncodingException {
    if (attributeTypeOrder != null) {
        Object[] objects = new Object[attributeTypeOrder.length];
        for (int i = 0; i < attributeTypeOrder.length; i++) {
            switch (attributeTypeOrder[i]) {
                case "INT":
                    objects[i] = byteBuffer.getInt();
                    break;
                case "LONG":
                    objects[i] = byteBuffer.getLong();
                    break;
                case "STRING":
                    int stringSize = byteBuffer.getInt();
                    if (stringSize == 0) {
                        objects[i] = null;
                    } else {
                        objects[i] = BinaryMessageConverterUtil.getString(byteBuffer, stringSize);
                    }
                    break;
                case "DOUBLE":
                    objects[i] = byteBuffer.getDouble();
                    break;
                case "FLOAT":
                    objects[i] = byteBuffer.getFloat();
                    break;
                case "BOOL":
                    objects[i] = byteBuffer.get() == 1;
                    break;
                case "OBJECT":
                    //for NULL fields
                    objects[i] = null;
                    break;
                default:
                    // will not occur
            }
        }
        return objects;
    } else {
        return null;
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...