activeMQ создает тысячи файлов в / proc - PullRequest
0 голосов
/ 27 марта 2020

У нас есть два брокера activeMQ, работающие в подпружиненной загрузке, один для MQTT, один для JMS. Существует верблюжий маршрут, который может передавать сообщения между ними, поскольку MQTT является конечной точкой publi c, в то время как JMS-брокер доступен только внутри сети, но, насколько я могу судить, в настоящее время верблюд, похоже, не участвует в создании проблема. Хотя с верблюдом я не совсем уверен ...

Проблема в том, что приложение успешно создает тонны файлов в / pro c, пока не будет достигнут системный лимит открытых файлов для процесса, и все рушится.

Я уже установил максимальное количество открытых файлов Ubuntu на максимум, но проблема усугубляется с ростом нагрузки.

Я могу запустить это команда (брокер - пользователь, управляющий приложением):

/usr/bin/lsof | grep broker

И то, что я получаю, выглядит примерно так (очевидно, сильно обрезано):

ActiveMQ  21482 32629     broker  cwd   unknown                                        /proc/21482/task/32629/cwd 
ActiveMQ  21482 32629     broker  rtd   unknown                                        /proc/21482/task/32629/root 
ActiveMQ  21482 32629     broker  txt   unknown                                        /proc/21482/task/32629/exe 
ActiveMQ  21482 32629     broker NOFD                                                  /proc/21482/task/32629/fd 
ActiveMQ  21482 32632     broker  cwd   unknown                                        /proc/21482/task/32632/cwd 
ActiveMQ  21482 32632     broker  rtd   unknown                                        /proc/21482/task/32632/root 
ActiveMQ  21482 32632     broker  txt   unknown                                        /proc/21482/task/32632/exe 
ActiveMQ  21482 32632     broker NOFD                                                  /proc/21482/task/32632/fd 
ActiveMQ  21482 32650     broker  cwd   unknown                                        /proc/21482/task/32650/cwd 
ActiveMQ  21482 32650     broker  rtd   unknown                                        /proc/21482/task/32650/root 
ActiveMQ  21482 32650     broker  txt   unknown                                        /proc/21482/task/32650/exe 
ActiveMQ  21482 32650     broker NOFD                                                  /proc/21482/task/32650/fd 
MQTTInact 21482 32683     broker  cwd   unknown                                        /proc/21482/task/32683/cwd 
MQTTInact 21482 32683     broker  rtd   unknown                                        /proc/21482/task/32683/root 
MQTTInact 21482 32683     broker  txt   unknown                                        /proc/21482/task/32683/exe 
MQTTInact 21482 32683     broker NOFD                                                  /proc/21482/task/32683/fd 

Я не конечно, что такое «задача» в этом контексте, но есть несколько открытых файлов для каждого из них, и их тысячи. Не очень проблематично c, но все же вызывает озабоченность то, что это далеко не единственные созданные файлы, а только те, которые в настоящее время остаются открытыми. Есть сотни этих папок с заданиями, и каждая содержит целую сумку файлов:

dr-xr-xr-x    7 broker broker 0 Mär 27 12:19 ./
dr-xr-xr-x 2168 broker broker 0 Mär 27 06:37 ../
dr-xr-xr-x    2 broker broker 0 Mär 27 13:42 attr/
-r--------    1 broker broker 0 Mär 27 13:42 auxv
-r--r--r--    1 broker broker 0 Mär 27 13:42 cgroup
-r--r--r--    1 broker broker 0 Mär 27 13:42 children
--w-------    1 broker broker 0 Mär 27 13:42 clear_refs
-r--r--r--    1 broker broker 0 Mär 27 13:42 cmdline
-rw-r--r--    1 broker broker 0 Mär 27 13:42 comm
-r--r--r--    1 broker broker 0 Mär 27 13:42 cpuset
lrwxrwxrwx    1 broker broker 0 Mär 27 12:19 cwd
-r--------    1 broker broker 0 Mär 27 13:42 environ
lrwxrwxrwx    1 broker broker 0 Mär 27 12:19 exe
dr-x------    2 broker broker 0 Mär 27 12:19 fd/
dr-x------    2 broker broker 0 Mär 27 13:42 fdinfo/
-rw-r--r--    1 broker broker 0 Mär 27 13:42 gid_map
-r--------    1 broker broker 0 Mär 27 13:42 io
-r--r--r--    1 broker broker 0 Mär 27 13:42 limits
-rw-r--r--    1 broker broker 0 Mär 27 13:42 loginuid
-r--r--r--    1 broker broker 0 Mär 27 12:19 maps
-rw-------    1 broker broker 0 Mär 27 13:42 mem
-r--r--r--    1 broker broker 0 Mär 27 13:42 mountinfo
-r--r--r--    1 broker broker 0 Mär 27 13:42 mounts
dr-xr-xr-x    5 broker broker 0 Mär 27 13:42 net/
dr-x--x--x    2 broker broker 0 Mär 27 13:42 ns/
-r--r--r--    1 broker broker 0 Mär 27 13:42 numa_maps
-rw-r--r--    1 broker broker 0 Mär 27 13:42 oom_adj
-r--r--r--    1 broker broker 0 Mär 27 13:42 oom_score
-rw-r--r--    1 broker broker 0 Mär 27 13:42 oom_score_adj
-r--------    1 broker broker 0 Mär 27 13:42 pagemap
-r--------    1 broker broker 0 Mär 27 13:42 personality
-rw-r--r--    1 broker broker 0 Mär 27 13:42 projid_map
lrwxrwxrwx    1 broker broker 0 Mär 27 12:19 root
-rw-r--r--    1 broker broker 0 Mär 27 13:42 sched
-r--r--r--    1 broker broker 0 Mär 27 13:42 schedstat
-r--r--r--    1 broker broker 0 Mär 27 13:42 sessionid
-rw-r--r--    1 broker broker 0 Mär 27 13:42 setgroups
-r--r--r--    1 broker broker 0 Mär 27 13:42 smaps
-r--------    1 broker broker 0 Mär 27 13:42 stack
-r--r--r--    1 broker broker 0 Mär 27 12:19 stat
-r--r--r--    1 broker broker 0 Mär 27 13:42 statm
-r--r--r--    1 broker broker 0 Mär 27 13:42 status
-r--------    1 broker broker 0 Mär 27 13:42 syscall
-rw-r--r--    1 broker broker 0 Mär 27 13:42 uid_map
-r--r--r--    1 broker broker 0 Mär 27 13:42 wchan

Я не знаю, для чего они используются или для чего предназначены. Все, что я знаю, это то, что они создаются непрерывно и остаются до тех пор, пока я не перезагружаю приложение.

Сначала я подумал, что это как-то связано с постоянством, но я больше не уверен в этом. Правда, если лимит файлов достигнут, брокеру MQTT не удается получить новые сообщения со старым добрым сообщением «слишком много файлов открыто» (хотя, похоже, это связано с тем, что он больше не может открыть сокет), но брокер JMS по-прежнему кажется, работает без проблем, и постоянство деактивировано для обоих брокеров в приложении.

Вот как выглядит эта конфигурация:

@Bean
fun mqttBroker(mqttProperties: MqttProperties,
               authenticationService: AuthenticationService,
               ssl: SslContext?): BrokerService {
    return BrokerService().apply {
        isPersistent = false
        isAdvisorySupport = false
        brokerName = "mqtt"
        plugins = listOf(WebcamServiceAuthenticationPlugin(authenticationService, mqttProperties.internalPort)).toTypedArray()
        addConnector("tcp://127.0.0.1:${mqttProperties.internalPort}")

        val host = "${mqttProperties.address}:${mqttProperties.port}"

        if (isTlsActive(mqttProperties)) {
            log.info("Starting MQTT connector using TLS")
            sslContext = ssl!!
            addConnector("mqtt+nio+ssl://$host")
        } else {
            log.info("Starting a plain text MQTT connector")
            addConnector("mqtt+nio://$host")
        }
    }
}
@Component
class JmsBroker(jmsProperties: JmsProperties) {

    private val broker = BrokerService().apply {
        isPersistent = false
        isAdvisorySupport = false
        addConnector("tcp://${jmsProperties.address}:${jmsProperties.port}")
        brokerName = "jms"
    }

    @PostConstruct
    fun start() {
        broker.start()
    }

    @PreDestroy
    fun stop() {
        broker.stop()
    }
}

Очевидно, я бы хотел, чтобы мой брокер чтобы снова стать стабильным, но я боюсь, что это может быть проблемой, которая всегда была там, просто не была замечена, потому что нагрузка была не такой высокой. Не имея реального знания ActiveMQ (он всегда работал до тех пор, пока не узнал), я не знаю, как диагностировать мою проблему. Кто-нибудь знает, что это за файлы, что они делают, и что потенциально может быть неправильным, что приводит к тому, что многие из них создаются и торчат?

Дополнительная информация:

Благодаря комментарию Джастина ниже я обнаружил, что все эти файлы в папке / pro c созданы для запуска потоков. Честно говоря, я ожидал, что в этом случае не хватит памяти задолго до того, как у меня закончатся доступные файлы, поскольку на машине всего 2 ГБ ОЗУ. Кроме того, даже если предположить, что MQTT-брокер, верблюд и ActiveMQ каждый создают по одному потоку для каждого сообщения (чего они не делают, поскольку может быть только треть сообщений go через брокер MQTT), учитывая нашу текущую пиковую нагрузку намного ниже 10 000 сообщений в минуту (это своего рода теоретический максимум, который мы могли бы получить в данный момент, учитывая количество наших клиентов, но мы никогда не получаем их все сразу), это не должно приводить к такому много нитей.

Это заставляет меня предположить, что здесь тонна нитей зомба ie. Что беспокоит. Я ожидал бы, что обработка потоков в ActiveMQ и Camel будет почти идеальной, учитывая их зрелость. Кто-нибудь когда-либо наблюдал подобное поведение от любого из этих?

Еще одна вещь, которую я только что понял, это то, что подавляющее большинство потоков происходит из самой JVM, а не из приложения (судя по их PID). Это может быть нормально, учитывая, что мои брокеры activeMQ работают встраиваемыми, у меня нет опыта в этой области.

Верблюжий путь Меня попросили указать верблюжий маршрут, поэтому вот он:

@Component
class BrokerRouteBuilder(camelContext: CamelContext,
                         mqttProperties: MqttProperties,
                         jmsProperties: JmsProperties,
                         private val authenticationService: AuthenticationService,
                         private val objectMapper: ObjectMapper)
    : RouteBuilder() {

    private val jms = "jmsbroker"
    private val mqtt = "mqttbroker"

    init {
        camelContext.addComponent(jms, activeMQComponent("tcp://127.0.0.1:${jmsProperties.port}"))
        camelContext.addComponent(mqtt, activeMQComponent("tcp://127.0.0.1:${mqttProperties.internalPort}"))
        objectMapper.registerModule(JodaModule())
    }

    override fun configure() {

        // CAPTURE

        from("$jms:queue:capture")
                .log("Sending capture command \${body} to user '\${header.user}'")
                .toD("$mqtt:topic:\${header.user}.capture")

        // FOCUS

        from("$jms:queue:focus")
                .log("Sending focus command \${body} to user '\${header.user}'")
                .toD("$mqtt:topic:\${header.user}.focus")

        // REBOOT

        from("$jms:queue:reboot")
                .log("Sending reboot command \${body} to user '\${header.user}'")
                .toD("$mqtt:topic:\${header.user}.reboot")

        // HEATING

        from("$jms:queue:heating")
                .id("heating-command")
                .log("Sending heating command \${body} for user '\${header.user}'")
                .toD("$mqtt:topic:\${header.user}.heating")

        from("$mqtt:topic:*.heating-status")
                .id("heating-status")
                .setHeader("user").message(Companion::extractUserName)
                .log("Received heating status \${body} from \${header.user}")
                .to("$jms:queue:heating-status")

        // SETTINGS

        from("$jms:queue:send-settings")
                .id("send-settings")
                .log("Sending settings to user '\${header.user}'")
                .toD("$mqtt:topic:\${header.user}.settings")

        from("$mqtt:topic:*.request-settings")
                .id("request-settings")
                .setHeader("user").message(Companion::extractUserName)
                .log("Received settings request from \${header.user}")
                .to("$jms:queue:request-settings")

        // PANORAMA SETTINGS

        from("$jms:queue:panorama-settings")
                .id("panorama-settings")
                .log("Sending panorama settings to user '\${header.user}'")
                .toD("$mqtt:topic:\${header.user}.panorama-settings")

        from("$mqtt:topic:*.request-panorama-settings")
                .id("request-panorama-settings")
                .setHeader("user").message(Companion::extractUserName)
                .log("Received panorama settings request from \${header.user}")
                .to("$jms:queue:request-panorama-settings")

        // SCHEDULES

        from("$jms:topic:schedule")
                .id("send-schedule")
                .log("Sending schedules to user '\${header.user}'")
                .toD("$mqtt:topic:\${header.user}.schedule")

        from("$mqtt:topic:*.request-schedule")
                .id("request-schedule")
                .setHeader("user").message(Companion::extractUserName)
                .log("Received schedule request from \${header.user}")
                .to("$jms:queue:request-schedule")

        // HEATING SCHEDULES

        from("$jms:queue:heating-schedule")
                .id("send-heating-schedule")
                .log("Sending heating schedule to user '\${header.user}'")
                .toD("$mqtt:topic:\${header.user}.heating-schedule")

        from("$mqtt:topic:*.request-heating-schedule")
                .id("request-heating-schedule")
                .setHeader("user").message(Companion::extractUserName)
                .log("Received heating schedule request from \${header.user}")
                .to("$jms:queue:request-heating-schedule")

        // HEARTBEAT

        from("$mqtt:topic:*.heartbeat")
                .id("camera-heartbeat")
                .setHeader("user").message(Companion::extractUserName)
                .process().message { message ->
                    val heartbeat = objectMapper.readValue(message.getBody(ByteArray::class.java),
                            HeartbeatMessage::class.java)
                    val user = message.getHeader("user", "", String::class.java)
                    val camera = authenticationService.readCamera(user)
                    heartbeat.camera = camera
                    message.body = objectMapper.writeValueAsBytes(heartbeat)
                }
                .log(LoggingLevel.TRACE, "Received heartbeat from \${header.user}: \${body}")
                .to("$jms:topic:heartbeat")

        // EVENTS

        from("$mqtt:topic:*.event")
                .id("yellow-event")
                .setHeader("user").message(Companion::extractUserName)
                .process().message { message ->
                    val event = objectMapper.readValue(message.getBody(ByteArray::class.java),
                            YellowEventDto::class.java)
                    // replace username as event source and event-chain with camera id.
                    val camera = authenticationService.readCamera(event.source)
                    message.body = objectMapper.writeValueAsBytes(event.copy(
                            source = camera, eventChain = event.eventChain.replace(event.source, camera)))
                }
                .log(LoggingLevel.TRACE, "Received event from \${header.user}: \${body}")
                .to("$jms:topic:event")

        // AUTHENTICATION UPDATES

        from("$jms:topic:authentication")
                .process().body(ByteArray::class.java) { body ->
                    val event = objectMapper.readValue<AuthenticationUpdateEvent>(body)
                    authenticationService.onUpdate(event)
                }
        from("direct:request-authentication")
                .to("$jms:queue:request-authentication")

    }

    companion object {
        internal const val destinationHeader = "JMSDestination"
        private val log = logger()

        internal fun extractUserName(message: Message): String {
            val topic = message.getHeader(destinationHeader, "", String::class.java)
            val parts = topic.removePrefix("topic://").split(".")
            return if (parts.size == 2) {
                parts.first()
            } else {
                log.warn("Unexpected topic format '{}'!", topic)
                ""
            }

        }
    }

}

Я создаю экземпляры activeMQComponents вручную, так как их два, и мне нужно указать c. Кажется, что основной интерес представляет пул соединений, для которого документация ActiveMQComponent говорит следующее (обратите внимание, что в нем явно указано, что это верно по умолчанию):

    /**
     * Enables or disables whether a PooledConnectionFactory will be used so that when
     * messages are sent to ActiveMQ from outside of a message consuming thread, pooling will be used rather
     * than the default with the Spring {@link JmsTemplate} which will create a new connection, session, producer
     * for each message then close them all down again.
     * <p/>
     * The default value is true. Note that this requires an extra dependency on commons-pool2.
     */
    public void setUsePooledConnection(boolean usePooledConnection) {
        if (getConfiguration() instanceof ActiveMQConfiguration) {
            ((ActiveMQConfiguration)getConfiguration()).setUsePooledConnection(usePooledConnection);
        }
    }

Ответы [ 2 ]

1 голос
/ 30 марта 2020

Эти записи '/ pro c' связаны с сокетами в состоянии TIME_WAIT?

 netstat -pant | fgrep -i wait

Используете ли вы фабрику соединений пула для отправки и получения сообщений?

import org.messaginghub.pooled.jms.JmsPoolConnectionFactory;
...
...
public ConnectionFactory pooledConnectionFactory() {
    JmsPoolConnectionFactory pooledCF = new JmsPoolConnectionFactory();

    pooledCF.setConnectionFactory(myConnectionFactory());
    pooledCF.setMaxConnections(1);

    return pooledCF;
}    
0 голосов
/ 06 апреля 2020

Даг Гроув привел меня к выводу, что проблема может заключаться не в самом брокере, а в том, как клиенты обрабатывают свои соединения, что в конечном итоге оказалось плодотворной линией расследования. Хотя детали достаточно интересны, чтобы ими поделиться, я думаю.

Большая часть нашей серверной архитектуры все еще работает на Spring-Boot 2.0.4 и подключается к JMS-концу посредников через прослушиватели и шаблоны JMS по умолчанию.

В последнее время, однако, был введен новый микросервис, использующий Spring-Boot 2.2.4, но в значительной степени код JMS-соединения был скопирован из старых сервисов. Проблема в том, что Spring-Boot 2.2.4 по умолчанию использует другую фабрику соединений, CachingConnectionFactory . Вот наиболее важный вывод из документации:

ПРИМЕЧАНИЕ. Для этого ConnectionFactory требуется явное закрытие всех сеансов, полученных из общего подключения.

Поскольку весь код подключения был скопирован без изменений, мы не делали этого. И это оказалось главной причиной, которая привела к безудержному увеличению количества файловых дескрипторов.

Все еще немного странно, что конец MQTT был той частью, которая больше не могла устанавливать новые соединения. в то время как конец JMS, казалось, не имел проблем, но теперь нет сомнений, что это было причиной.

...