Я слежу за Док , чтобы попытаться обогатить неограниченный поток путем прямого просмотра из IMap. У меня есть две карты:
- Продукт:
Map<String, Product>
(ProductId в качестве ключа) - Продавец:
Map<String, Seller>
(SellerId в качестве ключа)
ОбаProduct
и Seller
- это очень простые классы:
public class Product implements DataSerializable {
String productId;
String sellerId;
int price;
...
public class Seller implements DataSerializable {
String sellerId;
int revenue;
...
У меня есть два генератора данных, которые постоянно отправляют данные на две карты. Журнал событий включен для обеих карт. Я убедился, что журнал событий работает нормально.
Я хочу дополнить событие потока карты Product
картой Seller
. Вот фрагмент моего кода:
IMap<String, Seller> sellerIMap = jetClient.getMap(SellerDataGenerator.SELLER_MAP);
StreamSource<Product> productStreamSource = Sources.mapJournal(ProductDataGenerator.PRODUCT_MAP, Util.mapPutEvents(), Util.mapEventNewValue(), START_FROM_CURRENT);
p.drawFrom(productStreamSource)
.withoutTimestamps()
.groupingKey(Product::getSellerId)
.mapUsingIMap(sellerIMap, (product, seller) -> new EnrichedProduct(product, seller))
.drainTo(getSink());
try {
JobConfig jobConfig = new JobConfig();
jobConfig.addClass(TaskSubmitter.class).addClass(Seller.class).addClass(Product.class).addClass(ExtendedProduct.class);
jobConfig.setName(Constants.BASIC_TASK);
Job job = jetClient.newJob(p, jobConfig);
} finally {
jetClient.shutdown();
}
Когда работа была отправлена, я получил следующую ошибку:
com.hazelcast.spi.impl.operationservice.impl.Invocation -[172.31.33.212]: 80 [jet] [3.1] Не удалось асинхронное выполнение обратного вызова выполнения: com.hazelcast.util.executor.DelegatingFuture$DelegatingExecutionCallback@77ac0407 для вызова Invocation {op = com.hazelcast.map.impl.operation.GetOperation {serviceName = 'hz: impl: mapService', identityHash = 1939050026, partitionId = 70, replicaIndex = 0, callId = -37944, invocationTime = 1570410704479 (2019-10-07 01: 11: 44.479), waitTimeout = -1, callTimeout =60000, name = sellerMap}, tryCount = 250, tryPauseMillis = 500, invokeCount = 1, callTimeoutMillis = 60000, firstInvocationTimeMs = 1570410704479, firstInvocationTime = '2019-10-07 01: 11: 44.479', lastHeartbeatMillis = 1970T = 1970, 0, last-01-01 00: 00: 00.000 ', target = [172.31.33.212]: 80, pendingResponse = {VOID}, backupsAcksExpected = 0, backupsAcksReceived = 0, connection = null}
Я пыталсяпоставить одинДва экземпляра в моем кластере и получили одно и то же сообщение об ошибке. Я не мог понять, в чем причина.