Я реализую функцию SourceFunction, которая считывает данные из базы данных.Задание должно быть в состоянии возобновиться, если оно остановлено или уничтожено (т. Е. Точки сохранения и контрольные точки) с данными, обрабатываемыми ровно один раз.
Что у меня есть:
@SerialVersionUID(1L)
class JDBCSource(private val waitTimeMs: Long) extends
RichParallelSourceFunction[Event] with StoppableFunction with LazyLogging{
@transient var client: PostGreClient = _
@volatile var isRunning: Boolean = true
val DEFAULT_WAIT_TIME_MS = 1000
def this(clientConfig: Serializable) =
this(clientConfig, DEFAULT_WAIT_TIME_MS)
override def stop(): Unit = {
this.isRunning = false
}
override def open(parameters: Configuration): Unit = {
super.open(parameters)
client = new JDBCClient
}
override def run(ctx: SourceFunction.SourceContext[Event]): Unit = {
while (isRunning){
val statement = client.getConnection.createStatement()
val resultSet = statement.executeQuery("SELECT name, timestamp FROM MYTABLE")
while (resultSet.next()) {
val event: String = resultSet.getString("name")
val timestamp: Long = resultSet.getLong("timestamp")
ctx.collectWithTimestamp(new Event(name, timestamp), timestamp)
}
}
}
override def cancel(): Unit = {
isRunning = false
}
}
Как я могуУдостоверьтесь, чтобы получить только те строки базы данных, которые еще не обработаны?Я предполагал, что переменная ctx
будет иметь некоторую информацию о текущем водяном знаке, чтобы я мог изменить свой запрос на что-то вроде:
select name, timestamp from myTable where timestamp > ctx.getCurrentWaterMark
Но у него нет подходящих для меня методов.Любые идеи, как решить эту проблему, будут оценены