Hazelcast-jet: ошибка при обогащении потока с помощью прямого поиска - PullRequest
1 голос
/ 07 октября 2019

Я слежу за Док , чтобы попытаться обогатить неограниченный поток путем прямого просмотра из IMap. У меня есть две карты:

  1. Продукт: Map<String, Product> (ProductId в качестве ключа)
  2. Продавец: Map<String, Seller> (SellerId в качестве ключа)

ОбаProduct и Seller - это очень простые классы:

public class Product implements DataSerializable {
    String productId;
    String sellerId;
    int price;
...
public class Seller implements DataSerializable {
    String sellerId;
    int revenue;
...

У меня есть два генератора данных, которые постоянно отправляют данные на две карты. Журнал событий включен для обеих карт. Я убедился, что журнал событий работает нормально.

Я хочу дополнить событие потока карты Product картой Seller. Вот фрагмент моего кода:

IMap<String, Seller> sellerIMap = jetClient.getMap(SellerDataGenerator.SELLER_MAP);
StreamSource<Product> productStreamSource = Sources.mapJournal(ProductDataGenerator.PRODUCT_MAP, Util.mapPutEvents(), Util.mapEventNewValue(), START_FROM_CURRENT);
p.drawFrom(productStreamSource)
            .withoutTimestamps()
            .groupingKey(Product::getSellerId)
            .mapUsingIMap(sellerIMap, (product, seller) -> new EnrichedProduct(product, seller))
            .drainTo(getSink());
try {
        JobConfig jobConfig = new JobConfig();
        jobConfig.addClass(TaskSubmitter.class).addClass(Seller.class).addClass(Product.class).addClass(ExtendedProduct.class);
        jobConfig.setName(Constants.BASIC_TASK);
        Job job = jetClient.newJob(p, jobConfig);
    } finally {
        jetClient.shutdown();
    }

Когда работа была отправлена, я получил следующую ошибку:

com.hazelcast.spi.impl.operationservice.impl.Invocation -[172.31.33.212]: 80 [jet] [3.1] Не удалось асинхронное выполнение обратного вызова выполнения: com.hazelcast.util.executor.DelegatingFuture$DelegatingExecutionCallback@77ac0407 для вызова Invocation {op = com.hazelcast.map.impl.operation.GetOperation {serviceName = 'hz: impl: mapService', identityHash = 1939050026, partitionId = 70, replicaIndex = 0, callId = -37944, invocationTime = 1570410704479 (2019-10-07 01: 11: 44.479), waitTimeout = -1, callTimeout =60000, name = sellerMap}, tryCount = 250, tryPauseMillis = 500, invokeCount = 1, callTimeoutMillis = 60000, firstInvocationTimeMs = 1570410704479, firstInvocationTime = '2019-10-07 01: 11: 44.479', lastHeartbeatMillis = 1970T = 1970, 0, last-01-01 00: 00: 00.000 ', target = [172.31.33.212]: 80, pendingResponse = {VOID}, backupsAcksExpected = 0, backupsAcksReceived = 0, connection = null}

Я пыталсяпоставить одинДва экземпляра в моем кластере и получили одно и то же сообщение об ошибке. Я не мог понять, в чем причина.

1 Ответ

1 голос
/ 14 октября 2019

Кажется, что ваша проблема - ClassNotFoundException, даже если вы добавили в работу соответствующие классы. Объекты, которые вы храните в IMap, существуют независимо от вашего задания Jet, и когда источник журнала событий запрашивает их, код Jetap IMap пытается их десериализовать и завершается неудачей, поскольку у Jet нет классов вашей доменной модели в своем пути к классам.

Чтобы двигаться дальше, добавьте JAR с классами, которые вы используете в IMap, в путь к классам Jet. Мы ищем решение, которое сняло бы это требование.

Причина, по которой вы не получили трассировку стека исключений в выходных данных журнала, связана с настройкой по умолчанию java.util.logging, с которой вы сталкиваетесь, когда вы этого не делаетеявно добавьте более гибкий модуль ведения журнала, такой как Log4j.

Следующая версия упаковки Jet улучшит этот аспект. До этого времени вы можете выполнять следующие действия:

  1. Перейдите в каталог lib дистрибутива Jet и загрузите в него Log4j:

    $ cd lib
    $ wget https://repo1.maven.org/maven2/log4j/log4j/1.2.17/log4j-1.2.17.jar
    
  2. Редактировать bin/common.sh, чтобы добавить модуль в путь к классам. В конце файла есть строка

    CLASSPATH="$JET_HOME/lib/hazelcast-jet-3.1.jar:$CLASSPATH"
    

    . Вы можете продублировать эту строку и заменить hazelcast-jet-3.1 на log4j-1.2.17.

  3. В концеcommons.sh есть многострочная команда, которая создает переменную JAVA_OPTS. Добавьте "-Dhazelcast.logging.type=log4j" и "-Dlog4j.configuration=file:$JET_HOME/config/log4j.properties" в список.

  4. Создайте файл log4j.properties в каталоге config:

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %5p [%c{1}] [%t] - %m%n

# Change this level to debug to diagnose failed cluster formation:
log4j.logger.com.hazelcast.internal.cluster=info

log4j.logger.com.hazelcast.jet=info
log4j.rootLogger=info, stdout
...