Внутренняя оптимизация Hazelcast Jet - PullRequest
0 голосов
/ 05 ноября 2018

Я проверяю Hazelcast Jet на предмет потребностей своего проекта, но я нашел документацию действительно расплывчатой ​​по следующим темам:

1) Когда я выполняю объединение данных в двух потоках списка ... как, например:

BatchStage<Trade> trades = p.drawFrom(list("trades"));
BatchStage<Entry<Integer, Broker>> brokers =    
p.drawFrom(list("brokers"));
BatchStage<Tuple2<Trade, Broker>> joined = trades.hashJoin(brokers,
    joinMapEntries(Trade::brokerId),
    Tuple2::tuple2);
joined.drainTo(Sinks.logger());

тогда я могу как-то сказать Джету, что соединение на самом деле произойдет? Или соединение на стороне карты или соединение на стороне уменьшения ...? Я имею в виду, просто представьте, что «брокеры» должны быть маленькими, а сделки - огромными. Оптимальным методом для объединения этих двух наборов является соединение на стороне карты или широковещательное соединение .... Какие данные будут переданы по сети, когда Jet выполнит соединение? Есть ли оптимизация по размеру?

2) Я тестировал следующий сценарий:

легкий конвейер:

private Pipeline createPipeLine() {
    Pipeline p = Pipeline.create();
    BatchStage stage = p.drawFrom(Sources.<Date>list("master"));
    stage.drainTo(Sinks.logger());
    return p;
}

list("master") постоянно заполняется другим узлом в кластере. Теперь, когда я отправляю этот конвейер в кластер, только часть списка ("master") сливается в логгер. Можно ли как-то настроить работу Jet на постоянный слив list("master") на стандартный вывод?

Заранее спасибо

1 Ответ

0 голосов
/ 05 ноября 2018
  1. От Javadoc of HashJoin:

    Практически преобразование hash-join оптимизировано для пропускной способности так что каждый член вычислительной системы имеет локальную копию всего обогащения данные, хранящиеся в хеш-таблицах (отсюда и название). Обогащающие потоки полностью израсходовано до поступления любых данных из первичного потока.

    Для вашего примера, все элементы из списка broker будут сначала потребляться всеми участниками, затем будет использоваться список trades.

  2. IList - это пакетный источник, вам нужен потоковый источник для постоянного потребления элементов. Вы можете использовать IQueue в качестве источника, вот простой способ создать источник для очереди:

    StreamSource<Trade> queueSource = SourceBuilder.<IQueue<Trade>>stream("queueStream", 
            c -> c.jetInstance().getHazelcastInstance().getQueue("trades"))
        .<Trade>fillBufferFn((queue, buf) -> buf.add(queue.poll()))
        .build();
    
...