Flink ValueState "Ошибка при добавлении данных в RocksDB" - PullRequest
0 голосов
/ 12 октября 2019

Когда я хочу обновить состояние значения (queueState.update (queue)), перехватите это исключение:

org.apache.flink.util.FlinkRuntimeException: Error while adding data to RocksDB
    at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:108)
    at xxx.xxx.xxx.CleanTimedOutPartialMatches.processElement(CleanTimedOutPartialMatches.java:37)
    at xxx.xxx.xxx.CleanTimedOutPartialMatches.processElement(CleanTimedOutPartialMatches.java:22)
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
2019-10-13 11:06:29,311 WARN  org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor  - Timestamp monotony violated: 1570948458514 < 1570948663062
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performDefaultAction(StreamTask.java:276)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: The Kryo Output still contains data from a previous serialize call. It has to be flushed or cleared at the end of the serialize call.
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:300)
    at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValueInternal(AbstractRocksDBState.java:158)
    at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:178)
    at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:167)
    at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:106)
    ... 11 more

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Comparator;
import java.util.PriorityQueue;

public class CleanTimedOutPartialMatches extends KeyedProcessFunction<String, ObjectNode, ObjectNode> {

    private static Logger LOGGER = LoggerFactory.getLogger(CleanTimedOutPartialMatches.class);

    private ValueState<PriorityQueue<JsonNode>> queueState = null;

    @Override
    public void processElement(ObjectNode log, Context context, Collector<ObjectNode> collector) throws Exception {
       try {
           if (context.timestamp() > context.timerService().currentWatermark()) {
               PriorityQueue<JsonNode> queue = queueState.value();
               if (queue == null) {
                   queue = new PriorityQueue<JsonNode>(Comparator.comparingLong(o -> o.get(TS).longValue()));
               }
               queue.add(log);
               queueState.update(queue);
               context.timerService().registerEventTimeTimer(log.get(TS).longValue());
           }
       } catch (Exception e){
           e.printStackTrace();
       }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<ObjectNode> out) throws Exception {
        try {
            sendToSink(queueState.value(), ctx, out);
        } catch (Exception e){
            for(StackTraceElement el : e.getStackTrace()){
                LOGGER.info("{}.{}:{}", el.getClassName(), el.getMethodName(), el.getLineNumber());
            }
        }
    }

    private void sendToSink(PriorityQueue<JsonNode> queue, OnTimerContext context, Collector<ObjectNode> out){
        long watermark = context.timerService().currentWatermark();
        JsonNode lastSentLog = null;
        JsonNode log = queue.peek();
        while (log != null && log.get(TS).longValue() <= watermark) {
            if(lastSentLog != null && extractLogEndpoint(log).equals(extractLogEndpoint(lastSentLog)) && log.get(TS).longValue() ==  lastSentLog.get(TS).longValue()){
                LOGGER.info("duplicated log removed");
            } else {
                if(lastSentLog != null){
                    long gapTime = Math.abs(log.get(TS).longValue() - lastSentLog.get(TS).longValue()) / 1000;
                    boolean isSameAttempt = (extractLogEndpoint(lastSentLog).equals(AUTOCOMPLETE) && extractLogEndpoint(log).equals(LOG))
                            || (extractLogEndpoint(log).equals(extractLogEndpoint(lastSentLog)) && gapTime  < MAX_TIME_GAP);
                    if(isSameAttempt){
                        ((ObjectNode)log).put(ATTEMPT_ID, lastSentLog.get(ATTEMPT_ID).textValue());
                    }
                }
                lastSentLog = log;
                out.collect((ObjectNode)log);
            }
            queue.remove(log);
            log = queue.peek();
        }
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<PriorityQueue<JsonNode>> descriptor = new ValueStateDescriptor<>(
                // state name
                "sort-partial-matches",
                // type information of state
                TypeInformation.of(new TypeHint<PriorityQueue<JsonNode>>() {
                }));
        queueState = getRuntimeContext().getState(descriptor);
    }
}


1 Ответ

1 голос
/ 13 октября 2019

Одна проблема: похоже, что вы забыли позвонить queueState.update(queue) после того, как вы закончили удалять вещи из очереди.

Даже если вы все же добились этого, сортировка на основе PriorityQueue с RocksDB в качестве состоянияbackend будет работать очень плохо, так как при каждом доступе и обновлении он будет проходить через ser / de всей очереди. Рекомендуется использовать MapState для сортировки, если только вы не используете один из бэкэндов состояний на основе кучи, поскольку для этого нужно выполнять ser / de только для отдельных записей, а не для всей карты. Вы можете использовать временные метки в качестве ключей для MapState и список объектов в качестве значений. Используйте таймеры так же, как вы делаете сейчас, чтобы вызвать сброс содержимого списка.

Или вы можете использовать SQL для сортировки - см. ответ на этот вопрос для примера,

...