Как реализовать ограниченную перетасовку последовательности - PullRequest
4 голосов
/ 09 июля 2019

Мне нужно было смоделировать вывод многопоточного сценария, где несколько потоков обрабатывают параллельно упорядоченную последовательность. Выходные данные больше не упорядочены, но не полностью перетасованы. Я подумал, что такая перестановка должна быть тривиальной и не займет больше 10-20 минут. Но это оказалось намного хитрее, чем я. Так что теперь, после многих часов борьбы с проблемой и уточнения требований, мне удалось создать сложную реализацию с неоптимальным статистическим поведением. Давайте начнем с формулировки требований:

1) Метод должен возвращать отложенное значение IEnumerable, чтобы можно было перетасовать последовательности бесконечной длины.
2) Должен быть жесткий верхний предел при случайном смещении каждого отдельного элемента.
3) Распределение смещений должно быть примерно плоским. Например, последовательность из 100 элементов, перемешанных с maxDisplacement = 2, должна иметь ~ 20 элементов, смещенных на -2, ~ 20 на -1, ~ 20 не смещенных, ~ 20 на +1 и ~ 20 на + 2.
4) Тасование должно быть случайным. Разные вызовы метода обычно должны возвращать разную перемешанную последовательность.

Пример ввода и вывода. Последовательность из 20 элементов перетасовывается с maxDisplacement = 5.

Ввод: 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19
Выход: 0, 3, 2, 5, 7, 1, 4, 6, 8, 12, 9, 11, 13, 10, 15, 16, 19, 14, 17, 18

Ниже моя лучшая попытка:

public static IEnumerable<TSource> ConstrainedShuffle<TSource>(
    this IEnumerable<TSource> source, Random random, int maxDisplacement)
{
    if (maxDisplacement < 1)
        throw new ArgumentOutOfRangeException(nameof(maxDisplacement));
    random = random ?? new Random();
    var buffer = new SortedDictionary<int, TSource>();

    IEnumerable<(int Index, int BufferMaxIndex)> EnumerateInternal()
    {
        int index = -1;
        int bufferMaxIndex = -1;
        foreach (var item in source)
        {
            bufferMaxIndex++;
            buffer.Add(bufferMaxIndex, item);
            if (bufferMaxIndex >= maxDisplacement)
            {
                // Start yielding when buffer has maxDisplacement + 1 elements
                index++;
                yield return (index, bufferMaxIndex);
            }
        }
        while (buffer.Count > 0) // Yield what is left in the buffer
        {
            while (!buffer.ContainsKey(bufferMaxIndex)) bufferMaxIndex--;
            index++;
            yield return (index, bufferMaxIndex);
        }
    }

    foreach (var (index, bufferMaxIndex) in EnumerateInternal())
    {
        int bufferMinIndex = buffer.First().Key;
        int selectedKey;
        if (index - bufferMinIndex >= maxDisplacement)
        {
            // Forced picking of the earliest element
            selectedKey = bufferMinIndex;
        }
        else
        {
            // Pick an element randomly (favoring earlier elements)
            int bufferRange = bufferMaxIndex - bufferMinIndex + 1;
            while (true)
            {
                var biasedRandom = Math.Pow(random.NextDouble(), 2.0);
                var randomIndex = (int)(biasedRandom * bufferRange);
                selectedKey = bufferMinIndex + randomIndex;
                if (buffer.ContainsKey(selectedKey)) break;
            }
        }
        yield return buffer[selectedKey];
        buffer.Remove(selectedKey);
    }
}

Эта реализация не соответствует 3-му требованию. Распределение смещений представляет собой странную кривую с пиком при максимальном положительном смещении (сильно преувеличено для больших значений maxDisplacement). Вот распределение последовательности из 1 000 000 элементов, перемешанное с maxDisplacement = 10:

-10: 44,188
 -9: 44,199
 -8: 43,701
 -7: 43,360
 -6: 43,134
 -5: 43,112
 -4: 42,870
 -3: 43,628
 -2: 44,170
 -1: 45,479
  0: 50,029
 +1: 58,611
 +2: 67,077
 +3: 71,663
 +4: 70,175
 +5: 62,914
 +6: 52,835
 +7: 40,974
 +8: 30,553
 +9: 21,210
+10: 36,118

Отрицательные / Положительные смещения: 437,841 / 512,130

Возможно, мне не хватает более простого решения этой проблемы.


Обновление: Я реализовал решение на основе Джима Мишеля , и оно отлично работает! Перестановка симметрична относительно положительных и отрицательных смещений, в точках, где соединены перемешанные фрагменты, нет видимых швов, а распределение смещений почти плоское (небольшие смещения слегка приветствуются, но я в этом согласен). Это также очень быстро.

public static IEnumerable<TSource> ConstrainedShuffle_Probabilistic<TSource>(
    this IEnumerable<TSource> source, Random random, int maxDisplacement)
{
    if (maxDisplacement < 1)
        throw new ArgumentOutOfRangeException(nameof(maxDisplacement));
    random = random ?? new Random();
    int chunkSize = Math.Max(100, maxDisplacement);
    int seamSize = maxDisplacement;
    int chunkSizePlus = chunkSize + 2 * seamSize;
    var indexes = new List<int>(chunkSizePlus);
    var chunk = new List<TSource>(chunkSizePlus + seamSize);
    int chunkOffset = 0;
    int indexesOffset = 0;
    bool firstShuffle = true;
    int index = -1;
    foreach (var item in source)
    {
        index++;
        chunk.Add(item);
        indexes.Add(index);
        if (indexes.Count >= chunkSizePlus)
        {
            if (firstShuffle)
            {
                ShuffleIndexes(0, indexes.Count - seamSize);
            }
            else
            {
                ShuffleIndexes(seamSize, seamSize + chunkSize);
            }
            for (int i = 0; i < chunkSize; i++)
            {
                yield return chunk[indexes[i] - chunkOffset];
            }
            if (!firstShuffle)
            {
                chunk.RemoveRange(0, chunkSize);
                chunkOffset += chunkSize;
            }
            indexes.RemoveRange(0, chunkSize);
            indexesOffset += chunkSize;
            firstShuffle = false;
        }
    }
    if (firstShuffle)
    {
        ShuffleIndexes(0, indexes.Count);
    }
    else
    {
        ShuffleIndexes(seamSize, indexes.Count);
    }
    for (int i = 0; i < indexes.Count; i++)
    {
        yield return chunk[indexes[i] - chunkOffset];
    }

    void ShuffleIndexes(int suffleFrom, int suffleToExclusive)
    {
        var range = Enumerable
            .Range(suffleFrom, suffleToExclusive - suffleFrom).ToList();
        Shuffle(range);
        foreach (var i in range)
        {
            int index1 = indexes[i];
            int randomFrom = Math.Max(0, index1 - indexesOffset - maxDisplacement);
            int randomToExclusive = Math.Min(indexes.Count,
                index1 - indexesOffset + maxDisplacement + 1);
            int selectedIndex;
            int collisions = 0;
            while (true)
            {
                selectedIndex = random.Next(randomFrom, randomToExclusive);
                int index2 = indexes[selectedIndex];
                if (Math.Abs(i + indexesOffset - index2) <= maxDisplacement) break;
                collisions++;
                if (collisions >= 20) // Average collisions is < 1
                {
                    selectedIndex = -1;
                    break;
                }
            }
            if (selectedIndex != i && selectedIndex != -1)
            {
                var temp = indexes[i];
                indexes[i] = indexes[selectedIndex];
                indexes[selectedIndex] = temp;
            }
        }
    }

    void Shuffle(List<int> list)
    {
        for (int i = 0; i < list.Count; i++)
        {
            int j = random.Next(i, list.Count);
            if (i == j) continue;
            var temp = list[i];
            list[i] = list[j];
            list[j] = temp;
        }
    }
}

Пример распределения перемещений. Последовательность из 1 000 000 элементов перетасовывается с maxDisplacement = 1000, а затем смещения группируются и отображаются средние значения:

[-1000..-801]: 443
 [-800..-601]: 466
 [-600..-401]: 496
 [-400..-201]: 525
   [-200..-1]: 553
          [0]: 542
   [+1..+200]: 563
 [+201..+400]: 546
 [+401..+600]: 514
 [+601..+800]: 475
[+801..+1000]: 418

Время выполнения: 450 мсек

Ответы [ 2 ]

1 голос
/ 10 июля 2019

Идея: предварительно вычислить все возможные параметры для буфера размером n?например, для -1,0, + 1 и буфера 3 вы получите [0,1,2], [0,2,1], [1,0,2], [1,0,3 carry 2] в предположениине перенести вперед из ранее.Итак ...

 [0,1,2]         has a total shift of 0
 [0,2,1]         has a total shift of 2
 [1,0,2]         has a total shift of 2
 [1,0,3] carry 2 has a total shift of 3

Сделайте то же самое для случая, когда у вас есть перенос вперед (у вас есть два состояния, в которых вы можете находиться в начале, одно без переноса вперед, одно с переносом впереди перенос должен приземлиться в первой ячейке в этом упрощенном случае).

Так что теперь вы можете назначить вероятности для каждого шаблона, чтобы соответствовать плоскому распределению, и можете соответственно выбрать один случайным образом.Это выводит все N следующих значений, а затем вы берете перенос и начинаете снова.

Очевидно, что для чего-то большего, чем -1,0,1, у вас будет гораздо больше возможностей, и у вас также потенциально будет больше предметов для переноса..

Теперь, вы можете упростить это?Может быть, поместить выбор в ориентированный граф, используя относительные смещения?Зациклить график, когда он повторяется.Назначьте вероятности каждой ветви, чтобы получить равномерное распределение.Возможно, превратите его в конечный автомат.

Бонусные очки: создайте конечный автомат, который реализует алгоритм, но не знает вероятностей для переходов.Теперь используйте машинное обучение, чтобы обучить его получению равномерного распределения, присваивая вероятности переходам.

Я бы сделал этот комментарий, так как это не прямой ответ, но он слишком длинный ;

1 голос
/ 09 июля 2019

У меня есть идея, которая должна работать на конечном массиве.

При максимальном смещении 2:

  • индекс 0 можно переместить в индекс 1 или 2
  • индекс 1 можно переместить в индекс 0, 2 или 3
  • индекс 2 можно переместить в индекс 0, 1, 3 или 4
  • и т.д.
  • Индекс 8 может быть перемещен на 6, 7 или 9
  • Индекс 9 может быть перемещен в 7 или 8

Так вот моя идея. Давайте использовать массив из 10 элементов:

working = [0,1,2,3,4,5,6,7,8,9]
available = [0,1,2,3,4,5,6,7,8,9]  // make a copy of the initial array
avail_count = 10

Теперь выполните следующие действия, пока не выйдет <2: </p>.

  1. Выберите элемент случайным образом из массива available.
  2. Выберите случайное число от -2 до +2 включительно (кроме особых случаев 0, 1, 8 и 9, где ваш диапазон ограничен).
  3. Добавьте смещение к выбранному вами номеру. Это дает вам индекс, с которым вы будете менять элемент, выбранный на шаге 1. (Это не всегда работает, см. Ниже.)
  4. Поменяйте местами эти два элемента в массиве working.
  5. В массиве available удалите два замененных элемента, заменив их последним и уменьшив количество.

Позвольте мне проиллюстрировать это примером.

  1. Выберите случайное число от 0 до 9 включительно и извлеките этот элемент из массива available. Скажем, случайное число равно 5. Элемент в available[5] равен 5.
  2. Выберите случайное смещение. Скажем, вы выбрали -2.
  3. Добавьте -2 к 5, в результате чего 3: индекс для обмена.
  4. Поменяйте местами эти два элемента, в результате чего: working = [0,1,2,5,4,3,6,7,8,9]

Шаг 5, удалите 3 и 5 из массива available и соответственно уменьшите счет:

available = [0,1,2,3,4,9,6,7,8]  count = 9
available = [0,1,2,8,4,9,6,7]    count = 8

Следующая итерация проиллюстрирует проблему, о которой я говорил в шаге № 3.

  1. Выберите случайное число от 0 до 7 включительно. Скажем, мы выбрали 2. Предмет есть 2.
  2. Выберите случайное смещение. Скажем, мы выбрали 1.
  3. Добавьте 1 к 2, давая 3. Теперь у нас есть проблема. Элемент на working[3] равен 5. Мы не можем поменять 2 с 5, потому что doiong так приведет к смещению 3, которое выше вашего заявленного максимального смещения.

Я могу придумать два способа решения этой проблемы. Первое легко. Если элемент в working[index] не равен index, то предположим, что вы не можете поменяться местами: обработайте его так, как если бы случайное смещение равнялось 0. Просто удалите первый индекс из массива available, уменьшите счет и продолжай.

Другой способ - построить массив всех подходящих предметов в диапазоне -max_displacement..+max_displacement и случайным образом выбрать один. Недостатком является то, что O (max_displacement * 2), но будет работать.

В любом случае, если вы продолжите это до count < 2, то вы будете перетасовывать массив, сохраняя ваше правило смещения. Даст ли это вам распределение перемещений, которое вы хотите - другой вопрос. Я должен был бы закодировать это и дать ему водоворот, чтобы определить это.

Теперь, как это работает в потоке? Моей первой попыткой было бы сделать это большими кусками. Надо бы больше об этом подумать.

...