MPI: отмена неблокирующей отправки - PullRequest
1 голос
/ 14 декабря 2010

Я использую библиотеку Open MPI для реализации следующего алгоритма: у нас есть два процесса p1 и p2.Они оба выполняют некоторые итерации и в конце каждой итерации сообщают свои результаты.Проблема в том, что выполнение не обязательно сбалансировано, поэтому p1 может выполнить 10 итераций за время, когда p2 выполнит 1. Несмотря на то, что я хочу p2, чтобы прочитать самый последний результат из последней итерации, выполненной p1,

Таким образом, моя идея состоит в том, что p1 отправляет свои результаты на каждой итерации.Но перед отправкой результата из итерации i следует проверить, действительно ли p2 считывает информацию из итерации i-1.В противном случае следует отменить предыдущую отправку , чтобы при чтении p2 из p1 он читал самый последний результат.

К сожалению, я не уверен, каксделай это.Я попытался использовать MPI_Cancel, как в следующем коде:

int main (int argc, char *argv[]){

    int myrank, numprocs;
    MPI_Status status;
    MPI_Request request;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &myrank);

    if(myrank == 0){
        int send_buf = 1, flag;
        MPI_Isend(&send_buf, 1, MPI_INT, 1, 123, MPI_COMM_WORLD, 
                  &request);
        MPI_Cancel(&request);
        MPI_Wait(&request, &status);
        MPI_Test_cancelled(&status, &flag);
        if (flag) printf("Send cancelled\n");
        else printf("Send NOT cancelled\n");
        send_buf = 2;
        MPI_Isend(&send_buf, 1, MPI_INT, 1, 123, MPI_COMM_WORLD, 
                  &request);
    }
    else {
        sleep(5);
        int msg;
        MPI_Recv(&msg, 1, MPI_INT, 0, 123,
                 MPI_COMM_WORLD, &status);
        printf("%d\n", msg);
    }
    MPI_Finalize();

    return 0;
}

Но когда я выполняю, он говорит, что отправка не может быть отменена и p2 печатает 1 вместо 2.

Я хотел бы знать, есть ли способ достичь того, что я предлагаю, или есть альтернатива для кодирования поведения между p1 и p2.

Ответы [ 3 ]

5 голосов
/ 14 декабря 2010

Еще одним подходом является использование односторонней связи MPI (например, http://www.linux -mag.com / id / 1793 ). Тем не менее, обратите внимание, что выполнение пассивной связи, что вам действительно нужно здесь, довольно сложно (хотя попарно, с mpi_win_post и mpi_win_start проще) и что односторонние вещи, надеюсь, все изменятся в MPI-3, поэтому я Я не знаю, как далеко по этой дороге я бы посоветовал вам пройти.

Более непосредственное отношение к тому, что вы пытаетесь сделать первым, а не к отмене сообщений (что, как было предложено выше, довольно радикально), вероятно, гораздо проще просто просмотреть все сообщения в очереди (MPI гарантирует, что сообщения не будут перегружены друг друга - единственное предостережение, если вы используете MPI_THREAD_MULTIPLE и имеете несколько потоков, отправляющих в одной задаче MPI, в этом случае порядок определен по частям):

#include <stdio.h>
#include <mpi.h>
#include <stdlib.h>
#include <unistd.h>
#include <math.h>

void compute() {
    const int maxusecs=500;
    unsigned long sleepytime=(unsigned long)round(((float)rand()/RAND_MAX)*maxusecs);

    usleep(sleepytime);
}

int main(int argc, char** argv)
{
  int rank, size, i;
  int otherrank;
  const int niters=10;
  const int tag=5;
  double newval;
  double sentvals[niters+1];
  double othernewval;
  MPI_Request reqs[niters+1];
  MPI_Status stat;
  int ready;

  MPI_Init(&argc, &argv);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);
  if (size != 2) {
     fprintf(stderr,"This assumes 2 processes\n");
     MPI_Finalize();
     exit(-1);
  }

  otherrank = (rank == 0 ? 1 : 0);
  srand(rank);

  compute();
  newval = rank * 100. + 0;
  sentvals[0] = newval;
  MPI_Isend(&(sentvals[0]), 1, MPI_DOUBLE, otherrank, tag, MPI_COMM_WORLD, &(reqs[0]));
  MPI_Recv (&othernewval,   1, MPI_DOUBLE, otherrank, tag, MPI_COMM_WORLD, &stat);
  for (i=0; i<niters; i++) {

      MPI_Iprobe(otherrank, tag, MPI_COMM_WORLD, &ready, &stat);
      while (ready) {
          MPI_Recv(&othernewval, 1, MPI_DOUBLE, otherrank, tag, MPI_COMM_WORLD, &stat);
          printf("%s[%d]: Reading queued data %lf:\n",
                  (rank == 0 ? "" : "\t\t\t\t"), rank, othernewval);
          MPI_Iprobe(otherrank, tag, MPI_COMM_WORLD, &ready, &stat);
      }

      printf("%s[%d]: Got data %lf, computing:\n", 
              (rank == 0 ? "" : "\t\t\t\t"), rank, othernewval);
      compute();

      /* update my data */ 
      newval = rank * 100. + i + 1;
      printf("%s[%d]: computed %lf, sending:\n", 
              (rank == 0 ? "" : "\t\t\t\t"), rank, newval);
      sentvals[i+1] = newval;
      MPI_Isend(&(sentvals[i+1]), 1, MPI_DOUBLE, otherrank, tag, MPI_COMM_WORLD, &(reqs[0]));
   }


  MPI_Finalize();

  return 0;
}

Запуск этого дает вам (обратите внимание, что только потому, что данные отправляются, не означает, что они получены во время печати):

[0]: Got data 100.000000, computing:
                                [1]: Got data 0.000000, computing:
[0]: computed 1.000000, sending:
[0]: Got data 100.000000, computing:
                                [1]: computed 101.000000, sending:
                                [1]: Got data 0.000000, computing:
[0]: computed 2.000000, sending:
[0]: Got data 100.000000, computing:
                                [1]: computed 102.000000, sending:
                                [1]: Reading queued data 1.000000:
                                [1]: Got data 1.000000, computing:
[0]: computed 3.000000, sending:
[0]: Reading queued data 101.000000:
[0]: Got data 101.000000, computing:
                                [1]: computed 103.000000, sending:
                                [1]: Reading queued data 2.000000:
                                [1]: Got data 2.000000, computing:
[0]: computed 4.000000, sending:
                                [1]: computed 104.000000, sending:
[0]: Reading queued data 102.000000:
                                [1]: Reading queued data 3.000000:
                                [1]: Got data 3.000000, computing:
[0]: Got data 102.000000, computing:
[0]: computed 5.000000, sending:
[0]: Reading queued data 103.000000:
[0]: Got data 103.000000, computing:
                                [1]: computed 105.000000, sending:
                                [1]: Reading queued data 4.000000:
                                [1]: Got data 4.000000, computing:
[0]: computed 6.000000, sending:
[0]: Reading queued data 104.000000:
[0]: Got data 104.000000, computing:
                                [1]: computed 106.000000, sending:
                                [1]: Reading queued data 5.000000:
                                [1]: Got data 5.000000, computing:
[0]: computed 7.000000, sending:
[0]: Reading queued data 105.000000:
[0]: Got data 105.000000, computing:
                                [1]: computed 107.000000, sending:
                                [1]: Reading queued data 6.000000:
                                [1]: Got data 6.000000, computing:
[0]: computed 8.000000, sending:
[0]: Reading queued data 106.000000:
[0]: Got data 106.000000, computing:
                                [1]: computed 108.000000, sending:
                                [1]: Reading queued data 7.000000:
                                [1]: Got data 7.000000, computing:
[0]: computed 9.000000, sending:
[0]: Reading queued data 107.000000:
[0]: Got data 107.000000, computing:
                                [1]: computed 109.000000, sending:
                                [1]: Reading queued data 8.000000:
                                [1]: Got data 8.000000, computing:
[0]: computed 10.000000, sending:
                                [1]: computed 110.000000, sending:

Обратите внимание, что это всего лишь демонстрационный код, окончательная версия на самом деле необходимо сделать в очереди ожидания и больше iprobes, чтобы освободить все ожидающие запросы и сбросить все ожидающие сообщения.

5 голосов
/ 14 декабря 2010

Я бы поменял управление коммуникациями. Вместо p1 отправка ненужных сообщений, которые он должен отменить, p2 должен сигнализировать, что он готов принять сообщение, а p1 отправит только тогда. Тем временем, p1 просто перезаписывает свой буфер отправки с последними результатами.

В (непроверенный) код:

if ( rank == 0 )
{
    int ready;
    MPI_Request p2_request;
    MPI_Status p2_status;
    // initial request
    MPI_Irecv(&ready, 1, MPI_INT, 1, 123, MPI_COMM_WORLD, &p2_request);
    for (int i=0; true; i++)
    {
        sleep(1);
        MPI_Test(&p2_request, &ready, &p2_status);
        if ( ready )
        {
            // blocking send: p2 is ready to receive
            MPI_Send(&i, 1, MPI_INT, 1, 123, MPI_COMM_WORLD);
            // post new request
            MPI_Irecv(&ready, 1, MPI_INT, 1, 123, MPI_COMM_WORLD, &p2_request);
        }
    }
}
else
{
    int msg;
    MPI_Status status;
    while (true)
    {
        sleep(5);
        // actual message content doesn't matter, just let p1 know we're ready
        MPI_Send(&msg, 1, MPI_INT, 0, 123, MPI_COMM_WORLD);
        // receive message
        MPI_Recv(&msg, 1, MPI_INT, 0, 123, MPI_COMM_WORLD, &status);
    }
}

Теперь, как я уже сказал, это непроверенный код, но вы, вероятно, видите, что я там получаю. MPI_Cancel следует использовать только тогда, когда дела идут ужасно неправильно: ни одно сообщение не должно быть отменено во время обычного выполнения.

0 голосов
/ 10 января 2011

Поддерживает ли ваша среда и дистрибутив MPI многопоточность?Если это так, вы можете создать поток в P1, который вычисляет значение и сохраняет результат каждой итерации в переменной, совместно используемой с основным потоком P1 (запись защищена семафором). Как предложено выше в suszterpatt, тогда P2 отправит «я готов»сообщение для P1 и P1 отвечает значением из самой последней итерации.

...