Импорт данных через API для тысяч пользователей с использованием потоков - PullRequest
4 голосов
/ 14 сентября 2010

В нашем приложении нам нужно импортировать данные транзакции из PayPal через API для пользователей моего приложения и хранить в базе данных. У меня тысячи (около 5 тыс.) Пользователей, и оно растет день ото дня.

Это приложение является службой Windows .net.

Импортирует данные ежечасно для всех пользователей. В настоящее время мы импортируем данные для пользователей один пользователь за другим, но иногда то, что происходит, данные одного пользователя могут быть настолько большими, что для получения всех его данных требуется около 5 часов, поэтому мы блокируем других пользователей до завершения импорта этих пользовательских данных. Этот почасовой импорт для всех остальных пользователей полностью ушел на жеребьевку.

Чтобы избежать этого, мы думали о создании потоков для каждого импорта пользователей и запускали их каждый час, используя службу Windows. Здесь мы имеем ситуацию, когда нам нужно думать о пропускной способности в любой момент времени, так как все потоки будут запускаться одновременно. Это вообще проблема?

Теперь я хочу знать, верна ли наша новая реализация или нет? Также я хочу знать, как это обычно делается? Если кто-то сталкивался с такой функциональностью, пожалуйста, сообщите нам, как это делается.

Если мой вопрос недостаточно ясен, пожалуйста, дайте мне знать, я предоставлю дополнительную информацию.

Редактировать : Если я отправляю так много запросов в Paypal с одного IP, как он обрабатывает это? Есть идеи, ограничивает ли количество запросов на IP?

Обновление: Спасибо за все предложения и отзывы.

Я думал об использовании решения jgauffin, так как оно было идеальной имитацией ThreadPool. Но здесь мне нужны еще некоторые функции, такие как динамическое изменение ограничения потока и рекурсивный вызов метода обратного вызова.

После долгих исследований и анализа пула потоков я решил использовать SmartThreadPool , который сделан на основе логики пула потоков, но с большим количеством функций. Это довольно хорошо и отлично служит моей цели.

Ответы [ 5 ]

2 голосов
/ 14 сентября 2010

Я бы начал с пула потоков (скажем, 10) и позволил каждому потоку выполнить импорт.Когда закончите, он возьмет следующий элемент из очереди.Вы используете существующий класс ThreadPool и ставите все свои запросы на импорт в этот пул потоков.Вы можете контролировать максимальное количество потоков для этого ThreadPool.

Создание тысяч потоков является плохой идеей по нескольким причинам, раньше это было слишком много для ОС Windows, и, как вы указываете сами, вы можете затопить сеть (или, возможно, службу PayPal).

Для максимальной масштабируемости вы можете выполнять асинхронный ввод-вывод, который не блокирует поток во время выполнения запроса, но у этого API есть крутая кривая обучения, которая, вероятно, не нужна для вашего сценария.

2 голосов
/ 14 сентября 2010

Не использовать поток на пользователя. Поместите РАБОЧИЙ ПУНКТ в пул потоков для каждого пользователя. Таким образом, вы получаете лучшее из обоих миров - не накладные расходы памяти на 5000 потоков, а больше контроля загрузки, потому что вы можете определить, сколько потоков ThreadPool использует для обработки рабочих элементов.

1 голос
/ 14 сентября 2010

Я бы не стал создавать темы для каждого пользователя.Этот подход не очень масштабируемый.И я предполагаю, что API не имеет механизма для асинхронной загрузки.Если это произойдет, то это, вероятно, путь.

Шаблон «производитель-потребитель» может хорошо работать здесь.Идея состоит в том, чтобы создать пул с фиксированным размером потоков, которые потребляют рабочие элементы из общей очереди.Вероятно, лучше избегать ThreadPool в вашем случае, потому что он предназначен в основном для краткосрочных задач.Вы не хотите, чтобы ваши долгоживущие задачи исчерпали его, потому что он используется для множества других вещей в .NET BCL.

Если вы используете .NET 4.0, вы можете воспользоваться BlockingCollection .Существует также бэкпорт, доступный как часть Reactive Extensions .Вот как может выглядеть ваш код.

Примечание. Вам придется укрепить код, чтобы сделать его более надежным, корректно завершить работу и т. Д.

public class Importer
{
  private BlockingCollection<Person> m_Queue = new BlockingCollection<Person>();

  public Importer(int poolSize)
  {
    for (int i = 0; i < poolSize; i++)
    {
      var thread = new Thread(Download);
      thread.IsBackground = true;
      thread.Start();
    }
  }

  public void Add(Person person)
  {
    m_Queue.Add(person);
  }

  private void Download()
  {
    while (true)
    {
      Person person = m_Queue.Take();
      // Add your code for downloading this person's data here.
    }
  }
}
1 голос
/ 14 сентября 2010

Я бы использовал очередь и скажем пять потоков для этого.Каждый раз, когда поток завершается, он получает нового пользователя из очереди.

Пример кода:

public class Example
{

    public static void Main(string[] argv)
    {
        //setup
        DownloadQueue personQueue = new DownloadQueue();
        personQueue.JobTriggered += OnHandlePerson;
        personQueue.ThreadLimit = 10; //can be changed at any time and will be adjusted when a job completed (or a new one is enqueued)

        // enqueue as many persons as you like
        personQueue.Enqueue(new Person());

        Console.ReadLine();
    }

    public static void OnHandlePerson(object source, PersonEventArgs e)
    {
        //download persno here.
    }
}

public class DownloadQueue
{
    Queue<Person> _queue = new Queue<Person>();
    int _runningThreads = 0;

    public int ThreadLimit { get; set; }

    /// <summary>
    /// Enqueue a new user.
    /// </summary>
    /// <param name="person"></param>
    public void Enqueue(Person person)
    {
        lock (_queue)
        {
            _queue.Enqueue(person);
            if (_runningThreads < ThreadLimit)
                ThreadPool.QueueUserWorkItem(DownloadUser);
        }
    }

    /// <summary>
    /// Running using a ThreadPool thread.
    /// </summary>
    /// <param name="state"></param>
    private void DownloadUser(object state)
    {
        lock (_queue)
            ++_runningThreads;

        while (true)
        {
            Person person;
            lock (_queue)
            {
                if (_queue.Count == 0)
                {
                    --_runningThreads;
                    return; // nothing more in the queue. Lets exit
                }
                person = _queue.Dequeue();
            }

            JobTriggered(this, new PersonEventArgs(person));
        }
    }

    public event EventHandler<PersonEventArgs> JobTriggered = delegate { };
}


public class PersonEventArgs : EventArgs
{
    Person _person;

    public PersonEventArgs(Person person)
    {
        _person = person;
    }

    public Person Person { get { return _person; } }
}
public class Person
{
    public Person(string fName, string lName)
    {
        this.firstName = fName;
        this.lastName = lName;
    }

    public string firstName;
    public string lastName;
}
1 голос
/ 14 сентября 2010

Создание 5000 потоков в коде - нехорошая вещь, это может очень сильно замедлить работу сервера, даже если это может привести к сбою.

Здесь вам нужна балансировка нагрузки.

попробуйте подумать о решении на основе MSMQ, если вы используете .net plateform и задаете запросы пользователей ququ, и тогда должен быть какой-то диспетчер, который будет распределять запросы пользователей между серверами.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...