Маршрутизация сообщений, разбиение на части и параллельная обработка - PullRequest
2 голосов
/ 16 мая 2019

Я пытаюсь решить проблему с разбиением и распараллеливанием набора входящих сообщений. Я хочу разделить на основе идентификатора пользователя, который является целым числом.

Итак, предполагая, что у меня есть сто тысяч пользователей, и я хочу, чтобы потоки или разделы X выполнялись, я хочу иметь возможность создавать потоки X с правилами разбиения, такими как userid <10000, userid> 100000 <50000 и т. Д., И иметь возможность проверить это эти разделы охватывают весь набор пользователей и что нет дубликатов (то есть нет пересечения между наборами, представленными каждым из правил разделения). </p>

Как я могу реализовать алгоритм для проверки правил разделения в C # (или любом другом языке .NET, например F #)?

Таким образом, если у меня есть 10 пользователей с идентификатором (1,2,3,4,5,6,7,8,9,10), то общее количество потоков должно иметь только 10 идентификаторов пользователей без повторяющихся идентификаторов пользователей. чем 1 поток

Спасибо за ответы, но я хочу уточнить, что я не смотрю на разделение равномерно среди всех существующих пользователей. Скорее я хочу иметь возможность создать таблицу правил разделения, которая определяет, как должно выполняться разделение. Таблица может выглядеть примерно так:

Thread 1: UserId > 0 < 100
Thread 2: UserId >= 100 < 200
Thread 3: UserId > 300

Мне нужен алгоритм, который может подтвердить, что этот набор правил является исчерпывающим (охватывают все идентификаторы пользователей) и не содержит дубликатов (идентификатор пользователя не будет назначен более чем одному потоку)

Ответы [ 2 ]

3 голосов
/ 16 мая 2019

Я думаю, что самым простым способом реализации такой логики разделения было бы разделение по модулю деления.Т.е., если у вас есть n разделов, на которые нужно распределить пользователя - просто выполните деление по модулю UserId на n.Это даст вам остаток 0 .. (n-1), и таким образом каждый UserId автоматически сопоставляется с одним из ваших n разделов (потоков).

Если UserIds распределены равномерно, эта схема разделениятакже даже будет статистически равномерно распределять всех пользователей по существующим разделам.

И эта схема распределения является по определению исчерпывающей, поскольку для такого деления по модулю может быть только остаток 0 .. (n-1)каждый UserId имеет определенный результат для такого деления по модулю.

2 голосов
/ 16 мая 2019

Приведенный ниже код разделит список сообщений на группы в зависимости от того, сколько сообщений у вас есть и сколько групп вы хотите.

private static IEnumerable<IEnumerable<Message>> GetPartitionedMessages(IEnumerable<Message> messages, int nPartitions)
{
    var orderedMessages = messages.OrderBy(x => x.UserId).ThenBy(x => x.MessageId).ToList();
    int? lastUserId = null;
    int maxPartitionSize = (int)Math.Ceiling(orderedMessages.Count / (double)nPartitions);
    var partitions = new List<List<Message>>();
    List<Message> currentPartition = null;

    foreach (var message in orderedMessages)
    {
        if (lastUserId == message.UserId)
        {
            currentPartition.Add(message);
        }
        else
        {
            lastUserId = message.UserId;
            if (currentPartition == null || currentPartition.Count >= maxPartitionSize)
            {
                currentPartition = new List<Message>();
                partitions.Add(currentPartition);
            }

            currentPartition.Add(message);
        }
    }

    return partitions;
}

В этом примере приложение использует эту функцию, а затем выполняет фиктивную функцию для каждого сообщения. Каждая группа (то есть список, который будет обрабатываться каждым потоком) никогда не будет содержать идентификатор пользователя из другого списка. Пример приложения выводит на консоль сообщение, содержащее информацию о том, какой поток обрабатывает какое сообщение, а также идентификатор и идентификатор пользователя каждого сообщения.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace ConsoleApp16
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("User IDs 1, 2, 3");
            ProcessMessages(GetTestMessages(1, 2, 3), 4);

            Console.WriteLine("User IDs empty");
            ProcessMessages(GetTestMessages(), 4);

            Console.WriteLine("User IDs 1, 2, 3, 4, 5, 6, 7, 8, 9, 10");
            ProcessMessages(GetTestMessages(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 4);

            Console.WriteLine("User IDs 2, 2, 2, 1, 1, 4, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 4, 4, 6, 7, 8, 9, 10");
            ProcessMessages(GetTestMessages(2, 2, 2, 1, 1, 4, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 4, 4, 6, 7, 8, 9, 10), 4);

            Console.ReadLine();
        }

        private static IEnumerable<Message> GetTestMessages(params int[] userIds)
        {
            int i = 1;
            foreach (var userId in userIds)
                yield return new Message { MessageId = i++, UserId = userId };
        }

        private class Message
        {
            public int MessageId { get; set; }
            public int UserId { get; set; }
            //... Real message properties
        }

        private static void ProcessMessages(IEnumerable<Message> incomingMessages, int nThreads)
        {
            var tasks = GetPartitionedMessages(incomingMessages, nThreads)
                         .Select((messages, i) => Task.Run(() => DoMessageBusinessLogic(messages, i)))
                         .ToArray();
            Task.WaitAll(tasks);
        }

        private static void DoMessageBusinessLogic(IEnumerable<Message> messages, int threadIdx)
        {
            foreach (var message in messages)
                Console.WriteLine($"Thread ID: {threadIdx}, MsgId: {message.MessageId}, UserId: {message.UserId}");
        }

        private static IEnumerable<IEnumerable<Message>> GetPartitionedMessages(IEnumerable<Message> messages, int nPartitions)
        {
            var orderedMessages = messages.OrderBy(x => x.UserId).ThenBy(x => x.MessageId).ToList();
            int? lastUserId = null;
            int maxPartitionSize = (int)Math.Ceiling(orderedMessages.Count / (double)nPartitions);
            var partitions = new List<List<Message>>();
            List<Message> currentPartition = null;

            foreach (var message in orderedMessages)
            {
                if (lastUserId == message.UserId)
                {
                    currentPartition.Add(message);
                }
                else
                {
                    lastUserId = message.UserId;
                    if (currentPartition == null || currentPartition.Count >= maxPartitionSize)
                    {
                        currentPartition = new List<Message>();
                        partitions.Add(currentPartition);
                    }

                    currentPartition.Add(message);
                }
            }

            return partitions;
        }
    }
}
...