Мне нужно было смоделировать вывод многопоточного сценария, где несколько потоков обрабатывают параллельно упорядоченную последовательность. Выходные данные больше не упорядочены, но не полностью перетасованы. Я подумал, что такая перестановка должна быть тривиальной и не займет больше 10-20 минут. Но это оказалось намного хитрее, чем я. Так что теперь, после многих часов борьбы с проблемой и уточнения требований, мне удалось создать сложную реализацию с неоптимальным статистическим поведением. Давайте начнем с формулировки требований:
1) Метод должен возвращать отложенное значение IEnumerable
, чтобы можно было перетасовать последовательности бесконечной длины.
2) Должен быть жесткий верхний предел при случайном смещении каждого отдельного элемента.
3) Распределение смещений должно быть примерно плоским. Например, последовательность из 100 элементов, перемешанных с maxDisplacement = 2
, должна иметь ~ 20 элементов, смещенных на -2, ~ 20 на -1, ~ 20 не смещенных, ~ 20 на +1 и ~ 20 на + 2.
4) Тасование должно быть случайным. Разные вызовы метода обычно должны возвращать разную перемешанную последовательность.
Пример ввода и вывода. Последовательность из 20 элементов перетасовывается с maxDisplacement = 5
.
Ввод: 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19
Выход: 0, 3, 2, 5, 7, 1, 4, 6, 8, 12, 9, 11, 13, 10, 15, 16, 19, 14, 17, 18
Ниже моя лучшая попытка:
public static IEnumerable<TSource> ConstrainedShuffle<TSource>(
this IEnumerable<TSource> source, Random random, int maxDisplacement)
{
if (maxDisplacement < 1)
throw new ArgumentOutOfRangeException(nameof(maxDisplacement));
random = random ?? new Random();
var buffer = new SortedDictionary<int, TSource>();
IEnumerable<(int Index, int BufferMaxIndex)> EnumerateInternal()
{
int index = -1;
int bufferMaxIndex = -1;
foreach (var item in source)
{
bufferMaxIndex++;
buffer.Add(bufferMaxIndex, item);
if (bufferMaxIndex >= maxDisplacement)
{
// Start yielding when buffer has maxDisplacement + 1 elements
index++;
yield return (index, bufferMaxIndex);
}
}
while (buffer.Count > 0) // Yield what is left in the buffer
{
while (!buffer.ContainsKey(bufferMaxIndex)) bufferMaxIndex--;
index++;
yield return (index, bufferMaxIndex);
}
}
foreach (var (index, bufferMaxIndex) in EnumerateInternal())
{
int bufferMinIndex = buffer.First().Key;
int selectedKey;
if (index - bufferMinIndex >= maxDisplacement)
{
// Forced picking of the earliest element
selectedKey = bufferMinIndex;
}
else
{
// Pick an element randomly (favoring earlier elements)
int bufferRange = bufferMaxIndex - bufferMinIndex + 1;
while (true)
{
var biasedRandom = Math.Pow(random.NextDouble(), 2.0);
var randomIndex = (int)(biasedRandom * bufferRange);
selectedKey = bufferMinIndex + randomIndex;
if (buffer.ContainsKey(selectedKey)) break;
}
}
yield return buffer[selectedKey];
buffer.Remove(selectedKey);
}
}
Эта реализация не соответствует 3-му требованию. Распределение смещений представляет собой странную кривую с пиком при максимальном положительном смещении (сильно преувеличено для больших значений maxDisplacement
). Вот распределение последовательности из 1 000 000 элементов, перемешанное с maxDisplacement = 10
:
-10: 44,188
-9: 44,199
-8: 43,701
-7: 43,360
-6: 43,134
-5: 43,112
-4: 42,870
-3: 43,628
-2: 44,170
-1: 45,479
0: 50,029
+1: 58,611
+2: 67,077
+3: 71,663
+4: 70,175
+5: 62,914
+6: 52,835
+7: 40,974
+8: 30,553
+9: 21,210
+10: 36,118
Отрицательные / Положительные смещения: 437,841 / 512,130
Возможно, мне не хватает более простого решения этой проблемы.
Обновление: Я реализовал решение на основе Джима Мишеля , и оно отлично работает! Перестановка симметрична относительно положительных и отрицательных смещений, в точках, где соединены перемешанные фрагменты, нет видимых швов, а распределение смещений почти плоское (небольшие смещения слегка приветствуются, но я в этом согласен). Это также очень быстро.
public static IEnumerable<TSource> ConstrainedShuffle_Probabilistic<TSource>(
this IEnumerable<TSource> source, Random random, int maxDisplacement)
{
if (maxDisplacement < 1)
throw new ArgumentOutOfRangeException(nameof(maxDisplacement));
random = random ?? new Random();
int chunkSize = Math.Max(100, maxDisplacement);
int seamSize = maxDisplacement;
int chunkSizePlus = chunkSize + 2 * seamSize;
var indexes = new List<int>(chunkSizePlus);
var chunk = new List<TSource>(chunkSizePlus + seamSize);
int chunkOffset = 0;
int indexesOffset = 0;
bool firstShuffle = true;
int index = -1;
foreach (var item in source)
{
index++;
chunk.Add(item);
indexes.Add(index);
if (indexes.Count >= chunkSizePlus)
{
if (firstShuffle)
{
ShuffleIndexes(0, indexes.Count - seamSize);
}
else
{
ShuffleIndexes(seamSize, seamSize + chunkSize);
}
for (int i = 0; i < chunkSize; i++)
{
yield return chunk[indexes[i] - chunkOffset];
}
if (!firstShuffle)
{
chunk.RemoveRange(0, chunkSize);
chunkOffset += chunkSize;
}
indexes.RemoveRange(0, chunkSize);
indexesOffset += chunkSize;
firstShuffle = false;
}
}
if (firstShuffle)
{
ShuffleIndexes(0, indexes.Count);
}
else
{
ShuffleIndexes(seamSize, indexes.Count);
}
for (int i = 0; i < indexes.Count; i++)
{
yield return chunk[indexes[i] - chunkOffset];
}
void ShuffleIndexes(int suffleFrom, int suffleToExclusive)
{
var range = Enumerable
.Range(suffleFrom, suffleToExclusive - suffleFrom).ToList();
Shuffle(range);
foreach (var i in range)
{
int index1 = indexes[i];
int randomFrom = Math.Max(0, index1 - indexesOffset - maxDisplacement);
int randomToExclusive = Math.Min(indexes.Count,
index1 - indexesOffset + maxDisplacement + 1);
int selectedIndex;
int collisions = 0;
while (true)
{
selectedIndex = random.Next(randomFrom, randomToExclusive);
int index2 = indexes[selectedIndex];
if (Math.Abs(i + indexesOffset - index2) <= maxDisplacement) break;
collisions++;
if (collisions >= 20) // Average collisions is < 1
{
selectedIndex = -1;
break;
}
}
if (selectedIndex != i && selectedIndex != -1)
{
var temp = indexes[i];
indexes[i] = indexes[selectedIndex];
indexes[selectedIndex] = temp;
}
}
}
void Shuffle(List<int> list)
{
for (int i = 0; i < list.Count; i++)
{
int j = random.Next(i, list.Count);
if (i == j) continue;
var temp = list[i];
list[i] = list[j];
list[j] = temp;
}
}
}
Пример распределения перемещений. Последовательность из 1 000 000 элементов перетасовывается с maxDisplacement = 1000
, а затем смещения группируются и отображаются средние значения:
[-1000..-801]: 443
[-800..-601]: 466
[-600..-401]: 496
[-400..-201]: 525
[-200..-1]: 553
[0]: 542
[+1..+200]: 563
[+201..+400]: 546
[+401..+600]: 514
[+601..+800]: 475
[+801..+1000]: 418
Время выполнения: 450 мсек