Как отправить асинхронную функцию C # на конкретный ThreadPool. (Пример Kotlin предоставлен) - PullRequest
0 голосов
/ 18 октября 2019

Мне нужно перевести этот код Kotlin на C #, используя async / await. Особенно мне интересно, как я могу создать пул потоков и выполнить асинхронный метод для него.

val myThreadPool1 = newFixedThreadPoolContext(1, "myThreadPool1")
val myThreadPool2 = newFixedThreadPoolContext(1, "myThreadPool2")

fun main() = runBlocking {
    f1()
    f2()
}

suspend fun f1() {
    withContext(myThreadPool1) {
        delay(2000)
        println(Thread.currentThread().name)
    }
}

suspend fun f2() {
    withContext(myThreadPool2) {
        delay(1000)
        println(Thread.currentThread().name)
    }
}

1 Ответ

1 голос
/ 18 октября 2019

Я сделал пример для вас / любого человека о том, как использовать SynchronizationContext для продолжения ожидаемых методов в определенном потоке. Я не знаю вашего опыта работы с C #, но это не на уровне начинающих.

В этом примере вы можете увидеть два асинхронных метода, которые "перебивают" друг друга.

Я создал сутьдля этого (что немного отличается) : ASyncThread.cs

Вы можете использовать это как ссылку ..

class Program
{
    static async void First()
    {
        for (int i = 0; i < 10; i++)
        {
            Console.WriteLine($"{DateTime.Now}| First on Thread: {Thread.CurrentThread.ManagedThreadId}");
            await Task.Delay(1000);
        }
    }

    static async void Second()
    {
        for (int i = 0; i < 10; i++)
        {
            Console.WriteLine($"{DateTime.Now}| Second on Thread: {Thread.CurrentThread.ManagedThreadId}");
            await Task.Delay(500);
        }
    }

    static void Main(string[] args)
    {
        Console.WriteLine($"Hello World! on Thread: {Thread.CurrentThread.ManagedThreadId}");

        using (var myThread = new AwaitEnabledThread())
        {
            myThread.Post(First);
            myThread.Post(Second);

            Console.ReadLine();
        }
    }
}

Что приведет к:

Hello World! on thread: 1                           
18-10-2019 12:14:34| First on Thread: 4             
18-10-2019 12:14:34| Second on Thread: 4            
18-10-2019 12:14:35| Second on Thread: 4            
18-10-2019 12:14:35| First on Thread: 4             
18-10-2019 12:14:36| Second on Thread: 4            
18-10-2019 12:14:36| Second on Thread: 4            
18-10-2019 12:14:37| First on Thread: 4             
18-10-2019 12:14:37| Second on Thread: 4            
18-10-2019 12:14:37| Second on Thread: 4            
18-10-2019 12:14:38| First on Thread: 4             
18-10-2019 12:14:38| Second on Thread: 4            
18-10-2019 12:14:38| Second on Thread: 4            
18-10-2019 12:14:39| Second on Thread: 4            
18-10-2019 12:14:39| First on Thread: 4             

public class AwaitEnabledThread : SynchronizationContext, IDisposable
{
    // By JvanLangen.
    private class ActionWithState
    {
        public SendOrPostCallback Action { get; set; }
        public object State { get; set; }
    }

    private Task _mainTask;
    private int _mainTaskThreadId;
    private readonly ManualResetEvent _terminate = new ManualResetEvent(false);
    private readonly AutoResetEvent _actionAdded = new AutoResetEvent(false);
    private readonly ConcurrentQueue<ActionWithState> _actions = new ConcurrentQueue<ActionWithState>();

    private void TaskMethod()
    {
        // because this class derives from SynchronizationContext
        SynchronizationContext.SetSynchronizationContext(this); // <-------

        _mainTaskThreadId = Thread.CurrentThread.ManagedThreadId;

        var waitHandles = new WaitHandle[] { _terminate, _actionAdded };

        while (WaitHandle.WaitAny(waitHandles) != 0)
            while (_actions.TryDequeue(out var actionWithState))
                actionWithState.Action(actionWithState.State);
    }


    public AwaitEnabledThread()
    {
        _mainTask = Task.Factory.StartNew(TaskMethod, TaskCreationOptions.LongRunning);
    }

    public override void Post(SendOrPostCallback d, object state = null)
    {
        _actions.Enqueue(new ActionWithState { Action = d, State = state });
        _actionAdded.Set();
    }

    public void Post(Action action)
    {
        Post(s => action(), null);
    }

    public override void Send(SendOrPostCallback d, object state = null)
    {
        if (Thread.CurrentThread.ManagedThreadId != _mainTaskThreadId)
        {
            _actions.Enqueue(new ActionWithState { Action = d, State = state });
            _actionAdded.Set();
        }
        else
            d(state);
    }

    public void Send(Action action)
    {
        Send(s => action(), null);
    }

    public void Dispose()
    {
        _terminate.Set();
        _mainTask.Wait();
    }

    public bool Terminated => _terminate.WaitOne(0);
}

Обновление: Вы даже можете ожидать действия, поставленного в очередь в потоке:

public static Task SendASync(this AwaitEnabledThread thread, SendOrPostCallback d, object state = null)
{
    var tcs = new TaskCompletionSource<object>();

    thread.Post(s =>
    {
        try
        {
            // execute the delegate
            d(state);
            // return to the previous SynchronizationContext
            tcs.SetResult(null);
        }
        catch (Exception exception)
        {
            // return to the previous SynchronizationContext
            tcs.SetException(exception);
        }
    }, tcs);

    return tcs.Task;
}

Использование:

using (var myThread = new AwaitEnabledThread())
{
    await myThread.SendASync(First);

    // even awaitable actions can be awaited
    await myThread.SendASync(async s =>
    {
        await DoSomethingIO();
        await DoSomethingIOAgain();
    });


    Console.ReadLine();
}

Разница в том, что первое ожидание влияет на текущий поток. Ожидание в методе выполняется в целевом потоке.

...