Завершить запрос к базе данных во время операции flink - PullRequest
0 голосов
/ 17 мая 2018

Я пытаюсь работать с Флинком и Кассандрой. Обе эти среды очень параллельны, но мне сложно заставить их работать вместе.

Сейчас мне нужно выполнить операцию для параллельного чтения из Cassandra с разными диапазонами токенов с возможностью завершить запрос после того, как N объектов прочитают .

Пакетный режим мне подходит больше, но возможны и DataStreams. Я попытался LongCounter (см. Ниже), но он не будет работать, как я ожидал. Я не смог получить глобальную сумму с ними. Только локальные значения.

Асинхронный режим не является обязательным, поскольку эта операция CassandraRequester выполняется в параллельном контексте с распараллеливанием около 64 или 128.

Это моя попытка

class CassandraRequester<T> (val klass: Class<T>, private val context: FlinkCassandraContext):
        RichFlatMapFunction<CassandraTokenRange, T>() {

    companion object {
        private val session = ApplicationContext.session!!
        private var preparedStatement: PreparedStatement? = null
        private val manager = MappingManager(session)
        private var mapper: Mapper<*>? = null
        private val log = LoggerFactory.getLogger(CassandraRequesterStateless::class.java)

        public const val COUNTER_ROWS_NUMBER = "flink-cassandra-select-count"
    }

    private lateinit var counter: LongCounter

    override fun open(parameters: Configuration?) {
        super.open(parameters)

        if(preparedStatement == null)
            preparedStatement = session.prepare(context.prepareQuery()).setConsistencyLevel(ConsistencyLevel.LOCAL_ONE)
        if(mapper == null) {
            mapper = manager.mapper<T>(klass)
        }
        counter = runtimeContext.getLongCounter(COUNTER_ROWS_NUMBER)

    }

    override fun flatMap(tokenRange: CassandraTokenRange, collector: Collector<T>) {

        val bs = preparedStatement!!.bind(tokenRange.start, tokenRange.end)

        val rs = session.execute(bs)
        val resultSelect = mapper!!.map(rs)
        val iter = resultSelect.iterator()
        while (iter.hasNext()) when {
            this.context.maxRowsExtracted == 0L || counter.localValue < context.maxRowsExtracted -> {
                counter.add(1)
                collector.collect(iter.next() as T)
            }
            else -> {
                collector.close()
                return
            }
        }
    }

}

Можно ли прекратить запрос в таком случае?

...