Мне нужно выполнить задание Flink для извлечения записей из базы данных с заданным интервалом и архивирования после обработки.Я реализовал SourceFunction для получения необходимых записей из базы данных и добавил SourceFunction в качестве источника для StreamExecutionEnvironment.Как я могу указать, что StreamExecutionEnvironment должен извлекать записи из базы данных с использованием SourceFunction каждые 10 минут?
SourceFunction:
public class MongoDBSourceFunction implements SourceFunction<List<Book>>{
public void cancel() {
// TODO Auto-generated method stub
}
public void run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
<List<Book>> context) throws Exception {
List<Book> books = getBooks();
context.collect(books);
}
public List<Book> getBooks() {
List<Book> books = new ArrayList<Book>();
//fetch all books from database
return books;
}
}
Процессор:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class ArchiveJob {
public static void main(String[] args) {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new MongoDBSourceFunction()).print();
}
}