Netty NioEventLoopGroup вызывает OutOfmemory в Vertx 3.x - PullRequest
1 голос
/ 05 июня 2019

Я использую Vertx 3.6.3 и Kafka в своем приложении, я развертываю свою вертикалку в кластер, но мое приложение часто падает, и я анализирую дамп кучи и получаю ошибку, как показано на прикрепленном изображении.enter image description here

Создано много объектов Netty NioEventLoopGroup.Это ошибка vertx или ошибка моего кода?Может ли кто-нибудь объяснить мне, как Vertx использует Netty и почему возникает эта ошибка?

Обновлено:

Я поделюсь некоторыми деталями исходного кода в моем проекте, как показано ниже

public class Application {

    private JsonObject config;

    public Application() {
    }

    // Getter, setter

}


public class BaseVerticle extends AbstractVerticle {

    private static final Logger LOGGER = LogManager.getLogger(BaseVerticle.class);

    /**
     * Load config from properties.
     *
     * @return
     */
    protected Future<Application> loadConfig(Application application) {
        Future future = Future.future();

        ConfigStoreOptions file = new ConfigStoreOptions()
        .setType("file")
        .setFormat("properties")
        .setConfig(new JsonObject().put("path", "application.properties"));

        ConfigStoreOptions env = new ConfigStoreOptions().setType("env");
        ConfigStoreOptions sys = new ConfigStoreOptions().setType("sys");

        ConfigRetrieverOptions options = new ConfigRetrieverOptions()
            .addStore(file).addStore(env).addStore(sys);

        ConfigRetriever retriever = ConfigRetriever.create(vertx, options);
        retriever.getConfig(json -> {
            if (json.failed()) {
                LOGGER.error("Failed to load configuration. Reason: " + json.cause().getMessage());
                // Failed to retrieve the configuration
                json.cause().printStackTrace();
                future.fail(json.cause());
            } else {
                LOGGER.info("Load configuration success.");
                JsonObject config = json.result();
                future.complete(application.setConfig(config));
            }
        });

        return future;
    }
}

public class MainVerticle extends BaseVerticle {

  private static final Logger LOGGER = LogManager.getLogger(MainVerticle.class);

  @Override
  public void start(Future<Void> startFuture) throws Exception {
    doStart(startFuture);
  }

  private void doStart(Future<Void> startFuture) {
        vertx.exceptionHandler(event -> LOGGER.error( " throws exception: {}", event.getMessage(), event));
        LOGGER.info("vertx.isClustered() = {}", vertx.isClustered());
        Application application = new Application();

        loadConfig(application)
        .compose(this::deployProcessingVerticle)
        .setHandler(r -> {
            if(r.succeeded()) {
                LOGGER.info("Deploy {} success.", getClass().getSimpleName());
                startFuture.complete();
            } else {
                LOGGER.info("Deploy {} failed.", getClass().getSimpleName());
                startFuture.fail(r.cause());
            }
        });

    }

    private Future<Application> deployProcessingVerticle(Application application) {
        Future<Application> future = Future.future();

        JsonObject configuration = application.getConfig();

        int WORKER_POOL_SIZE = configuration.getInteger("http.workerPoolSize");
        DeploymentOptions opts = new DeploymentOptions()
                .setHa(true)
                .setWorker(true)
                .setInstances(1)
                .setWorkerPoolSize(WORKER_POOL_SIZE)
                .setWorkerPoolName("processing")
                .setConfig(configuration);

        vertx.deployVerticle(ProcessingVerticle.class, opts, res -> {
            if (res.failed()) {
                future.fail(res.cause());
                LOGGER.error("Deploy ProcessingVerticle failed. Reason: {}", res.cause().getMessage(), res.cause());
            } else {
                future.complete(application);
                LOGGER.info("Deploy ProcessingVerticle success.");
            }
        });

        return future;
    }

    public static void main(String[] args) {
        Vertx.clusteredVertx(new VertxOptions().setHAEnabled(true), 
                vertx -> vertx.result().deployVerticle(MainVerticle.class.getName(), new DeploymentOptions().setHa(true))
    );
  }
}

public class ProcessingVerticle extends AbstractVerticle {

    private static final Logger LOGGER = LogManager.getLogger(ProcessingVerticle.class);

    private KafkaHandler kafkaHandler;

    @Override
    public void start(Future<Void> startFuture) throws Exception {
        super.start(startFuture);
        kafkaHandler = new KafkaHandler(vertx, config(), startFuture);
    }

}

public class KafkaHandler{

    private static final Logger logger = LogManager.getLogger(KafkaHandler.class);

    private KafkaWriteStream<String, JsonObject> producer;
    private KafkaReadStream<String, JsonObject> consumer;
    private Vertx vertx;
    private JsonObject config;
    private Function<JsonObject, Void> processMessage1;
    private Function<JsonObject, Void> processMessage2;

    private String topic1;
    private String topic2;

    public KafkaHandler(Vertx vertx, JsonObject config, Future<Void> startFuture){
        this.vertx = vertx;
        this.config = config;
        initTopics(config);
        startKafka(startFuture);
    }

    private void startKafka(Future<Void> startFuture) {
        createProducer();
        createConsumer();
    }

    private void createProducer() {
        Properties config = new Properties();
        String server = this.config.getString("kafka.servers", "localhost:9092");
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonObjectSerializer.class);
        config.put(ProducerConfig.LINGER_MS_CONFIG, 100);
        config.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, this.config.getString("kafka.request.timeout", "30000"));
        config.put(ProducerConfig.ACKS_CONFIG, "1");
        producer = KafkaWriteStream.create(vertx, config, String.class, JsonObject.class);
    }

    private void initTopics(JsonObject config) {
        topic1 = this.config.getString(...);
        topic2 = this.config.getString(...);
    }

    public void publishMessage(JsonObject message, String topic){
        producer.write(new ProducerRecord<>(topic, message), ar -> {
            if (ar.failed()){
                logger.error(ar.cause());
            }
        });
    }

    private void createConsumer() {
        Properties config = new Properties();
        String server = this.config.getString("kafka.servers", "localhost:9092");
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.config.getString("kafka.offset.reset", "latest"));
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, this.config.getString("kafka.group.id"));
        consumer = KafkaReadStream.create(vertx, config, String.class, JsonObject.class);
    }

    private void processRecord(ConsumerRecord<String, JsonObject> record) {
        logger.info("Topic {} - Receive Message: {}", record.topic(), record.value().toString());

        if(record.topic().contains(topic1)){
            processMessage1.apply(record.value());
        }
        if(record.topic().contains(topic2)){
            processMessage2.apply(record.value());
        }
    }

    public void consumerSubscribe(List<Integer> coins){
        String[] arr = {topic1, topic2};
        String env = config.getString("env", "dev");
        List<String> listTopics = new ArrayList<>();
        for (String name : arr) {
            listTopics.add(name);
        }

        Set<String> topics = new HashSet<>(listTopics);
        consumer.subscribe(topics, ar -> {
            if (ar.succeeded()) {
                logger.info("Consumer subscribed");
                vertx.setPeriodic(1000, timerId -> {
                  consumer.poll(100, records -> {
                    if (records.succeeded()) {
                        records.result().forEach(record -> {
                            processRecord(record);
                        });
                    }
                  });
                });
            } else {
                logger.error(ar.cause());
            }
        });
    }

    @AfterClass
    public void stopKafka(){
        if (producer != null) {
            producer.close();
        }
        if (consumer != null) {
            consumer.close();
        }
    }


    // Getter, Setter
}

Ответы [ 2 ]

1 голос
/ 17 июня 2019

Вышеупомянутая проблема была решена, поэтому мы злоупотребили Vertx.setTimer, этот метод создает потоки и удерживает память, поэтому мы всегда получаем исключение «Недостаточно памяти»
ref: https://groups.google.com/forum/#!topic/vertx/K74PcXUauJM

0 голосов
/ 08 июня 2019

Vert.x использует Netty под капотом для сетевых задач.

Я очень сомневаюсь, что это ошибка в Vert.x или Netty, поскольку оба широко используются.Это может быть ошибка Vert.x Kafka Client , но также сомневается в этом, так как этот клиент в хорошем состоянии, и я не видел таких проблем в последнее время.

Большинствовозможно, это ошибка в вашем коде.Но без примеров это очень трудно сказать.

...