Я пытаюсь работать с Флинком и Кассандрой. Обе эти среды очень параллельны, но мне сложно заставить их работать вместе.
Сейчас мне нужно выполнить операцию для параллельного чтения из Cassandra с разными диапазонами токенов с возможностью завершить запрос после того, как N объектов прочитают .
Пакетный режим мне подходит больше, но возможны и DataStreams.
Я попытался LongCounter (см. Ниже), но он не будет работать, как я ожидал. Я не смог получить глобальную сумму с ними. Только локальные значения.
Асинхронный режим не является обязательным, поскольку эта операция CassandraRequester выполняется в параллельном контексте с распараллеливанием около 64 или 128.
Это моя попытка
class CassandraRequester<T> (val klass: Class<T>, private val context: FlinkCassandraContext):
RichFlatMapFunction<CassandraTokenRange, T>() {
companion object {
private val session = ApplicationContext.session!!
private var preparedStatement: PreparedStatement? = null
private val manager = MappingManager(session)
private var mapper: Mapper<*>? = null
private val log = LoggerFactory.getLogger(CassandraRequesterStateless::class.java)
public const val COUNTER_ROWS_NUMBER = "flink-cassandra-select-count"
}
private lateinit var counter: LongCounter
override fun open(parameters: Configuration?) {
super.open(parameters)
if(preparedStatement == null)
preparedStatement = session.prepare(context.prepareQuery()).setConsistencyLevel(ConsistencyLevel.LOCAL_ONE)
if(mapper == null) {
mapper = manager.mapper<T>(klass)
}
counter = runtimeContext.getLongCounter(COUNTER_ROWS_NUMBER)
}
override fun flatMap(tokenRange: CassandraTokenRange, collector: Collector<T>) {
val bs = preparedStatement!!.bind(tokenRange.start, tokenRange.end)
val rs = session.execute(bs)
val resultSelect = mapper!!.map(rs)
val iter = resultSelect.iterator()
while (iter.hasNext()) when {
this.context.maxRowsExtracted == 0L || counter.localValue < context.maxRowsExtracted -> {
counter.add(1)
collector.collect(iter.next() as T)
}
else -> {
collector.close()
return
}
}
}
}
Можно ли прекратить запрос в таком случае?