Как использовать C # async / await в качестве автономного преобразования CPS - PullRequest
9 голосов
/ 16 мая 2019

Примечание 1: Здесь CPS обозначает "стиль передачи продолжения"

Мне было бы очень интересно понять, как подключиться к асинхронному механизму C #.В основном, как я понимаю, в асинхронной / ожидающей возможности C # компилятор выполняет преобразование CPS, а затем передает преобразованный код в контекстный объект, который управляет планированием задач в различных потоках.

Как вы думаете, возможно лииспользовать эту функцию компилятора для создания мощных комбинаторов, оставляя в стороне аспект потоков по умолчанию?

Примером может быть то, что может отменить рекурсирование и запоминание такого метода, как

async MyTask<BigInteger> Fib(int n)     // hypothetical example
{
    if (n <= 1) return n;
    return await Fib(n-1) + await Fib(n-2);
}

Мне удалосьсделать это с помощью чего-то вроде:

void Fib(int n, Action<BigInteger> Ret, Action<int, Action<BigInteger>> Rec)
{
    if (n <= 1) Ret(n);
    else Rec(n-1, x => Rec(n-2, y => Ret(x + y)));
}

(без использования асинхронной, очень хитрый ...)

или с использованием монады (While<X> = Either<X, While<X>>)

While<X> Fib(int n) => n <= 1 ?
    While.Return((BigInteger) n) :
    from x in Fib(n-1)
    from y in Fib(n-2)
    select x + y;

немного лучше, но не так симпатично, как синтаксис асинхронности:)


Я задал этот вопрос в блоге Э. Липперта , и онбыл достаточно любезен, чтобы сообщить мне, что это действительно возможно.


Необходимость для меня возникла при реализации библиотеки ZBDD: (специальный вид DAG)

  • много сложного взаиморукописные операции

  • стек постоянно переполняется на реальных примерах

  • только практично, если полностью запоминается

РучнойCPS и дерекурсизация были очень утомительными и подвержены ошибкам.


Кислотный тест для того, что мне нужно (безопасность стека), будет выглядеть примерно так:

async MyTask<BigInteger> Fib(int n, BigInteger a, BigInteger b)
{
    if (n == 0) return b;
    if (n == 1) return a;
    return await Fib(n - 1, a + b, a);
}

, который вызывает переполнение стекана Fib(10000, 1, 0) с поведением по умолчанию.Или даже лучше, используя код в начале с памяткой для вычисления Fib(10000).

Ответы [ 4 ]

1 голос
/ 17 мая 2019

Вот мой вариант решения.Это стек безопасно и не использует пул потоков, но имеет определенные ограничения.В частности, он требует хвостовой рекурсивный стиль метода, поэтому конструкции типа Fib(n-1) + Fib(n-2) не будут работать.С другой стороны, хвостовая рекурсивная природа, которая фактически выполняется итеративным образом, не требует запоминания, поскольку каждая итерация вызывается один раз.У него нет защиты крайних случаев, но это скорее прототип, чем окончательное решение:

public class RecursiveTask<T>
{
    private T _result;

    private Func<RecursiveTask<T>> _function;

    public T Result
    {
        get
        {
            var current = this;
            var last = current;

            do
            {
                last = current;
                current = current._function?.Invoke();
            } while (current != null);

            return last._result;
        }
    }

    private RecursiveTask(Func<RecursiveTask<T>> function)
    {
        _function = function;
    }

    private RecursiveTask(T result)
    {
        _result = result;
    }

    public static implicit operator RecursiveTask<T>(T result)
    {
        return new RecursiveTask<T>(result);
    }

    public static RecursiveTask<T> FromFunc(Func<RecursiveTask<T>> func) => new RecursiveTask<T>(func);
}

И использование:

class Program
{
    static RecursiveTask<int> Fib(int n, int a, int b)
    {
        if (n == 0) return a;
        if (n == 1) return b;

        return RecursiveTask<int>.FromFunc(() => Fib(n - 1, b, a + b));
    }

    static RecursiveTask<int> Factorial(int n, int a)
    {
        if (n == 0) return a;

        return RecursiveTask<int>.FromFunc(() => Factorial(n - 1, n * a));
    }


    static void Main(string[] args)
    {
        Console.WriteLine(Factorial(5, 1).Result);
        Console.WriteLine(Fib(100000, 0, 1).Result);
    }
}

Обратите внимание, что важно вернуть функцию, которая оборачивает рекуррентныйвызов, а не вызов, чтобы избежать реальной рекурсии.

ОБНОВЛЕНИЕ Ниже приведена другая реализация, которая все еще не использует преобразование CPS, но позволяет использовать семантическую близость к алгебраической рекурсии, то естьон поддерживает несколько рекурсивных вызовов внутри функции и не требует, чтобы функция была хвостовой рекурсивной.

public class RecursiveTask<T1, T2>
{
    private readonly Func<RecursiveTask<T1, T2>, T1, T2> _func;
    private readonly Dictionary<T1, RecursiveTask<T1, T2>> _allTasks;
    private readonly List<RecursiveTask<T1, T2>> _subTasks;
    private readonly RecursiveTask<T1, T2> _rootTask;
    private T1 _arg;
    private T2 _result;
    private int _runsCount;
    private bool _isCompleted;
    private bool _isEvaluating;

    private RecursiveTask(Func<RecursiveTask<T1, T2>, T1, T2> func)
    {
        _func = func;
        _allTasks = new Dictionary<T1, RecursiveTask<T1, T2>>();
        _subTasks = new List<RecursiveTask<T1, T2>>();
        _rootTask = this;
    }

    private RecursiveTask(Func<RecursiveTask<T1, T2>, T1, T2> func, T1 arg, RecursiveTask<T1, T2> rootTask) : this(func)
    {
        _arg = arg;
        _rootTask = rootTask;
    }

    public T2 Run(T1 arg)
    {
        if (!_isEvaluating)
            BuildTasks(arg);

        if (_isEvaluating)
            return EvaluateTasks(arg);

        return default;
    }

    public static RecursiveTask<T1, T2> Create(Func<RecursiveTask<T1, T2>, T1, T2> func)
    {
        return new RecursiveTask<T1, T2>(func);
    }

    private void AddSubTask(T1 arg)
    {
        if (!_allTasks.TryGetValue(arg, out RecursiveTask<T1, T2> subTask))
        {
            subTask = new RecursiveTask<T1, T2>(_func, arg, this);
            _allTasks.Add(arg, subTask);
            _subTasks.Add(subTask);
        }
    }

    private T2 Run()
    {
        if (!_isCompleted)
        {
            var runsCount = _rootTask._runsCount;
            _result = _func(_rootTask, _arg);
            _isCompleted = runsCount == _rootTask._runsCount;
        }
        return _result;
    }

    private void BuildTasks(T1 arg)
    {
        if (_runsCount++ == 0)
            _arg = arg;

        if (EqualityComparer<T1>.Default.Equals(_arg, arg))
        {
            Run();

            var processed = 0;
            var addedTasksCount = _subTasks.Count;
            while (processed < addedTasksCount)
            {
                for (var i = processed; i < addedTasksCount; i++, processed++)
                    _subTasks[i].Run();
                addedTasksCount = _subTasks.Count;
            }
            _isEvaluating = true;
        }
        else
            AddSubTask(arg);
    }

    private T2 EvaluateTasks(T1 arg)
    {
        if (EqualityComparer<T1>.Default.Equals(_arg, arg))
        {
            foreach (var task in Enumerable.Reverse(_subTasks))
                task.Run();

            return Run();
        }
        else
        {
            if (_allTasks.TryGetValue(arg, out RecursiveTask<T1, T2> task))
                return task._isCompleted ? task._result : task.Run();
            else
                return default;
        }
    }
}

Использование:

class Program
{
    static int Fib(int num)
    {
        return RecursiveTask<int, int>.Create((t, n) =>
        {
            if (n == 0) return 0;
            if (n == 1) return 1;

            return t.Run(n - 1) + t.Run(n - 2);
        }).Run(num);
    }

    static void Main(string[] args)
    {
        Console.WriteLine(Fib(7));
        Console.WriteLine(Fib(100000));
    }
}

Преимущества: он безопасен для стека, не использует пул потоков, не обременен инфраструктурой async await, используетзапоминание и позволяет использовать более или менее читабельную семантикуТекущая реализация подразумевает использование только с функциями с одним аргументом.Чтобы сделать его применимым к более широкому диапазону функций, должны быть предусмотрены аналогичные реализации для различных наборов универсальных аргументов:

RecursiveTask<T1, T2, T3>
RecursiveTask<T1, T2, T3, T4>
...
0 голосов
/ 16 мая 2019

Не глядя на ваш MyTask<T> и не просматривая трассировку стека этого исключения, невозможно узнать, что происходит.

Похоже, что вы ищете Обобщенные типы асинхронного возврата .

Вы можете просмотреть источник , чтобы увидеть, как это делается для ValueTask и ValueTask<T>.

0 голосов
/ 17 мая 2019

Решение, более близкое к тому, что я ищу, но еще не вполне удовлетворительное, заключается в следующем.Это основано на понимании предложенного GSerg решения для безопасности стека с добавленной памяткой.

Pro Ядро алгоритма (FibAux метод использует чистый синтаксис async / await).

Минусы Он все еще использует пул потоков для выполнения.

    // Core algorithm using the cute async/await syntax
    // (n.b. this would be exponential without memoization.)
    private static async Task<BigInteger> FibAux(int n)
    {
        if (n <= 1) return n;
        return await Rec(n - 1) + await Rec(n - 2);
    }

    public static Func<int, Task<BigInteger>> Rec { get; }
        = Utils.StackSafeMemoize<int, BigInteger>(FibAux);

    public static BigInteger Fib(int n)
        => FibAux(n).Result;

    [Test]
    public void Test()
    {
        Console.WriteLine(Fib(100000));
    }

    public static class Utils
    {
        // the combinator (still using the thread pool for execution)
        public static Func<X, Task<Y>> StackSafeMemoize<X, Y>(Func<X, Task<Y>> func)
        {
            var memo = new Dictionary<X, Y>();
            return x =>
            {
                Y result;
                if (!memo.TryGetValue(x, out result))
                {
                    return Task.Run(() => func(x).ContinueWith(task =>
                    {
                        var y = task.Result;
                        memo[x] = y;
                        return y;
                    }));
                }

                return Task.FromResult(result);
            };
        }
    } 

Для сравнения, это версия cps, не использующая async / await.


    public static BigInteger Fib(int n)
    {
        var fib = Memo<int, BigInteger>((m, rec, cont) =>
        {
            if (m <= 1) cont(m);
            else rec(m - 1, x => rec(m - 2, y => cont(x + y)));
        });

        return fib(n);
    }

    [Test]
    public void Test()
    {
        Console.WriteLine(Fib(100000));
    }

    // ---------

    public static Func<X, Y> Memo<X, Y>(Action<X, Action<X, Action<Y>>, Action<Y>> func)
    {
        var memo = new Dictionary<X, Y>(); // can be a Lru cache
        var stack = new Stack<Action>();

        Action<X, Action<Y>> rec = null;
        rec = (x, cont) =>
        {
            stack.Push(() =>
            {
                Y res;
                if (memo.TryGetValue(x, out res))
                {
                    cont(res);
                }
                else
                {
                    func(x, rec, y =>
                    {
                        memo[x] = y;
                        cont(y);
                    });
                }
            });
        };

        return x =>
        {
            var res = default(Y);
            rec(x, y => res = y);
            while (stack.Count > 0)
            {
                var next = stack.Pop();
                next();
            }

            return res;
        };
    }

0 голосов
/ 16 мая 2019

Кислотный тест на то, что мне нужно (безопасность стека) будет выглядеть примерно так:

async MyTask<BigInteger> Fib(int n, BigInteger a, BigInteger b)
{
    if (n == 0) return b;
    if (n == 1) return a;
    return await Fib(n - 1, a + b, a);
}

Разве это не будет просто

public static Task<BigInteger> Fib(int n, BigInteger a, BigInteger b)
{
    if (n == 0) return Task.FromResult(b);
    if (n == 1) return Task.FromResult(a);

    return Task.Run(() => Fib(n - 1, a + b, a));
}


Или, без использования пула потоков,

public static async Task<BigInteger> Fib(int n, BigInteger a, BigInteger b)
{
    if (n == 0) return b;
    if (n == 1) return a;

    return await Task.FromResult(a + b).ContinueWith(t => Fib(n - 1, t.Result, a), TaskScheduler.FromCurrentSynchronizationContext()).Unwrap();
}

, если я что-то не так понял.

...