Многократные процессы выгружают задание, в то время как другие ждут, а затем уведомляют всех остальных после завершения, как многопоточность - PullRequest
0 голосов
/ 29 апреля 2020

Я недавно работал в системе с несколькими процессами.

У меня есть задача привести в порядок / очистить журналы для системного сервера XX, запущенного с несколькими процессами, когда он запущен.

  • Порядок запуска для этих процессов не может быть изменен, но я могу заставить сервер запускать мой код при его запуске.
  • Эти имена журналов выглядят как "app12504_2020-04-29_00-21-46.0.log", где, 12504 - это идентификатор процесса (PID). Я использовал регулярное выражение app_\\d+_\\d{4}-\\d{2}-\\d{2}_\\d{2}-\\d{2}-\\d{2}\\.\\d+\\.log для сопоставления этих файлов журнала.

Поэтому, когда сервер перезагружается, мне нужно только выбрать процесс, чтобы привести в порядок журналы, например переместить старые журналы в другую директорию истории и запустить поток, чтобы привести в порядок журналы в каталоге истории из-за стратегии ротации журналов (файлы автоматически перемещаются туда) во время последующих запусков, но другие процессы должны ждать завершения задания в вытесняющем процессе.
Если другие процессы не работают Не ждите, они могут сразу генерировать логи и влиять на работу по отливу. Это нормально, что эти файлы журнала не могут быть удалены в Windows из-за «Процесс не может получить доступ к файлу, потому что он используется другим процессом». На Linux, однако, эти файлы журнала могут быть удалены и видны только для tail -f /proc/self/fd/fd_num, это очень плохо.

Я также рассматриваю возможность сделать окончательный поворот журнала, когда эти процессы обычно завершаются. К сожалению, команда StopSystem не может работать хорошо. Всегда есть подвох (остаточный), не говоря уже о том, что сервер может взломать sh или быть убитым kill -9 $(pgrep program) ( KILL (2) ), эти журналы не могут быть перемещены в каталог истории. Поэтому мне нужен процесс, чтобы привести в порядок текущий каталог при запуске и каталог истории во время выполнения.

Я попытался просто имитировать nginx (ngx_create_pidfile в "nginx \ src \" core \ ngx_cycle. c ") с использованием Boost.Interprocess для совместимости Windows и Linux. Кажется, что это хорошо работает, когда процесс падает или убит. Мы можем просматривать некоторые файлы с такими же именами, которые используются в named_semaphore, managed_shared_memory, named_mutex и named_condition в C:\ProgramData\boost_interprocess\1587676164, но file_lock может указывать путь к файлу.

Насколько мне известно, я предполагаю, что эти именованные объекты могут работать с таким количеством ссылок, как:

Даже если процесс обычно не завершается, его счетчик ссылок по-прежнему уменьшается. Когда счетчик ссылок достигает нуля, система может автоматически удалить их. Но, похоже, никто не может сделать это для этих объектов. Итак, как добиться наилучших ожиданий в качестве заголовка .

. Возьмем, к примеру, следующий код, лучше, если переменная count управляется этими объектами имен, то есть ядром ОС.

#include <regex> //since C++11
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/sync/named_mutex.hpp>
#include <boost/interprocess/sync/named_condition.hpp>
//https://www.boost.org/doc/libs/1_55_0/doc/html/interprocess/synchronization_mechanisms.html#interprocess.synchronization_mechanisms.file_lock.file_lock_whats_a_file_lock
#include <boost/interprocess/sync/file_lock.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
//https://www.boost.org/doc/libs/1_72_0/doc/html/interprocess/synchronization_mechanisms.html#interprocess.synchronization_mechanisms.semaphores
#include <boost/interprocess/sync/named_semaphore.hpp>

using namespace boost::interprocess;

#define IP_MEMORY_NAME "log_shm"
#define IP_MUTEX_NAME "log_mtx"
#define IP_CONDITION_NAME "log_cnd"

//create => ReferenceCount=1, open => ++ReferenceCount
managed_shared_memory g_managed_shm{ open_or_create, IP_MEMORY_NAME, 1024 };

int *pid = g_managed_shm.find_or_construct<int>("PID")(0);
int *count = g_managed_shm.find_or_construct<int>("Count")(0);
named_mutex g_named_mtx{ open_or_create, IP_MUTEX_NAME };
named_condition g_named_cnd{ open_or_create, IP_CONDITION_NAME };


//NOTE It should be quite obvious that if `kill -9` is used on your application (forced termination) then all
//bets are off and you'll have to either remove the Name Semaphore object or explicitly unlock it (`post()`).
void signal_handler(int signal = 0)
{
    // A signal occurred.
    std::cerr << "Exiting with signal " << signal << "...\n";

    //clear_interprocess_resoures
    {
        scoped_lock<named_mutex> lock{ g_named_mtx };
        std::cout << "clear_interprocess_resoures: pid=" << *pid << ", count=" << *count << std::endl;

        if (0 == --*count)
        {
            *pid = 0;
            shared_memory_object::remove(IP_MEMORY_NAME);
            named_mutex::remove(IP_MUTEX_NAME);
            named_condition::remove(IP_CONDITION_NAME);
        }
    }

    //we ignore the result, the user may have removed the file already.
    //(void)unlink(lock_filename);
}


void tidy_up_log(boost::filesystem::path log_file_pattern, log_collector_ptr collector)
{
    std::cout << "feign being busy...\n";
}

void start_history_log_tiding_timer(log_collector_ptr collector)
{
}

void main() 
{
    {
        //std::cout << "before scoped_lock, wait for 6 seconds, pid=" << *pid << ", count=" << *count << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(6));
        scoped_lock<named_mutex> lock{ g_named_mtx };
        //std::cout << "after scoped_lock, wait for 6 seconds, pid=" << *pid << ", count=" << *count << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(6));

        ++*count;

        if (0 != *pid)
        {
            std::cout << "preempted pid=" << pid << std::endl;
            g_named_cnd.wait(lock);
        }
        else
        {
            *pid = getpid();
            std::cout << "set pid=" << *pid << std::endl;

            tidy_up_log(log_file_pattern, log_file_collector);
            start_history_log_tiding_timer();

            g_named_cnd.notify_all();
        }
        //std::cout << "wait for 6 seconds to release lock, pid=" << *pid << std::endl;
        //std::this_thread::sleep_for(std::chrono::seconds(6));

        //std::cout << "~scoped_lock" << pid << std::endl;
    }
}   

Некоторые ссылки:

...