Apache Flink: как использовать SourceFunction для выполнения задачи с заданным интервалом? - PullRequest
0 голосов
/ 23 октября 2018

Мне нужно выполнить задание Flink для извлечения записей из базы данных с заданным интервалом и архивирования после обработки.Я реализовал SourceFunction для получения необходимых записей из базы данных и добавил SourceFunction в качестве источника для StreamExecutionEnvironment.Как я могу указать, что StreamExecutionEnvironment должен извлекать записи из базы данных с использованием SourceFunction каждые 10 минут?

SourceFunction:

public class MongoDBSourceFunction implements SourceFunction<List<Book>>{

    public void cancel() {
        // TODO Auto-generated method stub
    }

    public void run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
<List<Book>> context) throws Exception {

        List<Book> books = getBooks();

        context.collect(books);

    }

    public List<Book> getBooks() {
        List<Book> books = new ArrayList<Book>();

        //fetch all books from database     
        return books;
    }

}

Процессор:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ArchiveJob {

    public static void main(String[] args) {

        final StreamExecutionEnvironment env = 
    StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(new MongoDBSourceFunction()).print();
    }

}

1 Ответ

0 голосов
/ 24 октября 2018

Вам необходимо добавить эту функцию к самому MongoDBSourceFunction.Например, вы можете создать экземпляр ScheduledExecutorService в методе open и запланировать задачу чтения с помощью этого исполнителя.

Обратите внимание, что важно удерживать блокировку контрольной точки при отправке записей.

...