Как запросить состояние запроса Flink - PullRequest
0 голосов
/ 25 июня 2019

Я использую Flink 1.8.0 и пытаюсь запросить состояние своей работы.

val descriptor = new ValueStateDescriptor("myState", Types.CASE_CLASS[Foo])
    descriptor.setQueryable("my-queryable-State")

Я использовал порт 9067, который является портом по умолчанию в соответствии с этим , моим клиентом:

val client = new QueryableStateClient("127.0.0.1", 9067)
val jobId = JobID.fromHexString("d48a6c980d1a147e0622565700158d9e")

      val execConfig = new ExecutionConfig
       val descriptor = new ValueStateDescriptor("my-queryable-State", Types.CASE_CLASS[Foo])
      val res: Future[ValueState[Foo]] = client.getKvState(jobId, "my-queryable-State","a", BasicTypeInfo.STRING_TYPE_INFO, descriptor)
      res.map(_.toString).pipeTo(sender)

но я получаю:

[ERROR] [06/25/2019 20:37:05.499] [bvAkkaHttpServer-akka.actor.default-dispatcher-5] [akka.actor.ActorSystemImpl(bvAkkaHttpServer)] Error during processing of request: 'org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /127.0.0.1:9067'. Completing with 500 Internal Server Error response. To change default exception handling behavior, provide a custom ExceptionHandler.
java.util.concurrent.CompletionException: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /127.0.0.1:9067
  1. что я делаю не так?
  2. как и где я должен определить QueryableStateOptions

1 Ответ

0 голосов
/ 26 июня 2019

Так что, если вы хотите использовать QueryableState, вам нужно добавить подходящий кувшин в свой флинк. Баночка flink-queryable-state-runtime, ее можно найти в папке opt в вашем дистрибутиве Flink, и вы должны переместить ее в папку lib.

Что касается второго вопроса, QueryableStateOption - это просто класс, который используется для создания статических определений ConfigOption. Затем определения используются для чтения конфигураций из файла flink-conf.yaml. Так что в настоящее время единственная возможность настроить QueryableState - это использовать файл flink-conf в дистрибутиве flink.

РЕДАКТИРОВАТЬ: Кроме того, попробуйте прочитать это] 1 он предоставляет дополнительную информацию о том, как работает Queryable State. Вам не следует подключаться напрямую к порту сервера, а следует использовать прокси-порт, который по умолчанию равен 9069.

.
...