Состояние флинка пустое (переинициализировано) после повторного запуска - PullRequest
0 голосов
/ 07 мая 2020

Я пытаюсь соединить два потока, первый сохраняется в MapValueState: RocksDB сохраняет данные в папке контрольной точки, но после нового запуска state пуст. Я запускаю его локально и в кластере flink с отменой отправки в кластере и просто перезапускаю локально

 env.setStateBackend(new RocksDBStateBackend(..)
 env.enableCheckpointing(1000)
 ...

   val productDescriptionStream: KeyedStream[ProductDescription, String] = env.addSource(..)
  .keyBy(_.id)

 val productStockStream: KeyedStream[ProductStock, String] = env.addSource(..)
    .keyBy(_.id)

и

  productDescriptionStream
  .connect(productStockStream)
  .process(ProductProcessor())
  .setParallelism(1)

env.execute("Product aggregator")

ProductProcessor

case class ProductProcessor() extends CoProcessFunction[ProductDescription, ProductStock, Product]{
private[this] lazy val stateDescriptor: MapStateDescriptor[String, ProductDescription] =
new MapStateDescriptor[String, ProductDescription](
  "productDescription",
  createTypeInformation[String],
  createTypeInformation[ProductDescription]
)
private[this] lazy val states: MapState[String, ProductDescription] = getRuntimeContext.getMapState(stateDescriptor)

override def processElement1(value: ProductDescription,
ctx: CoProcessFunction[ProductDescription, ProductStock, Product]#Context,out: Collector[Product]
 ): Unit = {
  states.put(value.id, value)
 }}

 override def processElement2(value: ProductStock,
ctx: CoProcessFunction[ProductDescription, ProductStock, Product]#Context, out: Collector[Product]
 ): Unit = {
  if (states.contains(value.id)) {
         val product =Product(
          id = value.id,
          description = Some(states.get(value.id).description),
          stock = Some(value.stock),
          updatedAt = value.updatedAt)
        out.collect(product )
 }}

1 Ответ

2 голосов
/ 07 мая 2020

Контрольные точки создаются Flink для восстановления после сбоев, а не для возобновления работы после выключения вручную. Когда задание отменяется, по умолчанию Flink удаляет контрольные точки. Поскольку задание больше не может завершиться ошибкой, его не нужно восстанавливать.

У вас есть несколько вариантов:

(1) Настройте контрольную точку на сохранение контрольных точек при задание отменено:

CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(
  CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

Затем, когда вы перезапустите задание, вам нужно будет указать, что вы хотите перезапустить его с указанной c контрольной точки:

flink run -s <checkpoint-path> ...

В противном случае, когда вы запускаете задание, оно начнется с бэкэнда с пустым состоянием.

(2) Вместо отмены задания используйте остановить с точкой сохранения :

flink stop [-p targetDirectory] [-d] <jobID>

, после чего вам снова нужно будет использовать flink run -s ..., чтобы возобновить работу с точки сохранения.

Остановка с помощью точки сохранения - более чистый подход, чем полагаться на недавнюю контрольную точку, к которой можно вернуться.

( 3) Или вы можете использовать Ververica Platform Community Edition , который поднимает уровень абстракции до такой степени, что вам не нужно самостоятельно управлять этими деталями.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...