Я новичок в vert.x framework и rx-java.Я работаю с MongoDB в настоящее время.Проблема в том, что мне приходится обновлять один и тот же объект несколько раз, однако выполняется только последнее обновление.Код указан ниже.Метод findByProductId возвращает требуемый объект.проверка выполнена, поскольку она может вернуть false, если объект с этим идентификатором еще не находится в БД.
fun cancelProductStateCountList(inventoryList: List<Inventory>) : Observable<List<ProductState>> {
return Observable.create({ event ->
val count = inventoryList.count()
var counter = 0
var itemCounter = 0
val result = mutableListOf<ProductState>()
var searchingId = ""
Observable
.fromArray(inventoryList)
.flatMapIterable({
items -> items
})
.flatMap({
item ->
searchingId = item.productId!!
findByProductId(item.productId!!)
})
.flatMap ({
if(it is ProductState){
if(inventoryList[itemCounter].operation == InventoryOperation.HOLD.value()){
var quantityAvailable: Double = it.quantity!!
var quantityHold: Double = it.hold!!
quantityAvailable += inventoryList[itemCounter].quantity!!
quantityHold -= inventoryList[itemCounter].quantity!!
it.quantity = quantityAvailable
it.hold = quantityHold
itemCounter ++
updateWithoutDuplicate(it)
} else {
var quantityAvailable: Double = it.quantity!!
quantityAvailable += inventoryList[itemCounter].quantity!!
it.quantity = quantityAvailable
itemCounter ++
updateWithoutDuplicate(it)
}
} else {
Observable.just("")
}
})
.subscribe({
if(it is ProductState){
result.add(it)
counter ++
if (counter == count) {
event.onNext(result)
}
} else {
event.onError(NotExistsException("productId", searchingId))
}
}, {
event.onError(it)
})
})
}