Управление состоянием с огромным использованием памяти - запросы из хранилища - PullRequest
2 голосов
/ 03 февраля 2020

Извинения, если это звучит глупо! Мы работаем с flink для выполнения асинхронных вызовов c IO. В большинстве случаев вызовы ввода-вывода повторяются (один и тот же набор параметров), и около 80% API, которые мы вызываем, возвращают один и тот же ответ для тех же параметров. Итак, мы хотели бы избежать повторных звонков. Мы подумали, что можем использовать состояние для хранения предыдущих ответов и использовать их снова. Проблема в том, что, хотя наши ответы могут использоваться снова, количество таких ответов огромно и поэтому требует много памяти. Есть ли способ сохранить это, чтобы вести и запрашивать, как и когда требуется?

1 Ответ

2 голосов
/ 04 февраля 2020

Совсем не глупый вопрос!

Несколько фактов показывают, почему это не так просто:

  1. Состояние Flink строго локально для одного оператора. Вы не можете получить доступ к состоянию в другом операторе.
  2. Flink предлагает один внутренний сервер состояний, который может пролиться на диск, а именно RocksDB. В RocksDB хранится только ключевое состояние - неключевое состояние всегда находится в куче.
  3. Оператор asyn c i / o нельзя использовать в потоке с ключами - он работает только в контекст с ключом.
  4. Использование итераций (циклические c соединения в графе заданий) с API DataStream - очень плохая идея (потому что это нарушает контрольные точки).

Конечно , может быть необязательно, чтобы кэш находился в управляемом состоянии Флинка.

Некоторые параметры:

  • Не использовать ключевое состояние для кеша. Вы можете использовать что-то вроде отдельного экземпляра RocksDB для кеша и реализовать кеширование непосредственно в операторе asyn c i / o. Если бы кэш уместился в памяти, я бы предложил Guava.
  • Не используйте asyn c ввод / вывод. Извлекайте и кэшируйте себя в ProcessFunction, как предложено @ YuvalItzchakov.
  • Вместо этого вы можете использовать Stateful Functions . Это новая библиотека и API, которые расположены поверх Flink и преодолевают некоторые ограничения, перечисленные выше.
  • Вы можете построить что-то похожее на диаграмму ниже. Здесь кэш находится в ключевом состоянии в CoProcessFunction. Если кеш отсутствует, для извлечения недостающих данных используется нисходящий асинхронный оператор ввода / вывода c. Затем его необходимо вернуть в кэш с использованием внешней очереди, такой как Kafka, Kinesis или Pulsar.
                    +---------------------+                                       +------+
                    |                     +--results from cache+---------------^--> SINK |
+--requests+------> |  CoProcessFunction  |                                    |  +------+
                    |                     |                                    |
+--cache misses+--> |  cache in RocksDB   |                    +-----------+   |
                    |                     +--side output:      | fetch via +---+-> loop back
     SOURCES        +---------------------+  cache misses+---> | async i/o |       as 2nd input
                                                               +-----------+       to fill cache
...