Ниже приведен код для обновления счетчика.
kvList contains List[(key, value to add)]
def counter(kvList: List[(String, Long)]) = {
try {`enter code here`
if (kvList!=null && kvList.nonEmpty) {
Observable.from(kvList).flatMap(kv => {
val bucketObservableAsync =
rx.lang.scala.JavaConversions.toScalaObservable(
bucket.async().counter(kv._1, kv._2, kv._2, ttl)
)
def errorOccurred(t: Throwable): Unit = {
log.error("ReportingService observable error occurred", t)
}
bucketObservableAsync.doOnError(errorOccurred)
bucketObservableAsync
}).last.toBlocking.single
} else {
log.info("ReportingService no keys to update")
}
} catch {
case e: Throwable =>
log.error("ReportingService error occurred ", e)
}
}
Я использую couchbase java-client-2.1.3 и вижу здесь 2 проблемы:
The errorOccurred is not getting called
Вместо (1) я иногда получаю следующее исключение:
ReportingService error occurred
com.couchbase.client.core.CouchbaseException: NOT_EXISTS
at com.couchbase.client.java.CouchbaseAsyncBucket$26.call(CouchbaseAsyncBucket.java:950)
at com.couchbase.client.java.CouchbaseAsyncBucket$26.call(CouchbaseAsyncBucket.java:932)
at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:69)
at rx.internal.producers.SingleProducer.request(SingleProducer.java:65)
at rx.Subscriber.setProducer(Subscriber.java:211)
at rx.internal.operators.OnSubscribeMap$MapSubscriber.setProducer(OnSubscribeMap.java:102)
at rx.subjects.AsyncSubject.onCompleted(AsyncSubject.java:105)
at com.couchbase.client.core.endpoint.AbstractGenericHandler$1.call(AbstractGenericHandler.java:199)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)