Как работает схема разрушения LMAX? - PullRequest
202 голосов
/ 03 июля 2011

Я пытаюсь понять схему разрушения .Я посмотрел видео InfoQ и попытался прочитать их статью.Я понимаю, что задействован кольцевой буфер, который инициализируется как чрезвычайно большой массив, чтобы использовать преимущества локальности кэша и исключить выделение новой памяти.

Звучит так, как будто есть одно или несколько атомных целых чисел, которые отслеживают позиции.Кажется, что каждое «событие» получает уникальный идентификатор, и его положение в кольце определяется путем нахождения его модуля относительно размера кольца и т. Д. И т. Д.

К сожалению, у меня нетинтуитивное понимание того, как это работает.Я сделал много торговых приложений и изучил модель актера , посмотрел на SEDA и т.д.однако я не нашел ни одного хорошего описания того, как работают маршрутизаторы.

Есть ли несколько хороших указателей на лучшее объяснение?

Ответы [ 5 ]

207 голосов
/ 03 июля 2011

Проект Google Code ссылается на техническую статью о реализации кольцевого буфера, однако он немного сухой, академичный и трудный для того, кто хочет узнать, как он работает.Однако, есть некоторые сообщения в блоге, которые начали объяснять внутренности более читабельным способом.Существует объяснение кольцевого буфера , который является ядром шаблона прерывателя, описание потребительских барьеров (часть, связанная с чтением из прерывателя) и некоторая информацияпри обработке нескольких производителей доступно.

Самое простое описание Разрушителя: Это способ передачи сообщений между потоками наиболее эффективным способом.Его можно использовать в качестве альтернативы очереди, но он также имеет ряд общих возможностей с SEDA и Actors.

По сравнению с очередями:

Disruptor предоставляетвозможность передавать сообщение другим потокам, будить его при необходимости (по аналогии с BlockingQueue).Однако есть 3 отличия:

  1. Пользователь Disruptor определяет, как хранятся сообщения, расширяя класс Entry и предоставляя фабрику для предварительного выделения.Это позволяет либо повторно использовать (копировать) память, либо запись может содержать ссылку на другой объект.
  2. Помещение сообщений в Disruptor - это двухфазный процесс, сначала в слое кольца запрашивается слот, который обеспечиваетпользователь с записью, которая может быть заполнена соответствующими данными.Затем запись должна быть зафиксирована, этот двухэтапный подход необходим для обеспечения гибкого использования памяти, упомянутой выше.Именно фиксация делает сообщение видимым для потоков потребителя.
  3. Ответственность за отслеживание сообщений, которые были получены из кольцевого буфера, лежит на потребителе.Отказ от этой ответственности от самого кольцевого буфера помог уменьшить количество конфликтов записи, поскольку каждый поток поддерживает свой собственный счетчик.

По сравнению с актерами

АктерМодель ближе к Disruptor, чем большинство других моделей программирования, особенно если вы используете предоставленные классы BatchConsumer / BatchHandler.Эти классы скрывают все сложности обслуживания используемых порядковых номеров и предоставляют набор простых обратных вызовов при возникновении важных событий.Однако есть несколько тонких различий.

  1. В Disruptor используется потребительская модель «1 поток - 1», где актеры используют модель N: M, то есть вы можете иметь столько актеров, сколько захотите, и онибудет распределен по фиксированному количеству потоков (обычно по 1 на ядро).
  2. Интерфейс BatchHandler обеспечивает дополнительный (и очень важный) обратный вызов onEndOfBatch().Это позволяет медленным потребителям, например тем, кто выполняет ввод / вывод, объединять события в пакеты для повышения пропускной способности.Пакетирование можно выполнять в других средах Actor, однако почти во всех других инфраструктурах отсутствует обратный вызов в конце пакета, поэтому для определения конца пакета необходимо использовать тайм-аут, что приводит к низкой задержке.

По сравнению с SEDA

LMAX создал шаблон Disruptor для замены подхода, основанного на SEDA.

  1. Основное улучшение, которое он обеспечил по сравнению сSEDA была способность выполнять работу параллельно.Для этого Disruptor поддерживает многоадресную передачу одинаковых сообщений (в одинаковом порядке) нескольким потребителям.Это устраняет необходимость в этапах разветвления в конвейере.
  2. Мы также позволяем потребителям ждать результатов других потребителей, не устанавливая между ними другую очередь.Потребитель может просто посмотреть порядковый номер потребителя, от которого он зависит.Это устраняет необходимость в этапах соединения в конвейере.

По сравнению с барьерами памяти

Еще один способ думать об этом - это структурированный упорядоченный барьер памяти. Там, где барьер производителя формирует барьер записи, а потребительский барьер - барьер чтения.

135 голосов
/ 16 июля 2011

Сначала мы хотели бы понять модель программирования, которую он предлагает.

Есть один или несколько авторов.Есть один или несколько читателей.Существует строка записей, полностью упорядоченная от старой к новой (изображена слева направо).Авторы могут добавлять новые записи на правом конце.Каждый читатель читает записи последовательно слева направо.Читатели не могут читать прошлых авторов, очевидно.

Нет понятия удаления записи.Я использую «читатель» вместо «потребитель», чтобы избежать изображения потребляемых записей.Однако мы понимаем, что записи слева от последнего читателя становятся бесполезными.

Как правило, читатели могут читать одновременно и независимо.Однако мы можем объявить зависимости среди читателей.Читательские зависимости могут быть произвольным ациклическим графом.Если читатель B зависит от читателя A, читатель B не может читать мимо читателя A.

Зависимость от читателя возникает потому, что читатель A может аннотировать запись, а читатель B зависит от этой аннотации.Например, A выполняет некоторые вычисления для записи и сохраняет результат в поле a в записи.Затем A переходит, и теперь B может прочитать запись, а значение a A сохранено.Если читатель C не зависит от A, C не должен пытаться прочитать a.

Это действительно интересная модель программирования.Независимо от производительности, одна модель может принести пользу многим приложениям.

Конечно, главная цель LMAX - производительность.Он использует предварительно распределенное кольцо записей.Кольцо достаточно большое, но оно ограничено, так что система не будет загружена сверх проектной мощности.Если кольцо заполнено, автор (ы) будет ждать, пока самые медленные читатели не продвинутся и освободят место.

Объекты ввода предварительно выделены и живут вечно, чтобы снизить стоимость сборки мусора.Мы не вставляем новые объекты записей и не удаляем старые объекты записей, вместо этого автор запрашивает уже существующую запись, заполняет ее поля и уведомляет читателей.Это кажущееся двухфазное действие на самом деле является просто атомарным действием

setNewEntry(EntryPopulator);

interface EntryPopulator{ void populate(Entry existingEntry); }

Предварительное выделение записей также означает, что смежные записи (очень вероятно) находятся в соседних ячейках памяти, и поскольку читатели читают записи последовательно, это важно дляиспользовать кэши ЦП.

И много усилий, чтобы избежать блокировки, CAS, даже барьера памяти (например, использовать переменную энергонезависимой последовательности, если есть только один модуль записи)

Для разработчиков читателей: Разноеаннотирующие читатели должны писать в разные поля, чтобы избежать конфликта записи.(На самом деле они должны писать в разные строки кэша.) Аннотирующий читатель не должен касаться чего-либо, что могут прочитать другие независимые читатели.Вот почему я говорю этим читателям аннотировать записей вместо модифицировать записей.

41 голосов
/ 19 июля 2011

Мартин Фаулер написал статью о LMAX и паттерне разрушения Архитектура LMAX , которая может прояснить это далее.

17 голосов
/ 05 июня 2014

Я действительно нашел время, чтобы изучить настоящий источник, из чистого любопытства, и идея этого довольно проста. Самая последняя версия на момент написания этого поста: 3.2.1.

Существует буфер, в котором хранятся предварительно распределенные события, которые будут содержать данные для чтения потребителями.

Буфер поддерживается массивом флагов (целочисленным массивом) его длины, который описывает доступность слотов буфера (подробнее см. Далее). Доступ к массиву осуществляется как java # AtomicIntegerArray, поэтому для целей данного объяснения вы можете также предположить, что он равен единице.

Может быть любое количество производителей. Когда производитель хочет записать в буфер, генерируется длинное число (как при вызове AtomicLong # getAndIncrement, Disruptor фактически использует свою собственную реализацию, но работает аналогичным образом). Давайте назовем это сгенерированное длинным как продюсером. Аналогичным образом, customerCallId генерируется, когда потребитель заканчивает чтение слота из буфера. Доступ к последнему customerCallId.

(Если есть много потребителей, выбирается звонок с самым низким идентификатором.)

Затем эти идентификаторы сравниваются, и, если разница между ними меньше, чем на стороне буфера, производителю разрешается писать.

(Если providerCallId больше, чем недавний customerCallId + bufferSize, это означает, что буфер заполнен, и производитель вынужден ждать шины, пока не станет доступным место.)

Затем производителю назначается слот в буфере на основе его callId (который является prducerCallId по модулю bufferSize, но, поскольку bufferSize всегда имеет степень 2 (ограничение принудительно устанавливается при создании буфера), используемой операциейallall является ManufacturerCallId & ( bufferSize - 1)). После этого можно свободно изменять событие в этом слоте.

(Фактический алгоритм немного сложнее, включающий кэширование недавнего потребителя-идентификатора в отдельном атомарном справочнике для целей оптимизации.)

Когда событие было изменено, изменение «опубликовано». При публикации соответствующего слота в массиве флагов заполняется обновленный флаг. Значение флага - это номер цикла (providerCallId, деленный на bufferSize (опять же, поскольку bufferSize имеет степень 2, фактическая операция - сдвиг вправо).

Аналогичным образом может быть любое количество потребителей. Каждый раз, когда потребитель хочет получить доступ к буферу, генерируется customerCallId (в зависимости от того, как потребители были добавлены в разрушитель, атом, используемый в генерации идентификатора, может быть общим или отдельным для каждого из них). Затем этот customerCallId сравнивается с последним producentCallId, и, если он меньше двух, читателю разрешается прогрессировать.

(Точно так же, если providerCallId равен даже customerCallId, это означает, что буфер пуст, а потребитель вынужден ждать. Способ ожидания определяется WaitStrategy во время создания прерывателя.)

Для отдельных потребителей (тех, у кого есть собственный генератор идентификаторов), следующая проверенная вещь - это возможность пакетного потребления. Слоты в буфере проверяются по порядку, начиная с единицы, соответствующей customerCallId (индекс определяется таким же образом, как и для производителей), до той позиции, которая соответствует недавнему providerCallId.

Они проверяются в цикле путем сравнения значения флага, записанного в массиве флага, со значением флага, созданным для consumerCallId. Если флаги совпадают, это означает, что производители, заполняющие слоты, передали свои изменения. Если нет, цикл прерывается и возвращается самый высокий зафиксированный changeId. Слоты от ConsumerCallId до полученных в changeId можно использовать в пакетном режиме.

Если группа потребителей читает вместе (те, у кого общий генератор идентификаторов), то каждый из них принимает только один callId, и проверяется и возвращается только слот для этого единственного callId.

7 голосов
/ 17 июня 2014

С этой статьи :

Схема разрушения представляет собой очередь дозирования, заархивированную массив (то есть кольцевой буфер), заполненный предварительно выделенной передачей объекты, которые используют барьеры памяти для синхронизации производителей и потребители через последовательности.

Памятные барьеры довольно сложно объяснить, и блог Триши сделал, по моему мнению, лучшую попытку с этим сообщением: http://mechanitis.blogspot.com/2011/08/dissecting-disruptor-why-its-so-fast.html

Но если вы не хотите углубляться в детали низкого уровня, вы можете просто знать, что барьеры памяти в Java реализованы через ключевое слово volatile или через java.util.concurrent.AtomicLong. Последовательности паттерна разрушения равны AtomicLong с и передаются между производителями и потребителями через барьеры памяти вместо блокировок.

Мне проще понять концепцию с помощью кода, поэтому приведенный ниже код представляет собой простой helloworld из CoralQueue , который представляет собой реализацию шаблона разрушения, выполненную CoralBlocks, с которой я аффилированная. В приведенном ниже коде вы можете увидеть, как шаблон прерывателя реализует пакетирование и как кольцевой буфер (то есть круговой массив) обеспечивает бесперебойную связь между двумя потоками:

package com.coralblocks.coralqueue.sample.queue;

import com.coralblocks.coralqueue.AtomicQueue;
import com.coralblocks.coralqueue.Queue;
import com.coralblocks.coralqueue.util.MutableLong;

public class Sample {

    public static void main(String[] args) throws InterruptedException {

        final Queue<MutableLong> queue = new AtomicQueue<MutableLong>(1024, MutableLong.class);

        Thread consumer = new Thread() {

            @Override
            public void run() {

                boolean running = true;

                while(running) {
                    long avail;
                    while((avail = queue.availableToPoll()) == 0); // busy spin
                    for(int i = 0; i < avail; i++) {
                        MutableLong ml = queue.poll();
                        if (ml.get() == -1) {
                            running = false;
                        } else {
                            System.out.println(ml.get());
                        }
                    }
                    queue.donePolling();
                }
            }

        };

        consumer.start();

        MutableLong ml;

        for(int i = 0; i < 10; i++) {
            while((ml = queue.nextToDispatch()) == null); // busy spin
            ml.set(System.nanoTime());
            queue.flush();
        }

        // send a message to stop consumer...
        while((ml = queue.nextToDispatch()) == null); // busy spin
        ml.set(-1);
        queue.flush();

        consumer.join(); // wait for the consumer thread to die...
    }
}
...