Буферизация преобразованных сообщений (например, 1000 отсчетов) с использованием обработки потока Apache Flink - PullRequest
1 голос
/ 07 ноября 2019

Я использую Apache Flink для обработки потока.

После подписки сообщений из источника (например, Kafka, AWS Kinesis Data Streams), а затем применения преобразования, агрегирования и т. Д. С использованием операторов Flink для потоковой передачи данных. Я хочу буферизовать окончательные сообщения (например, по 1000) и публиковать каждый пакет в одном запросе к внешнему API REST.

Как реализовать механизм буферизации (создание каждой 1000 записей в виде пакета) в Apache Flink?

Flink pipileine: Потоковый источник -> преобразование / уменьшение с помощью операторов -> буфер 1000 сообщений -> публикация в REST API

Благодарим вас за помощь!

1 Ответ

0 голосов
/ 07 ноября 2019

Я бы создал приемник с состоянием, который бы удерживал передаваемые сообщения. Когда счет становится достаточно высоким (1000), приемник отправляет пакет. Состояние может находиться в памяти (например, переменная экземпляра, содержащая ArrayList сообщений), но вы должны использовать контрольные точки, чтобы вы могли восстановить это состояние в случае какого-либо сбоя.

Когда ваш приемник имеет контрольную точкусостояние, он должен реализовать функцию CheckpointedFunction (в org.apache.flink.streaming.api.checkpoint), что означает, что вам нужно добавить два метода в ваш приемник:

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {

    checkpointedState.clear();

    // HttpSinkStateItem is a user-written class 
    // that just holds a collection of messages (Strings, in this case)
    //
    // Buffer is declared as ArrayList<String>

    checkpointedState.add(new HttpSinkStateItem(buffer));

}

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {

    // Mix and match different kinds of states as needed:
    //   - Use context.getOperatorStateStore() to get basic (non-keyed) operator state
    //        - types are list and union        
    //   - Use context.getKeyedStateStore() to get state for the current key (only for processing keyed streams)
    //        - types are value, list, reducing, aggregating and map
    //   - Distinguish between state data using state name (e.g. "HttpSink-State")      

    ListStateDescriptor<HttpSinkStateItem> descriptor =
        new ListStateDescriptor<>(
            "HttpSink-State",
            HttpSinkStateItem.class);

    checkpointedState = context.getOperatorStateStore().getListState(descriptor);

    if (context.isRestored()) {

        for (HttpSinkStateItem item: checkpointedState.get()) {
            buffer = new ArrayList<>(item.getPending());  
        }

    }       

}

Вы также можете использовать таймер вприемник (если входной поток имеет ключ / раздел) для периодической отправки, если количество не достигает вашего порога.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...