Flink Queryable State Error - PullRequest
       9

Flink Queryable State Error

0 голосов
/ 18 мая 2018

Я пытаюсь использовать запрашиваемое состояние на Flink (версия 1.4.2), но, к сожалению, я получаю следующую ошибку:

INFO  my.test.flink.QueryableState  - Params are a96438fa12879b7598c9cf32684e2669, kafka-cluster_jobmanager_1, 6123
INFO  my.test.flink.QueryableState  - Before the call java.util.concurrent.CompletableFuture@26aa12dd[Not completed]
java.util.concurrent.ExecutionException: java.lang.IndexOutOfBoundsException: readerIndex(0) + length(4) exceeds writerIndex(0): PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 0)
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
        at my.test.flink.QueryableState.main(QueryableState.java:67)
Caused by: java.lang.IndexOutOfBoundsException: readerIndex(0) + length(4) exceeds writerIndex(0): PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 0)
        at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1166)
        at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.readInt(AbstractByteBuf.java:619)
        at org.apache.flink.queryablestate.network.messages.MessageSerializer.deserializeHeader(MessageSerializer.java:231)
        at org.apache.flink.queryablestate.network.ClientHandler.channelRead(ClientHandler.java:76)
        at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
        at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
        at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
        at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
        at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
        at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
        at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
        at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
        at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        at java.lang.Thread.run(Thread.java:745)

На стороне клиента я использую flink-queryable-state-client-java_2_11.jar и соответствующая часть кода для запрашиваемого клиента:

QueryableStateClient client = new QueryableStateClient(jobManagerHost, jobManagerPort);

TypeInformation<MyEvent> typeInformation = TypeInformation.of(new TypeHint<MyEvent>() {});
ListStateDescriptor<MyEvent> descriptor = new ListStateDescriptor<MyEvent>("myEvents",
                 typeInformation.createSerializer(new ExecutionConfig()));

CompletableFuture<ListState<MyEvent>> resultFuture =
                        client.getKvState(JobID.fromHexString(jobIdParam),"myEvents", "1", 
                        BasicTypeInfo.STRING_TYPE_INFO , descriptor );

logger.info("Before the call " + resultFuture);
try {
         logger.info("Finished"+ resultFuture.get());
 } catch(Exception ex) {
         ex.printStackTrace();
 }

Наконец, задание, запущенное на Flink, настроено для ListState, как показано ниже.Обратите внимание, что данные в ListState вводятся с помощью String

        TypeInformation<MyEvent> typeInformation = TypeInformation.of(new TypeHint<MyEvent>() {});
        ListStateDescriptor<MyEvent> eventState = 
                new ListStateDescriptor<MyEvent>("myEvents",typeInformation);
        eventState.setQueryable("myEvents");
        eventListState = getRuntimeContext().getListState(eventState);

Мне кажется, что это ошибка сериализации, но я не знаю, что мне нужно сделать, чтобы это исправить.У кого-нибудь есть идея, что может быть не так с кодом выше?Я что-то упустил?

1 Ответ

0 голосов
/ 18 мая 2018

Я столкнулся с той же самой проблемой при обновлении этого запрашиваемого демо состояния для Flink 1.4.Если я правильно помню, важная часть правильно работает с CompletableFuture - вы не можете просто вызвать get () сразу.

См. код для рабочего примера, ключевую частьиз которых выглядит примерно так:

try {

    CompletableFuture<FoldingState<BumpEvent, Long>> resultFuture =
      client.getKvState(jobId, EventCountJob.ITEM_COUNTS, key, 
      BasicTypeInfo.STRING_TYPE_INFO, countingState);

    resultFuture.thenAccept(response -> {
      try {
        Long count = response.get();
        // now we could do something with the value
      } catch (Exception e) {
        e.printStackTrace();
      }
    });

    resultFuture.get(5, TimeUnit.SECONDS);

} catch (Exception e) {
  e.printStackTrace();
}
...