Принудительное выполнение определенного кода в одном потоке - PullRequest
1 голос
/ 30 апреля 2020

У нас есть старая сторонняя система (назовем ее Junksoft® 95), с которой мы взаимодействуем через PowerShell (она предоставляет COM-объект), и я нахожусь в процессе ее оборачивания в REST API (ASP). NET Framework 4.8 и WebAPI 2). Я использую пакет System.Management.Automation nuget для создания PowerShell, в котором я создаю экземпляр COM API Junksoft в качестве dynamic объекта, который затем использую:

//I'm omitting some exception handling and maintenance code for brevity
powerShell = System.Management.Automation.PowerShell.Create();
powerShell.AddScript("Add-Type -Path C:\Path\To\Junksoft\Scripting.dll");
powerShell.AddScript("New-Object Com.Junksoft.Scripting.ScriptingObject");
dynamic junksoftAPI = powerShell.Invoke()[0];

//Now we issue commands to junksoftAPI like this:
junksoftAPI.Login(user,pass);
int age = junksoftAPI.GetAgeByCustomerId(custId);
List<string> names = junksoftAPI.GetNames();

. Это прекрасно работает, когда я запускаю все это в том же потоке (например, в консольном приложении). Однако по какой-то причине это обычно не работает, когда я помещаю junksoftAPI в System.Web.Caching.Cache и использую его с разных контроллеров в моем веб-приложении. Я говорю обычно , потому что это действительно работает, когда ASP. NET происходит, чтобы дать входящий вызов потоку, в котором был создан junksoftAPI. Если этого не произойдет, Junksoft 95 выдаст мне ошибку.

Можно ли как-нибудь убедиться, что все взаимодействия с junksoftAPI происходят в том же потоке ?

Обратите внимание , что я не хочу превращать целое веб-приложение в однопоточное приложение! Логика c в контроллерах и т. Д. Должна происходить как обычно на разных потоках. Это должно быть только взаимодействие Junksoft, которое происходит в потоке Junksoft-speci c, что-то вроде этого:

[HttpGet]
public IHttpActionResult GetAge(...)
{
    //finding customer ID in database...

    ...

    int custAge = await Task.Run(() => {
        //this should happen on the Junksoft-specific thread and not the next available thread
        var cache = new System.Web.Caching.Cache();
        var junksoftAPI = cache.Get(...); //This has previously been added to cache on the Junksoft-specific thread
        return junksoftAPI.GetAgeByCustomerId(custId);
    });

    //prepare a response using custAge...
}

Ответы [ 3 ]

2 голосов
/ 01 мая 2020

Вы можете создать свой собственный рабочий поток синглтона для достижения этой цели. Вот код, который вы можете подключить к веб-приложению.

public class JunkSoftRunner
{
    private static JunkSoftRunner _instance;

    //singleton pattern to restrict all the actions to be executed on a single thread only.
    public static JunkSoftRunner Instance => _instance ?? (_instance = new JunkSoftRunner());

    private readonly SemaphoreSlim _semaphore;
    private readonly AutoResetEvent _newTaskRunSignal;

    private TaskCompletionSource<object> _taskCompletionSource;
    private Func<object> _func;

    private JunkSoftRunner()
    {
        _semaphore = new SemaphoreSlim(1, 1);
        _newTaskRunSignal = new AutoResetEvent(false);
        var contextThread = new Thread(ThreadLooper)
        {
            Priority = ThreadPriority.Highest
        };
        contextThread.Start();
    }

    private void ThreadLooper()
    {
        while (true)
        {
            //wait till the next task signal is received.
            _newTaskRunSignal.WaitOne();

            //next task execution signal is received.
            try
            {
                //try execute the task and get the result
                var result = _func.Invoke();

                //task executed successfully, set the result
                _taskCompletionSource.SetResult(result);
            }
            catch (Exception ex)
            {
                //task execution threw an exception, set the exception and continue with the looper
                _taskCompletionSource.SetException(ex);
            }

        }
    }

    public async Task<TResult> Run<TResult>(Func<TResult> func, CancellationToken cancellationToken = default(CancellationToken))
    {
        //allows only one thread to run at a time.
        await _semaphore.WaitAsync(cancellationToken);

        //thread has acquired the semaphore and entered
        try
        {
            //create new task completion source to wait for func to get executed on the context thread
            _taskCompletionSource = new TaskCompletionSource<object>();

            //set the function to be executed by the context thread
            _func = () => func();

            //signal the waiting context thread that it is time to execute the task
            _newTaskRunSignal.Set();

            //wait and return the result till the task execution is finished on the context/looper thread.
            return (TResult)await _taskCompletionSource.Task;
        }
        finally
        {
            //release the semaphore to allow other threads to acquire it.
            _semaphore.Release();
        }
    }
}

Основной метод тестирования консоли:

public class Program
{
    //testing the junk soft runner
    public static void Main()
    {
        //get the singleton instance
        var softRunner = JunkSoftRunner.Instance;

        //simulate web request on different threads
        for (var i = 0; i < 10; i++)
        {
            var taskIndex = i;
            //launch a web request on a new thread.
            Task.Run(async () =>
            {
                Console.WriteLine($"Task{taskIndex} (ThreadID:'{Thread.CurrentThread.ManagedThreadId})' Launched");
                return await softRunner.Run(() =>
                {
                    Console.WriteLine($"->Task{taskIndex} Completed On '{Thread.CurrentThread.ManagedThreadId}' thread.");
                    return taskIndex;
                });
            });
        }
    }   
}

Вывод:

enter image description here

Обратите внимание, что, хотя функция была запущена из разных потоков, некоторая часть кода всегда выполнялась всегда в одном и том же контексте с идентификатором: «5».

Но учтите, что, хотя все веб-запросы выполняются в независимых потоках, они в конечном итоге будут ожидать выполнения некоторых задач в одноэлементном рабочем потоке. В конечном итоге это создаст шею bottle в вашем веб-приложении. Это в любом случае ваше ограничение дизайна.

2 голосов
/ 01 мая 2020

Вот как можно выдавать команды для API Junksoft из выделенного потока STA, используя класс BlockingCollection:

public class JunksoftSTA : IDisposable
{
    private readonly BlockingCollection<Action<Lazy<dynamic>>> _pump;
    private readonly Thread _thread;

    public JunksoftSTA()
    {
        _pump = new BlockingCollection<Action<Lazy<dynamic>>>();
        _thread = new Thread(() =>
        {
            var lazyApi = new Lazy<dynamic>(() =>
            {
                var powerShell = System.Management.Automation.PowerShell.Create();
                powerShell.AddScript("Add-Type -Path C:\Path\To\Junksoft.dll");
                powerShell.AddScript("New-Object Com.Junksoft.ScriptingObject");
                dynamic junksoftAPI = powerShell.Invoke()[0];
                return junksoftAPI;
            });
            foreach (var action in _pump.GetConsumingEnumerable())
            {
                action(lazyApi);
            }
        });
        _thread.SetApartmentState(ApartmentState.STA);
        _thread.IsBackground = true;
        _thread.Start();
    }

    public Task<T> CallAsync<T>(Func<dynamic, T> function)
    {
        var tcs = new TaskCompletionSource<T>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        _pump.Add(lazyApi =>
        {
            try
            {
                var result = function(lazyApi.Value);
                tcs.SetResult(result);
            }
            catch (Exception ex)
            {
                tcs.SetException(ex);
            }
        });
        return tcs.Task;
    }

    public Task CallAsync(Action<dynamic> action)
    {
        return CallAsync<object>(api => { action(api); return null; });
    }

    public void Dispose() => _pump.CompleteAdding();

    public void Join() => _thread.Join();
}

Цель использования Класс Lazy предназначен для всплытия возможного исключения во время создания динамического c объекта путем передачи его вызывающим объектам.

... исключения кэшируются. То есть, если метод фабрики выдает исключение при первой попытке потока получить доступ к свойству Value объекта Lazy<T>, то же исключение выдается при каждой последующей попытке.

Использование пример:

// A static field stored somewhere
public static readonly JunksoftSTA JunksoftStatic = new JunksoftSTA();

await JunksoftStatic.CallAsync(api => { api.Login("x", "y"); });
int age = await JunksoftStatic.CallAsync(api => api.GetAgeByCustomerId(custId));

Если вы обнаружите, что одного потока STA недостаточно для своевременного обслуживания всех запросов, вы можете добавить больше потоков STA, каждый из которых выполняет один и тот же код (private readonly Thread[] _threads; et c). Класс BlockingCollection является поточно-ориентированным и может одновременно использоваться любым количеством потоков.

0 голосов
/ 30 апреля 2020

Если бы вы не сказали, что это был сторонний инструмент, я бы предположил, что это класс GUI. По практическим причинам очень плохая идея, чтобы несколько потоков писали в них. . NET обеспечивает строгое правило "только поток создания должен писать", начиная с 2.0 и далее .

WebServers в целом и ASP. Net в частности, используя довольно большое пул потоков. Мы говорим от 10 до 100 потоков на ядро. Это означает, что действительно трудно привязать любой запрос к определенной c теме. С тем же успехом вы можете и не пытаться.

Опять же, просмотр классов GUI может быть вашим лучшим выбором. Вы можете создать один поток с единственной целью - имитировать очередь событий GUI. Основной / пользовательский интерфейс вашего среднего приложения Windows Forms отвечает за создание каждого экземпляра класса GUI. Он поддерживается путем опроса / обработки очереди событий. Он заканчивается только x, когда получает команду отмены через очередь событий. Диспетчеризация просто помещает заказы в эту очередь, поэтому мы можем избежать проблем с многопоточностью.

...