У нас есть два брокера activeMQ, работающие в подпружиненной загрузке, один для MQTT, один для JMS. Существует верблюжий маршрут, который может передавать сообщения между ними, поскольку MQTT является конечной точкой publi c, в то время как JMS-брокер доступен только внутри сети, но, насколько я могу судить, в настоящее время верблюд, похоже, не участвует в создании проблема. Хотя с верблюдом я не совсем уверен ...
Проблема в том, что приложение успешно создает тонны файлов в / pro c, пока не будет достигнут системный лимит открытых файлов для процесса, и все рушится.
Я уже установил максимальное количество открытых файлов Ubuntu на максимум, но проблема усугубляется с ростом нагрузки.
Я могу запустить это команда (брокер - пользователь, управляющий приложением):
/usr/bin/lsof | grep broker
И то, что я получаю, выглядит примерно так (очевидно, сильно обрезано):
ActiveMQ 21482 32629 broker cwd unknown /proc/21482/task/32629/cwd
ActiveMQ 21482 32629 broker rtd unknown /proc/21482/task/32629/root
ActiveMQ 21482 32629 broker txt unknown /proc/21482/task/32629/exe
ActiveMQ 21482 32629 broker NOFD /proc/21482/task/32629/fd
ActiveMQ 21482 32632 broker cwd unknown /proc/21482/task/32632/cwd
ActiveMQ 21482 32632 broker rtd unknown /proc/21482/task/32632/root
ActiveMQ 21482 32632 broker txt unknown /proc/21482/task/32632/exe
ActiveMQ 21482 32632 broker NOFD /proc/21482/task/32632/fd
ActiveMQ 21482 32650 broker cwd unknown /proc/21482/task/32650/cwd
ActiveMQ 21482 32650 broker rtd unknown /proc/21482/task/32650/root
ActiveMQ 21482 32650 broker txt unknown /proc/21482/task/32650/exe
ActiveMQ 21482 32650 broker NOFD /proc/21482/task/32650/fd
MQTTInact 21482 32683 broker cwd unknown /proc/21482/task/32683/cwd
MQTTInact 21482 32683 broker rtd unknown /proc/21482/task/32683/root
MQTTInact 21482 32683 broker txt unknown /proc/21482/task/32683/exe
MQTTInact 21482 32683 broker NOFD /proc/21482/task/32683/fd
Я не конечно, что такое «задача» в этом контексте, но есть несколько открытых файлов для каждого из них, и их тысячи. Не очень проблематично c, но все же вызывает озабоченность то, что это далеко не единственные созданные файлы, а только те, которые в настоящее время остаются открытыми. Есть сотни этих папок с заданиями, и каждая содержит целую сумку файлов:
dr-xr-xr-x 7 broker broker 0 Mär 27 12:19 ./
dr-xr-xr-x 2168 broker broker 0 Mär 27 06:37 ../
dr-xr-xr-x 2 broker broker 0 Mär 27 13:42 attr/
-r-------- 1 broker broker 0 Mär 27 13:42 auxv
-r--r--r-- 1 broker broker 0 Mär 27 13:42 cgroup
-r--r--r-- 1 broker broker 0 Mär 27 13:42 children
--w------- 1 broker broker 0 Mär 27 13:42 clear_refs
-r--r--r-- 1 broker broker 0 Mär 27 13:42 cmdline
-rw-r--r-- 1 broker broker 0 Mär 27 13:42 comm
-r--r--r-- 1 broker broker 0 Mär 27 13:42 cpuset
lrwxrwxrwx 1 broker broker 0 Mär 27 12:19 cwd
-r-------- 1 broker broker 0 Mär 27 13:42 environ
lrwxrwxrwx 1 broker broker 0 Mär 27 12:19 exe
dr-x------ 2 broker broker 0 Mär 27 12:19 fd/
dr-x------ 2 broker broker 0 Mär 27 13:42 fdinfo/
-rw-r--r-- 1 broker broker 0 Mär 27 13:42 gid_map
-r-------- 1 broker broker 0 Mär 27 13:42 io
-r--r--r-- 1 broker broker 0 Mär 27 13:42 limits
-rw-r--r-- 1 broker broker 0 Mär 27 13:42 loginuid
-r--r--r-- 1 broker broker 0 Mär 27 12:19 maps
-rw------- 1 broker broker 0 Mär 27 13:42 mem
-r--r--r-- 1 broker broker 0 Mär 27 13:42 mountinfo
-r--r--r-- 1 broker broker 0 Mär 27 13:42 mounts
dr-xr-xr-x 5 broker broker 0 Mär 27 13:42 net/
dr-x--x--x 2 broker broker 0 Mär 27 13:42 ns/
-r--r--r-- 1 broker broker 0 Mär 27 13:42 numa_maps
-rw-r--r-- 1 broker broker 0 Mär 27 13:42 oom_adj
-r--r--r-- 1 broker broker 0 Mär 27 13:42 oom_score
-rw-r--r-- 1 broker broker 0 Mär 27 13:42 oom_score_adj
-r-------- 1 broker broker 0 Mär 27 13:42 pagemap
-r-------- 1 broker broker 0 Mär 27 13:42 personality
-rw-r--r-- 1 broker broker 0 Mär 27 13:42 projid_map
lrwxrwxrwx 1 broker broker 0 Mär 27 12:19 root
-rw-r--r-- 1 broker broker 0 Mär 27 13:42 sched
-r--r--r-- 1 broker broker 0 Mär 27 13:42 schedstat
-r--r--r-- 1 broker broker 0 Mär 27 13:42 sessionid
-rw-r--r-- 1 broker broker 0 Mär 27 13:42 setgroups
-r--r--r-- 1 broker broker 0 Mär 27 13:42 smaps
-r-------- 1 broker broker 0 Mär 27 13:42 stack
-r--r--r-- 1 broker broker 0 Mär 27 12:19 stat
-r--r--r-- 1 broker broker 0 Mär 27 13:42 statm
-r--r--r-- 1 broker broker 0 Mär 27 13:42 status
-r-------- 1 broker broker 0 Mär 27 13:42 syscall
-rw-r--r-- 1 broker broker 0 Mär 27 13:42 uid_map
-r--r--r-- 1 broker broker 0 Mär 27 13:42 wchan
Я не знаю, для чего они используются или для чего предназначены. Все, что я знаю, это то, что они создаются непрерывно и остаются до тех пор, пока я не перезагружаю приложение.
Сначала я подумал, что это как-то связано с постоянством, но я больше не уверен в этом. Правда, если лимит файлов достигнут, брокеру MQTT не удается получить новые сообщения со старым добрым сообщением «слишком много файлов открыто» (хотя, похоже, это связано с тем, что он больше не может открыть сокет), но брокер JMS по-прежнему кажется, работает без проблем, и постоянство деактивировано для обоих брокеров в приложении.
Вот как выглядит эта конфигурация:
@Bean
fun mqttBroker(mqttProperties: MqttProperties,
authenticationService: AuthenticationService,
ssl: SslContext?): BrokerService {
return BrokerService().apply {
isPersistent = false
isAdvisorySupport = false
brokerName = "mqtt"
plugins = listOf(WebcamServiceAuthenticationPlugin(authenticationService, mqttProperties.internalPort)).toTypedArray()
addConnector("tcp://127.0.0.1:${mqttProperties.internalPort}")
val host = "${mqttProperties.address}:${mqttProperties.port}"
if (isTlsActive(mqttProperties)) {
log.info("Starting MQTT connector using TLS")
sslContext = ssl!!
addConnector("mqtt+nio+ssl://$host")
} else {
log.info("Starting a plain text MQTT connector")
addConnector("mqtt+nio://$host")
}
}
}
@Component
class JmsBroker(jmsProperties: JmsProperties) {
private val broker = BrokerService().apply {
isPersistent = false
isAdvisorySupport = false
addConnector("tcp://${jmsProperties.address}:${jmsProperties.port}")
brokerName = "jms"
}
@PostConstruct
fun start() {
broker.start()
}
@PreDestroy
fun stop() {
broker.stop()
}
}
Очевидно, я бы хотел, чтобы мой брокер чтобы снова стать стабильным, но я боюсь, что это может быть проблемой, которая всегда была там, просто не была замечена, потому что нагрузка была не такой высокой. Не имея реального знания ActiveMQ (он всегда работал до тех пор, пока не узнал), я не знаю, как диагностировать мою проблему. Кто-нибудь знает, что это за файлы, что они делают, и что потенциально может быть неправильным, что приводит к тому, что многие из них создаются и торчат?
Дополнительная информация:
Благодаря комментарию Джастина ниже я обнаружил, что все эти файлы в папке / pro c созданы для запуска потоков. Честно говоря, я ожидал, что в этом случае не хватит памяти задолго до того, как у меня закончатся доступные файлы, поскольку на машине всего 2 ГБ ОЗУ. Кроме того, даже если предположить, что MQTT-брокер, верблюд и ActiveMQ каждый создают по одному потоку для каждого сообщения (чего они не делают, поскольку может быть только треть сообщений go через брокер MQTT), учитывая нашу текущую пиковую нагрузку намного ниже 10 000 сообщений в минуту (это своего рода теоретический максимум, который мы могли бы получить в данный момент, учитывая количество наших клиентов, но мы никогда не получаем их все сразу), это не должно приводить к такому много нитей.
Это заставляет меня предположить, что здесь тонна нитей зомба ie. Что беспокоит. Я ожидал бы, что обработка потоков в ActiveMQ и Camel будет почти идеальной, учитывая их зрелость. Кто-нибудь когда-либо наблюдал подобное поведение от любого из этих?
Еще одна вещь, которую я только что понял, это то, что подавляющее большинство потоков происходит из самой JVM, а не из приложения (судя по их PID). Это может быть нормально, учитывая, что мои брокеры activeMQ работают встраиваемыми, у меня нет опыта в этой области.
Верблюжий путь Меня попросили указать верблюжий маршрут, поэтому вот он:
@Component
class BrokerRouteBuilder(camelContext: CamelContext,
mqttProperties: MqttProperties,
jmsProperties: JmsProperties,
private val authenticationService: AuthenticationService,
private val objectMapper: ObjectMapper)
: RouteBuilder() {
private val jms = "jmsbroker"
private val mqtt = "mqttbroker"
init {
camelContext.addComponent(jms, activeMQComponent("tcp://127.0.0.1:${jmsProperties.port}"))
camelContext.addComponent(mqtt, activeMQComponent("tcp://127.0.0.1:${mqttProperties.internalPort}"))
objectMapper.registerModule(JodaModule())
}
override fun configure() {
// CAPTURE
from("$jms:queue:capture")
.log("Sending capture command \${body} to user '\${header.user}'")
.toD("$mqtt:topic:\${header.user}.capture")
// FOCUS
from("$jms:queue:focus")
.log("Sending focus command \${body} to user '\${header.user}'")
.toD("$mqtt:topic:\${header.user}.focus")
// REBOOT
from("$jms:queue:reboot")
.log("Sending reboot command \${body} to user '\${header.user}'")
.toD("$mqtt:topic:\${header.user}.reboot")
// HEATING
from("$jms:queue:heating")
.id("heating-command")
.log("Sending heating command \${body} for user '\${header.user}'")
.toD("$mqtt:topic:\${header.user}.heating")
from("$mqtt:topic:*.heating-status")
.id("heating-status")
.setHeader("user").message(Companion::extractUserName)
.log("Received heating status \${body} from \${header.user}")
.to("$jms:queue:heating-status")
// SETTINGS
from("$jms:queue:send-settings")
.id("send-settings")
.log("Sending settings to user '\${header.user}'")
.toD("$mqtt:topic:\${header.user}.settings")
from("$mqtt:topic:*.request-settings")
.id("request-settings")
.setHeader("user").message(Companion::extractUserName)
.log("Received settings request from \${header.user}")
.to("$jms:queue:request-settings")
// PANORAMA SETTINGS
from("$jms:queue:panorama-settings")
.id("panorama-settings")
.log("Sending panorama settings to user '\${header.user}'")
.toD("$mqtt:topic:\${header.user}.panorama-settings")
from("$mqtt:topic:*.request-panorama-settings")
.id("request-panorama-settings")
.setHeader("user").message(Companion::extractUserName)
.log("Received panorama settings request from \${header.user}")
.to("$jms:queue:request-panorama-settings")
// SCHEDULES
from("$jms:topic:schedule")
.id("send-schedule")
.log("Sending schedules to user '\${header.user}'")
.toD("$mqtt:topic:\${header.user}.schedule")
from("$mqtt:topic:*.request-schedule")
.id("request-schedule")
.setHeader("user").message(Companion::extractUserName)
.log("Received schedule request from \${header.user}")
.to("$jms:queue:request-schedule")
// HEATING SCHEDULES
from("$jms:queue:heating-schedule")
.id("send-heating-schedule")
.log("Sending heating schedule to user '\${header.user}'")
.toD("$mqtt:topic:\${header.user}.heating-schedule")
from("$mqtt:topic:*.request-heating-schedule")
.id("request-heating-schedule")
.setHeader("user").message(Companion::extractUserName)
.log("Received heating schedule request from \${header.user}")
.to("$jms:queue:request-heating-schedule")
// HEARTBEAT
from("$mqtt:topic:*.heartbeat")
.id("camera-heartbeat")
.setHeader("user").message(Companion::extractUserName)
.process().message { message ->
val heartbeat = objectMapper.readValue(message.getBody(ByteArray::class.java),
HeartbeatMessage::class.java)
val user = message.getHeader("user", "", String::class.java)
val camera = authenticationService.readCamera(user)
heartbeat.camera = camera
message.body = objectMapper.writeValueAsBytes(heartbeat)
}
.log(LoggingLevel.TRACE, "Received heartbeat from \${header.user}: \${body}")
.to("$jms:topic:heartbeat")
// EVENTS
from("$mqtt:topic:*.event")
.id("yellow-event")
.setHeader("user").message(Companion::extractUserName)
.process().message { message ->
val event = objectMapper.readValue(message.getBody(ByteArray::class.java),
YellowEventDto::class.java)
// replace username as event source and event-chain with camera id.
val camera = authenticationService.readCamera(event.source)
message.body = objectMapper.writeValueAsBytes(event.copy(
source = camera, eventChain = event.eventChain.replace(event.source, camera)))
}
.log(LoggingLevel.TRACE, "Received event from \${header.user}: \${body}")
.to("$jms:topic:event")
// AUTHENTICATION UPDATES
from("$jms:topic:authentication")
.process().body(ByteArray::class.java) { body ->
val event = objectMapper.readValue<AuthenticationUpdateEvent>(body)
authenticationService.onUpdate(event)
}
from("direct:request-authentication")
.to("$jms:queue:request-authentication")
}
companion object {
internal const val destinationHeader = "JMSDestination"
private val log = logger()
internal fun extractUserName(message: Message): String {
val topic = message.getHeader(destinationHeader, "", String::class.java)
val parts = topic.removePrefix("topic://").split(".")
return if (parts.size == 2) {
parts.first()
} else {
log.warn("Unexpected topic format '{}'!", topic)
""
}
}
}
}
Я создаю экземпляры activeMQComponents вручную, так как их два, и мне нужно указать c. Кажется, что основной интерес представляет пул соединений, для которого документация ActiveMQComponent говорит следующее (обратите внимание, что в нем явно указано, что это верно по умолчанию):
/**
* Enables or disables whether a PooledConnectionFactory will be used so that when
* messages are sent to ActiveMQ from outside of a message consuming thread, pooling will be used rather
* than the default with the Spring {@link JmsTemplate} which will create a new connection, session, producer
* for each message then close them all down again.
* <p/>
* The default value is true. Note that this requires an extra dependency on commons-pool2.
*/
public void setUsePooledConnection(boolean usePooledConnection) {
if (getConfiguration() instanceof ActiveMQConfiguration) {
((ActiveMQConfiguration)getConfiguration()).setUsePooledConnection(usePooledConnection);
}
}