Simmer в R: моделирование изменений в мощности сервера на основе длины и продолжительности очереди - PullRequest
0 голосов
/ 26 марта 2019

Я пытаюсь смоделировать систему следующим образом:

Прибытия генерируются в соответствии с предопределенным расписанием и имеют известное время обработки, предоставленное фреймом данных.В начале симуляции имеется один сервер с емкостью, равной min_daemons.Пока все просто, но часть nxt становится хитрой: эта емкость может изменяться на интервале [min_daemons, max_daemons] в течение моделирования согласно следующему алгоритму:

Если в любое время во время моделирования длина очереди достигаетили превышает значение incr_count и остается на уровне или выше этого уровня для incr_delay, затем к главному серверу добавляется дополнительная единица мощности.Это может произойти в любое время, при условии, что емкость никогда не превышает max_daemons.

Обратное также верно.Если в любое время длина очереди меньше decr_count и остается на уровне или ниже этого уровня для decr_delay, единица емкости удаляется, потенциально до уровня min_daemons.

Я создал траекторию для всехпоступления, которые разветвляются, когда выполняются условия для изменения емкости сервера выше.Проблема в том, что изменения в вместимости всегда связаны с событием прибытия.Что мне действительно нужно, так это процесс, независимый от траектории прибытия, который постоянно отслеживает длину очереди и вносит соответствующие изменения в емкость.

Я рассмотрел альтернативно выполнение этого с неким фиктивным процессом прибытия, скажем, каждую секундумоделирование, но я не был уверен, смогу ли я предотвратить фиктивные прибытия, конкурирующие с истинными поступлениями за емкость сервера.

#instantiate simulation environment
env <- simmer("queues") %>% add_resource("daemon",1) %>% add_global("incr_start",99999) %>% add_global("decr_start",99999)

run <-  trajectory() %>% 
  branch(option = function() if (get_queue_count(env,"daemon") >=  incr_count) {1}
                             else if (get_queue_count(env,"daemon") <= decr_count) {2}
                             else {3}
         ,
         continue = c(T, T, T)
         ,
         trajectory("increment?")
         %>% branch(option = function() if (now(env) - get_global(env,"incr_start") >= incr_delay
                                            & get_capacity(env,"daemon") < max_daemons) {1}
                                        else if (get_global(env,"incr_start")==99999) {2}
                                        else {3}
                    ,
                    continue = c(T, T, T)
                    ,
                    trajectory("increment")
                    %>% log_(function () {paste("Queue size is: ",get_queue_count(env,"daemon"))})
                    %>% log_(function () 
                      {paste("Queue has exceeded count for ",now(env)-get_global(env,"incr_start")," seconds.")})
                    %>% set_capacity(resource = "daemon", value = 1, mod="+")
                    ,
                    trajectory("update incr start")
                    %>% set_global("incr_start",now(env))
                    %>% log_("Queue count is now above increment count. Starting increment timer.")
                    ,
                    trajectory("do nothing")
                    %>% log_("Did not meet increment criteria. Doing nothing.")
         )
         ,
         trajectory("decrement?")
         %>% branch(option = function() if (now(env) - get_global(env,"decr_start") >= decr_delay
                                            & get_capacity(env,"daemon") > min_daemons) {1}
                                        else if (get_global(env,"decr_start")==99999) {2}
                                        else {3}
                    ,
                    continue = c(T, T, T)
                    ,
                    trajectory("decrement")
                    %>% log_(function () {paste("Queue size is: ",get_queue_count(env,"daemon"))})
                    %>% log_(function () 
                      {paste("Queue has been less than count for ",now(env)-get_global(env,"incr_start")," seconds.")})
                    %>% set_capacity(resource = "daemon", value = -1, mod="+")
                    ,
                    trajectory("update decr start")
                    %>% set_global("decr_start",now(env))
                    %>% log_("Queue count is now below decrement count. Starting decrement timer.")
                    ,
                    trajectory("do nothing")
                    %>% log_("Did not meet decrement criteria. Doing nothing.")
         )
         ,
         trajectory("reset_timer")
         %>% log_("Did not meet criteria to increment or decrement. Resetting timers.")
         %>% set_global("decr_start", values = 99999)
         %>% set_global("decr_start", values = 99999)
  ) %>%
  seize("daemon") %>% 
  log_("Now running") %>%
  log_(function () {paste(get_queue_count(env,"daemon")," runs in the queue.")}) %>%
  timeout_from_attribute("service") %>% 
  release("daemon") %>% 
  log_("Run complete")

env %>% 
  add_dataframe("run", run, arr,time="absolute") %>% 
  run(200)

Мне нужно сделать еще несколько отладок, чтобы убедиться, что моделирование работает какЯ задумал, но я полностью понимаю, что эта модель не так.Я могу надеяться, что дизайн не слишком компрометирует его достоверность, но я также хочу получить обратную связь о том, как я могу создать что-то, что будет более правдоподобным для реальной жизни.

1 Ответ

0 голосов
/ 27 марта 2019

Проверка статуса через равные промежутки времени сводит на нет всю цель иметь дискретные события. У нас здесь асинхронный процесс, поэтому правильный способ его моделирования - использование сигналов.

Нам нужно спросить себя: когда могла очередь ...

  1. увеличить? Когда прибытие достигает активности seize(). Поэтому, прежде чем пытаться захватить ресурс, нам нужно проверить количество прибывающих в очередь в очереди и действовать соответственно:

    • Если оно равно decr_count, необходимо отправить сигнал для отмены любой попытки уменьшить емкость сервера.
    • Если оно равно incr_count - 1, должен быть отправлен сигнал для запроса увеличения емкости сервера.
    • Иначе ничего не делать.
  2. уменьшить? Когда прибытие обслуживается (т. Е. Продолжается до следующего действия после seize()). Поэтому после захвата ресурса нам также необходимо проверить число поставленных в очередь прибытий:

    • Если оно равно incr_count - 1, необходимо отправить сигнал об отмене любой попытки увеличить емкость сервера.
    • Если оно равно decr_count, необходимо отправить сигнал с просьбой уменьшить емкость сервера.
    • Иначе ничего не делать.

Процедура проверки количества прибывающих в очередь в очереди и принятия решения о том, какой тип сигнала необходим, если таковой имеется, может быть объединена в многократно используемую функцию (назовем ее check_queue) следующим образом:

library(simmer)

env <- simmer()

check_queue <- function(.trj, resource, mod, lim_queue, lim_server) {
  .trj %>% branch(
    function() {
      if (get_queue_count(env, resource) == lim_queue[1])
        return(1)
      if (get_queue_count(env, resource) == lim_queue[2] &&
          get_capacity(env, resource)    != lim_server)
        return(2)
      0 # pass
    },
    continue = c(TRUE, TRUE),
    trajectory() %>% send(paste("cancel", mod[1])),
    trajectory() %>% send(mod[2])
  )
}

main <- trajectory() %>%
  check_queue("resource", c("-", "+"), c(decr_count, incr_count-1), max_daemons) %>%
  seize("resource") %>%
  check_queue("resource", c("+", "-"), c(incr_count-1, decr_count), min_daemons) %>%
  timeout_from_attribute("service") %>%
  release("resource")

Таким образом, основная траектория довольно проста. Затем нам потребуется пара процессов для получения таких сигналов и увеличения / уменьшения емкости после некоторой задержки:

change_capacity <- function(resource, mod, delay, limit) {
  trajectory() %>%
    untrap(paste("cancel", mod)) %>%
    trap(mod) %>%
    wait() %>%
    # signal received
    untrap(mod) %>%
    trap(paste("cancel", mod),
         handler = trajectory() %>%
           # cancelled! start from the beginning
           rollback(Inf)) %>%
    timeout(delay) %>%
    set_capacity(resource, as.numeric(paste0(mod, 1)), mod="+") %>%
    # do we need to keep changing the capacity?
    rollback(2, check=function() get_capacity(env, resource) != limit) %>%
    # start from the beginning
    rollback(Inf)
}

incr_capacity <- change_capacity("resource", "+", incr_delay, max_daemons)
decr_capacity <- change_capacity("resource", "-", decr_delay, min_daemons)

Наконец, мы добавляем ресурс, наши процессы и данные в среду моделирования:

env %>%
  add_resource("resource", min_daemons) %>%
  add_generator("incr", incr_capacity, at(0)) %>%
  add_generator("decr", decr_capacity, at(0)) %>%
  add_dataframe("arrival", main, data)

Пожалуйста, обратите внимание, что я не проверял этот код . Это может потребовать некоторых корректировок, но общая идея есть.

...