Водяные знаки в RichParallelSourceFunction - PullRequest
0 голосов
/ 19 октября 2018

Я реализую функцию SourceFunction, которая считывает данные из базы данных.Задание должно быть в состоянии возобновиться, если оно остановлено или уничтожено (т. Е. Точки сохранения и контрольные точки) с данными, обрабатываемыми ровно один раз.

Что у меня есть:

@SerialVersionUID(1L)
class JDBCSource(private val waitTimeMs: Long) extends 
RichParallelSourceFunction[Event] with StoppableFunction with LazyLogging{

    @transient var client: PostGreClient = _
    @volatile var isRunning: Boolean = true
    val DEFAULT_WAIT_TIME_MS = 1000

    def this(clientConfig: Serializable) =
        this(clientConfig, DEFAULT_WAIT_TIME_MS)

    override def stop(): Unit = {
        this.isRunning = false
    }

    override def open(parameters: Configuration): Unit = {
        super.open(parameters)
        client = new JDBCClient
    }

    override def run(ctx: SourceFunction.SourceContext[Event]): Unit = {

        while (isRunning){
           val statement = client.getConnection.createStatement()
           val resultSet = statement.executeQuery("SELECT name, timestamp FROM MYTABLE")

            while (resultSet.next()) {
                val event: String = resultSet.getString("name")
                val timestamp: Long = resultSet.getLong("timestamp")

                ctx.collectWithTimestamp(new Event(name, timestamp), timestamp)

            }
        }
    }

    override def cancel(): Unit = {
        isRunning = false
    }
}

Как я могуУдостоверьтесь, чтобы получить только те строки базы данных, которые еще не обработаны?Я предполагал, что переменная ctx будет иметь некоторую информацию о текущем водяном знаке, чтобы я мог изменить свой запрос на что-то вроде:

select name, timestamp from myTable where timestamp > ctx.getCurrentWaterMark

Но у него нет подходящих для меня методов.Любые идеи, как решить эту проблему, будут оценены

1 Ответ

0 голосов
/ 19 октября 2018

Вы должны реализовать CheckpointedFunction , чтобы вы могли самостоятельно управлять контрольными точками.Документация интерфейса довольно обширна, но если вам нужен пример, я советую вам взглянуть на пример .

По сути, ваша функция должна реализовывать CheckpointedFunction#snapshotState для храненияукажите состояние, в котором вы нуждаетесь, используя управляемое состояние Flink, а затем при выполнении восстановления оно прочитает то же состояние в CheckpointedFunction#initializeState.

...