Мне удалось найти ответ на мою проблему. Основная проблема заключалась в том, что ES не предназначен для приема так мало данных, как генерируется в учебном проекте. ES по умолчанию записывает данные в пакетах по 1000 записей. В этом проекте я генерирую по одному данным каждые 30 секунд или по 1000 каждые 500 минут (или 8 часов 20 минут).
, поэтому я подробно рассмотрел конфигурацию своей топологии и поиграл со следующими параметрами:
- es.batch.size.entries: 1
- es.storm.bolt.flu sh .entries.size: 1
- topology.producer.batch. size: 1
- topology.transfer.batch.size: 1
А теперь это выглядит так:
...
...
public class App
{
...
...
public static void main( String[] args ) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException
{
...
...
StormTopology topology = topologyBuilder.createTopology(); // je crée ma topologie Storm
String topologyName = properties.getProperty("storm.topology.name"); // je nomme ma topologie
StormSubmitter.submitTopology(topologyName, getTopologyConfig(properties), topology); // je démarre ma topologie sur mon cluster storm
System.out.println( "Topology on remote cluster : Started!" );
}
private static Config getTopologyConfig(Properties properties)
{
Config stormConfig = new Config();
stormConfig.put("topology.workers", Integer.parseInt(properties.getProperty("topology.workers")));
stormConfig.put("topology.enable.message.timeouts", Boolean.parseBoolean(properties.getProperty("topology.enable.message.timeouts")));
stormConfig.put("topology.message.timeout.secs", Integer.parseInt(properties.getProperty("topology.message.timeout.secs")));
stormConfig.put("topology.transfer.batch.size", Integer.parseInt(properties.getProperty("topology.transfer.batch.size")));
stormConfig.put("topology.producer.batch.size", Integer.parseInt(properties.getProperty("topology.producer.batch.size")));
return stormConfig;
}
...
...
...
}
И теперь это работает !!!