Приостановка обработки на Flink KeyedStream - PullRequest
0 голосов
/ 27 октября 2018

У меня есть потоковое приложение Flink, которому требуется возможность «приостановить» и «отменить» обработку определенного потока с ключами.«Обработка» означает просто выполнение некоторого простого обнаружения аномалий в потоке.

Поток, о котором мы думаем, работает следующим образом:

Поток команд, либо ProcessCommand, PauseCommand, либоResumeCommand, каждый с id, который используется для KeyBy.

ProcessCommands, проверит, приостановлена ​​ли клавиша перед обработкой, и буфер, если нет.

PauseCommands приостановит обработку ключа.

ResumeCommands приостановит обработку ключа и очистит буфер.

Кажется ли этот поток разумным, и если да, я смогу использовать что-то вроде splitоператор для достижения?

Пример потока, пропущены отдельные отметки времени записи:

[{command: process, id: 1}, {command: pause, id: 1}, {command: process, id: 1}, {command: resume, id: 1}, {command: process, id: 1}]

Flow:
=>
{command: process, id: 1} # Sent downstream for analysis
=> 
{command: pause, id: 1} # Starts the buffer for id 1
=>
{command: process, id: 1} # Buffered into another output stream
=> 
{command: resume, id: 1} # Turns off buffering, flushes [{command: process, id: 1}] downstream
=>
{command: process, id: 1} # Sent downstream for processing as the buffering has been toggled off 

1 Ответ

0 голосов
/ 29 октября 2018

Этого можно достичь, используя Оператор Flink's Window . Во-первых, создайте поток на основе POJO или tuple, применив операцию map.

Затем, согласно вашим потребностям, вы можете использовать keyBy в этом потоке, чтобы получить keyedStream.

Теперь, используя комбинацию бесконечного window, a trigger и window function, основанного на времени, вы можете добиться переключения вашего командного потока.

Как правило, вы можете использовать windows в качестве буфера, который после получения записи паузы удерживает записи процесса до тех пор, пока не будет получена запись возобновления. Вы будете писать собственный триггер, который высвобождает окно (буфер) в соответствии с вашим сценарием.

Ниже приведена пользовательская реализация Trigger с onElement() переопределенным методом.

/**
 * We trigger the window processing as per command inside the record. The
 * process records are buffered when a pause record is received and the
 * buffer is evicted once resume record is received. If no pause record is
 * received earlier, then for each process record the buffer is evicted.
 */
@Override
public TriggerResult onElement(Tuple2<Integer, String> element, long timestamp, Window window,
        TriggerContext context) throws Exception {
    if (element.f1.equals("pause")) {
        paused = true;
        return TriggerResult.CONTINUE;
    } else if (element.f1.equals("resume")) {
        paused = false;
        return TriggerResult.FIRE_AND_PURGE;
    } else if (paused) // paused is a ValueState per keyed stream.
        return TriggerResult.CONTINUE;
    return TriggerResult.FIRE_AND_PURGE;
}

Ознакомьтесь с полным рабочим примером в этом репозитории github

...