Неблокирующая операция ввода-вывода по сравнению с использованием потоков (Насколько плохо переключение контекста?) - PullRequest
10 голосов
/ 17 ноября 2009

Мы часто используем сокеты в программе, над которой я работаю, и одновременно обрабатываем подключения от примерно 100 машин. У нас есть комбинация неблокирующих I / O , используемых для управления таблицей состояний и традиционными сокетами Java, которые используют потоки.

У нас довольно много проблем с неблокирующими сокетами, и мне лично нравится использовать потоки, чтобы обрабатывать сокеты намного лучше. Итак, мой вопрос:

Какая экономия достигается при использовании неблокирующих сокетов в одном потоке? Насколько плохо переключение контекста при использовании потоков и сколько параллельных соединений вы можете масштабировать до использования многопоточной модели в Java?

Ответы [ 3 ]

10 голосов
/ 17 ноября 2009

Ввод / вывод и выбор неблокирующих вводов / выводов зависит от профиля активности вашего сервера. Например. Если вы используете долгоживущие соединения и тысячи клиентов, ввод-вывод может стать слишком дорогим из-за исчерпания системных ресурсов. Однако прямой ввод-вывод, который не вытесняет кэш процессора, быстрее, чем неблокирующий ввод-вывод. Об этом есть хорошая статья - Написание многопоточных серверов Java - что нового - новое .

О стоимости переключения контекста - это скорее чиповая операция. Рассмотрим простой тест ниже:

package com;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class AAA {

    private static final long DURATION = TimeUnit.NANOSECONDS.convert(30, TimeUnit.SECONDS);
    private static final int THREADS_NUMBER = 2;
    private static final ThreadLocal<AtomicLong> COUNTER = new ThreadLocal<AtomicLong>() {
        @Override
        protected AtomicLong initialValue() {
            return new AtomicLong();
        }
    };
    private static final ThreadLocal<AtomicLong> DUMMY_DATA = new ThreadLocal<AtomicLong>() {
        @Override
        protected AtomicLong initialValue() {
            return new AtomicLong();
        }
    };
    private static final AtomicLong DUMMY_COUNTER = new AtomicLong();
    private static final AtomicLong END_TIME = new AtomicLong(System.nanoTime() + DURATION);

    private static final List<ThreadLocal<CharSequence>> DUMMY_SOURCE = new ArrayList<ThreadLocal<CharSequence>>();
    static {
        for (int i = 0; i < 40; ++i) {
            DUMMY_SOURCE.add(new ThreadLocal<CharSequence>());
        }
    }

    private static final Set<Long> COUNTERS = new ConcurrentSkipListSet<Long>();

    public static void main(String[] args) throws Exception {
        final CountDownLatch startLatch = new CountDownLatch(THREADS_NUMBER);
        final CountDownLatch endLatch = new CountDownLatch(THREADS_NUMBER);

        for (int i = 0; i < THREADS_NUMBER; i++) {
            new Thread() {
                @Override
                public void run() {
                    initDummyData();
                    startLatch.countDown();
                    try {
                        startLatch.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    while (System.nanoTime() < END_TIME.get()) {
                        doJob();
                    }
                    COUNTERS.add(COUNTER.get().get());
                    DUMMY_COUNTER.addAndGet(DUMMY_DATA.get().get());
                    endLatch.countDown();
                }
            }.start();
        }
        startLatch.await();
        END_TIME.set(System.nanoTime() + DURATION);

        endLatch.await();
        printStatistics();
    }

    private static void initDummyData() {
        for (ThreadLocal<CharSequence> threadLocal : DUMMY_SOURCE) {
            threadLocal.set(getRandomString());
        }
    }

    private static CharSequence getRandomString() {
        StringBuilder result = new StringBuilder();
        Random random = new Random();
        for (int i = 0; i < 127; ++i) {
            result.append((char)random.nextInt(0xFF));
        }
        return result;
    }

    private static void doJob() {
        Random random = new Random();
        for (ThreadLocal<CharSequence> threadLocal : DUMMY_SOURCE) {
            for (int i = 0; i < threadLocal.get().length(); ++i) {
                DUMMY_DATA.get().addAndGet(threadLocal.get().charAt(i) << random.nextInt(31));
            }
        }
        COUNTER.get().incrementAndGet();
    }

    private static void printStatistics() {
        long total = 0L;
        for (Long counter : COUNTERS) {
            total += counter;
        }
        System.out.printf("Total iterations number: %d, dummy data: %d, distribution:%n", total, DUMMY_COUNTER.get());
        for (Long counter : COUNTERS) {
            System.out.printf("%f%%%n", counter * 100d / total);
        }
    }
}

Я провел четыре теста для сценариев с двумя и десятью потоками, и он показывает, что потеря производительности составляет около 2,5% (78626 итераций для двух потоков и 76754 для десяти потоков), системные ресурсы используются потоками примерно одинаково.

Также 'java.util.concurrent' авторы предполагают, что время переключения контекста составляет около 2000-4000 циклов ЦП:

public class Exchanger<V> {
   ...
   private static final int NCPU = Runtime.getRuntime().availableProcessors();
   ....
   /**
    * The number of times to spin (doing nothing except polling a
    * memory location) before blocking or giving up while waiting to
    * be fulfilled.  Should be zero on uniprocessors.  On
    * multiprocessors, this value should be large enough so that two
    * threads exchanging items as fast as possible block only when
    * one of them is stalled (due to GC or preemption), but not much
    * longer, to avoid wasting CPU resources.  Seen differently, this
    * value is a little over half the number of cycles of an average
    * context switch time on most systems.  The value here is
    * approximately the average of those across a range of tested
    * systems.
    */
   private static final int SPINS = (NCPU == 1) ? 0 : 2000; 
1 голос
/ 17 ноября 2009

Для 100 соединений вряд ли возникнут проблемы с блокировкой ввода-вывода и использованием двух потоков на соединение (один для чтения и записи). Это самая простая модель ИМХО.

Однако использование JMS может показаться вам лучшим способом управления вашими соединениями. Если вы используете что-то вроде ActiveMQ, вы можете объединить все свои соединения.

1 голос
/ 17 ноября 2009

Для ваших вопросов лучшим методом может быть создание тестовой программы, получение точных данных измерений и принятие наилучшего решения на основе этих данных. Я обычно делаю это, когда пытаюсь принимать такие решения, и это помогает иметь твердые цифры, чтобы взять с собой в обоснование своих аргументов.

Прежде чем начать, о скольких темах вы говорите? И с каким типом аппаратного обеспечения вы используете свое программное обеспечение?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...