Как оптимизировать цикл приема для тысяч сообщений в Erlang? - PullRequest
5 голосов
/ 29 сентября 2011

В главе «Программирование многоядерных процессоров» книги «Программирование на Erlang» Джо Армстронг приводит хороший пример распараллеливания функции карты:

pmap(F, L) ->
    S = self(),
    %% make_ref() returns a unique reference
    %% we'll match on this later
    Ref = erlang:make_ref(),
    Pids = map(fun(I) ->
        spawn(fun() -> do_f(S, Ref, F, I) end)
    end, L),
    %% gather the results
    gather(Pids, Ref).

do_f(Parent, Ref, F, I) ->
    Parent ! {self(), Ref, (catch F(I))}.

gather([Pid|T], Ref) ->
    receive
        {Pid, Ref, Ret} -> [Ret|gather(T, Ref)]
    end;

gather([], _) ->
    [].

Это работает хорошо, но я считаю, что в нем есть узкое место, заставляющее его работать очень медленно в списках с более чем 100 000 элементов.

Когда выполняется функция gather(), она начинает сопоставлять первый Pid из списка Pids с сообщением в основном почтовом ящике процесса. Но что, если самое старое сообщение в почтовом ящике не из этого самого Pid? Затем он пробует все остальные сообщения, пока не найдет совпадение. При этом существует определенная вероятность того, что при выполнении функции gather() нам пришлось бы перебирать все сообщения почтового ящика, чтобы найти совпадение с Pid, которое мы взяли из списка Pids. Это N * N худший вариант для списка размером N.

Мне даже удалось доказать существование этого узкого места:

gather([Pid|T], Ref) ->
    receive
        {Pid, Ref, Ret} -> [Ret|gather(T, Ref)];
        %% Here it is:
        Other -> io:format("The oldest message in the mailbox (~w) did not match with Pid ~w~n", [Other,Pid])
    end;

Как мне избежать этого узкого места?

Ответы [ 3 ]

3 голосов
/ 29 сентября 2011

Проблема в том, что если вы хотите найти правильное решение, вам все равно нужно:

  • проверить, приходит ли данный ответ от одного из ваших процессов породил
  • обеспечить правильный результат заказа

Вот решение, которое использует счетчики вместо списков - это устраняет необходимость обходить входящие несколько раз. Соответствие Ref гарантирует, что сообщения, которые мы получаем от наших детей. Правильный порядок обеспечивается путем сортировки результата с lists:keysort/2 в самом конце pmap, что добавляет некоторые издержки, но, вероятно, оно будет меньше, чем O(n^2).

-module(test).

-compile(export_all).

pmap(F, L) ->
    S = self(),
    % make_ref() returns a unique reference
    % we'll match on this later
    Ref = erlang:make_ref(),
    Count = lists:foldl(fun(I, C) ->
                                spawn(fun() ->
                                              do_f(C, S, Ref, F, I)
                                      end),
                                C+1
                        end, 0, L),
    % gather the results
    Res = gather(0, Count, Ref),
    % reorder the results
    element(2, lists:unzip(lists:keysort(1, Res))).


do_f(C, Parent, Ref, F, I) ->
    Parent ! {C, Ref, (catch F(I))}.


gather(C, C, _) ->
    [];
gather(C, Count, Ref) ->
    receive
        {C, Ref, Ret} -> [{C, Ret}|gather(C+1, Count, Ref)]
    end.
2 голосов
/ 03 октября 2011

Пример Джо изящен, но на практике вы хотите более тяжелое решение вашей проблемы.Взгляните, например, на http://code.google.com/p/plists/source/browse/trunk/src/plists.erl.

В общем, вы хотите сделать три вещи:

  1. Выберите рабочую единицу, которая является "достаточно большой".Если рабочая единица слишком мала, вы умираете, обрабатывая накладные расходы.Если он слишком велик, вы умираете из-за бездействия рабочих, особенно если ваша работа не делится поровну на количество элементов в списке.

  2. Верхняя граница числа работающих одновременно.Психогенный предлагает разделить его по расписанию, я предлагаю разделить его по количеству рабочих мест, говорят 100 рабочих мест.То есть вы хотите запустить 100 заданий, а затем дождаться завершения некоторых из них, прежде чем начинать больше заданий.

  3. Если возможно, рассмотрите возможность введения порядка элементов.Это гораздо быстрее, если вам не нужно принимать заказ во внимание.Для многих проблем это возможно.Если порядок имеет значение, тогда используйте dict, чтобы сохранить вещи в соответствии с предложением.Это быстрее для списков с большими элементами.

Основное правило заключается в том, что, как только вы захотите провести параллель, вам редко требуется представление ваших данных на основе списка.Списоку присуща линейность, которая вам не нужна.Гай Стил говорит на эту тему: http://vimeo.com/6624203

1 голос
/ 29 сентября 2011

В этом случае вы можете использовать dict (от pid порожденного процесса до индекса в исходном списке) вместо Pids.

...