Почему моя топология шторма не активируется, когда я отправляю кортеж в ElasticSearch? - PullRequest
0 голосов
/ 24 января 2020

Я новичок в использовании Storm, я только что начал учебный курс по Data Architect, и именно в этом контексте я сталкиваюсь с проблемой, которая приводит меня к вам сегодня.

Я получаю сообщения от kakfa через KafkaSpout с именем CurrentPriceSpout. Пока все работает. Затем в своем CurrentPriceBolt я повторно выпустил кортеж, чтобы мои данные записывались в ElasticSearch с использованием EsCurrentPriceBolt. Проблема здесь. Я не могу записать свои данные напрямую в ElasticSearch, они записываются только при удалении моей топологии.

Существует ли параметр Storm, который может форсировать запись кортежей путем получения подтверждений?

Я попытался, добавив параметр ".addConfiguration (Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5)", кортежи хорошо написаны в ElasticSearch, но не подтверждены. Поэтому Storm переписывает их на неопределенный срок.

Спасибо за вашу помощь, Тьерри

1 Ответ

0 голосов
/ 28 января 2020

Мне удалось найти ответ на мою проблему. Основная проблема заключалась в том, что ES не предназначен для приема так мало данных, как генерируется в учебном проекте. ES по умолчанию записывает данные в пакетах по 1000 записей. В этом проекте я генерирую по одному данным каждые 30 секунд или по 1000 каждые 500 минут (или 8 часов 20 минут).

, поэтому я подробно рассмотрел конфигурацию своей топологии и поиграл со следующими параметрами:

  • es.batch.size.entries: 1
  • es.storm.bolt.flu sh .entries.size: 1
  • topology.producer.batch. size: 1
  • topology.transfer.batch.size: 1

А теперь это выглядит так:

...
...

public class App 
{
    ...    
    ...    

    public static void main( String[] args ) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException
    {
        ...
        ...

        StormTopology topology  = topologyBuilder.createTopology();                 // je crée ma topologie Storm
        String topologyName     = properties.getProperty("storm.topology.name");    // je nomme ma topologie
        StormSubmitter.submitTopology(topologyName, getTopologyConfig(properties), topology);               // je démarre ma topologie sur mon cluster storm
        System.out.println( "Topology on remote cluster : Started!" );              
    }


    private static Config getTopologyConfig(Properties properties)
    {
        Config stormConfig = new Config();
        stormConfig.put("topology.workers",                 Integer.parseInt(properties.getProperty("topology.workers")));
        stormConfig.put("topology.enable.message.timeouts", Boolean.parseBoolean(properties.getProperty("topology.enable.message.timeouts")));
        stormConfig.put("topology.message.timeout.secs",    Integer.parseInt(properties.getProperty("topology.message.timeout.secs")));
        stormConfig.put("topology.transfer.batch.size",     Integer.parseInt(properties.getProperty("topology.transfer.batch.size")));
        stormConfig.put("topology.producer.batch.size",     Integer.parseInt(properties.getProperty("topology.producer.batch.size")));      
        return stormConfig;
    }

    ...    
    ...    
    ...    
}

И теперь это работает !!!

...