Почему моя топология Apache Storm 2.0 перезапускается через 30 секунд? - PullRequest
0 голосов
/ 02 октября 2019

Я пробовал с несколькими параметрами конфигурации и даже с использованием withLocalModeOverride, но не повезло. Что мне здесь не хватает?

Вот пример приложения, после 30 секунд счетчик сбрасывается и все запускается заново. Пожалуйста, дайте мне знать, если я могу предоставить дополнительную информацию.

Использование:

mvn package
java -jar target/Test-0.1.0.jar

src/main/java/Test.java:

package storm.test;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

class Test {
    private static class Spout extends BaseRichSpout {
        private SpoutOutputCollector spoutOutputCollector;
        private long n;

        @Override
        public void open(Map<String, Object> map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
            this.spoutOutputCollector = spoutOutputCollector;
        }

        @Override
        public void nextTuple() {
            LOG.error("InfiniteSpout::nextTuple {}", n);
            spoutOutputCollector.emit(new Values(n++));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("x"));
        }
    }

    private static class Bolt extends BaseRichBolt {
        @Override
        public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) {

        }

        @Override
        public void execute(Tuple tuple) {
            Long x = tuple.getLongByField("x");
            LOG.error("Bolt::execute {}", x);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

        }
    }

    private static final Logger LOG = LoggerFactory.getLogger(Test.class);

    public static void main(String[] args) {
        try {
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("spout", new Spout());
            builder.setBolt("bolt", new Bolt()).shuffleGrouping("spout");

            Config conf = new Config();
            LocalCluster cluster = new LocalCluster();
            StormTopology topology = builder.createTopology();
            cluster.submitTopology("test", conf, topology);
        } catch (Exception e) {
            LOG.error(e.getMessage());
        }
    }
}

pom.xml:

<project>
    <modelVersion>4.0.0</modelVersion>

    <groupId>Test</groupId>
    <artifactId>Test</artifactId>
    <version>0.1.0</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>2.0.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>3.1.2</version>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                            <mainClass>Test</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

1 Ответ

2 голосов
/ 02 октября 2019

Вот актуальная проблема:

Это https://issues.apache.org/jira/browse/STORM-3501. Эта проблема касается только запуска в LocalClusters, и только в том случае, если у вас нет каталога ресурсов в сгенерированном вами банке.

Вы можете обойти это, добавив каталог ресурсов в ваш jar. С вашим pom вы хотите добавить каталог src / main / resources / resources.

Что касается вещей, которые следует учитывать при запуске одноузлового Storm, я думаю, вам следует задуматься о том, является ли Storm правильным выбором для вашеговариант использования. Storm достаточно сложен, и большая часть сложности заключается в том, что мы хотим, чтобы он мог распределять вычисления по многим физическим машинам. Если вы собираетесь запускать все свои вычисления на одной машине, вы, возможно, не сильно выиграете, если будете использовать Storm, например, просто напишите обычное Java-приложение или что-то наподобие Apache Camel.

Другие вопросы, которые следует учитыватьпри работе с одним узлом:

  • Шторм не подвержен сбоям, поэтому, если вы получите какие-либо ошибки, произойдет сбой всего рабочего. Поскольку вы работаете на одной машине, вы можете отключить значительную часть вашего кластера (по умолчанию 4 рабочих на машину, поэтому вы потеряете четверть своего состояния в случае возникновения ошибки).

  • Не используйте LocalCluster для рабочих нагрузок, он не предназначен для этого. Настройте настоящую установку Storm, а затем просто запустите ее на одной машине.

Вот некоторые вещи, которые бросаются в глаза, может быть, некоторые из них помогут:

  • Вам нужно добавить спящий режим после cluster.submitTopology, или ваша программа должна просто выйти немедленно. Этот вызов не блокируется, он просто передает топологию в LocalCluster, а затем возвращает. Когда ваш основной метод завершает работу, LocalCluster, вероятно, также отключится. В «реальной» настройке вы будете отправлять в кластер, который выполняется как отдельный процесс, так что это не будет проблемой, но при использовании LocalCluster вам нужно заставить основной поток ждать, пока вы не захотите закрыть программу.

  • На случай, если вы в конечном итоге будете использовать подобный код в тесте, вы должны не забывать закрывать LocalCluster, когда закончите. Он может быть автозамечен, поэтому вы можете просто попробовать его.

  • Хорошей практикой является закрепление кортежей в болтах. Подумайте о расширении BaseBasicBolt, если вы просто хотите подтвердить, когда ваш болт сделан с кортежем.

...