Как мое приложение для создания потоковых изображений может передавать данные в графический интерфейс? - PullRequest
2 голосов
/ 10 января 2010

Медленная реализация генератора Мандельброта с множественной точностью.Резьбовые, с использованием потоков POSIX.Gtk GUI.

Я немного потерян.Это моя первая попытка написания многопоточной программы.На самом деле я пока не пытаюсь конвертировать однопотоковую версию, просто пытаюсь реализовать базовую платформу.

Краткое описание того, как она работает:

Main создаетПоток watch_render_start, который ожидает pthread_cond_signal, который сигнализируется обратным вызовом графического интерфейса при нажатии кнопки 'render'.

watch_render_start проверяет, отображает ли изображение уже, проверяет ли оно выход и т. д., но если все идет хорошо, он создаетпоток render_create_threads.

Поток render_create_threads затем создает потоки рендеринга, а затем использует pthread_join, чтобы дождаться их завершения (и делает ли что-то временное с get_time_of_day - это плохо для потоков?).

Точка входа потоков рендеринга (образно), называемая рендерингом, зацикливается, в то время как функция вычисления next_line возвращает значение ИСТИНА, чтобы обработать больше строк.в этом цикле while есть проверки на останов или выход.

Функция next_line получает строку, которую должна вычислить, перед тем, как увеличивать переменную, чтобы указать следующую строку для следующего потока для вычисления.Возвращается, если обрабатываемая строка находится за пределами высоты изображения.Если нет, то он рассчитывает содержимое строки.Затем увеличивает lines_done и проверяет его по высоте изображения и возвращает 0, если> = или 1, если <. </p>

Вот все 470+ строк кода, я уверен, вам будет интересно посмотреть на это.

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <mpfr.h>
#include <string.h>
#include <gtk/gtk.h>
#include <sys/time.h>

/* build with:

gcc threaded_app.c -o threaded_app -Wall -pedantic -std=gnu99 -lgmp -lmpfr -pthread -D_REENTRANT -ggdb `pkg-config --cflags gtk+-2.0` `pkg-config --libs gtk+-2.0`

*/

typedef struct
{
    struct timeval tv_start;
    struct timeval tv_end;
} Timer;

void timer_start(Timer* t)
{
    gettimeofday(&t->tv_start, 0);
}

void timer_stop(Timer* t)
{
    gettimeofday(&t->tv_end, 0);
}

long timer_get_elapsed(Timer* t)
{
    if (t->tv_start.tv_sec == t->tv_end.tv_sec)
        return t->tv_end.tv_usec - t->tv_start.tv_usec;
    else
        return (t->tv_end.tv_sec - t->tv_start.tv_sec) *
            1e6 + (t->tv_end.tv_usec - t->tv_start.tv_usec);
}

#define NTHREADS 8

#define IMG_WIDTH  480
#define IMG_HEIGHT 360

typedef struct
{
    int rc;
    pthread_t thread;
} rthrds;

typedef struct
{
    int* arr;
    int next_line;
    int lines_done;
    int rendering;
    int start;
    int stop;
    pthread_t rend[NTHREADS];

    int all_quit;

    int width;
    int height;

    double xmin, xmax, ymax;
    int depth;

} image_info;


static gboolean delete_event(GtkWidget *widget,
                             GdkEvent  *event,
                             gpointer   data);
static void destroy(GtkWidget *widget, gpointer data);

void gui_start_render(GtkWidget* widget, gpointer data);
void gui_stop_render(GtkWidget* widget, gpointer data);

static GtkWidget* gui_pbar = NULL;

void *render(void* ptr);
int next_line(image_info* img);

void* watch_render_start(void* ptr);
void* watch_render_stop(void* ptr);
void* watch_render_done(void* ptr);

void* threads_render_create(void* ptr);

pthread_mutex_t next_line_mutex =  PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t lines_done_mutex = PTHREAD_MUTEX_INITIALIZER;

pthread_mutex_t img_start_mutex =      PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t img_stop_mutex =       PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t img_rendering_mutex =  PTHREAD_MUTEX_INITIALIZER;

pthread_cond_t img_start_cond =  PTHREAD_COND_INITIALIZER;
pthread_cond_t img_stop_cond =   PTHREAD_COND_INITIALIZER;
pthread_cond_t img_done_cond =   PTHREAD_COND_INITIALIZER;

pthread_mutex_t all_quit_mutex = PTHREAD_MUTEX_INITIALIZER;

int main(int argc, char **argv)
{
    printf("initializing...\n");
    image_info* img = malloc(sizeof(image_info));
    memset(img, 0, sizeof(image_info));

    img->start = 0;

    img->width = IMG_WIDTH;
    img->height = IMG_HEIGHT;

    img->xmin =  -0.75509089265046296296296259;
    img->xmax = -0.75506025752314814814814765;
    img->ymax =  0.050215494791666666666666005;
    img->depth = 30000;

    size_t arr_size = img->width * img->height * sizeof(int);

    printf("creating array size: %ld bytes\n", arr_size);
    img->arr = malloc(arr_size);
    if (!img->arr)
    {
        fprintf(stderr, "image dimension too large!\n");
        free(img);
        exit(-1);
    }
    memset(img->arr, 0, arr_size);

    int rc_err;
    pthread_t thread_start;
    pthread_attr_t attr;
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

    printf("creating watch render start thread...\n");

    rc_err = pthread_create(&thread_start, &attr,
                              &watch_render_start, (void*)img);
    if (rc_err)
    {
        fprintf(stderr, "Thread start creation failed: %d\n",
                        rc_err);
        free(img->arr);
        free(img);
        exit(-1);
    }

    printf("creating GUI...\n");

    GtkWidget *window;
    GtkWidget *startbutton;
    GtkWidget *stopbutton;
    GtkWidget *box1;
    gtk_init (&argc, &argv);
    window = gtk_window_new (GTK_WINDOW_TOPLEVEL);
    g_signal_connect (G_OBJECT (window), "delete_event",
                      G_CALLBACK (delete_event), NULL);
    g_signal_connect (G_OBJECT (window), "destroy",
                      G_CALLBACK (destroy), NULL);
    gtk_container_set_border_width (GTK_CONTAINER (window), 10);

    box1 = gtk_hbox_new(FALSE, 0);
    gtk_container_add(GTK_CONTAINER(window), box1);

    startbutton = gtk_button_new_with_label ("Start render");
    g_signal_connect (G_OBJECT (startbutton), "clicked",
                      G_CALLBACK (gui_start_render), img);
    gtk_box_pack_start(GTK_BOX(box1), startbutton, TRUE, TRUE, 0);
    stopbutton = gtk_button_new_with_label ("Stop render");
    g_signal_connect (G_OBJECT (stopbutton), "clicked",
                      G_CALLBACK (gui_stop_render), img);
    gtk_box_pack_start(GTK_BOX(box1), stopbutton, TRUE, TRUE, 0);

    gui_pbar = gtk_progress_bar_new();
    gtk_progress_bar_set_orientation(GTK_PROGRESS_BAR(gui_pbar),
                                     GTK_PROGRESS_LEFT_TO_RIGHT);
    gtk_progress_bar_set_fraction (GTK_PROGRESS_BAR(gui_pbar), 
                               (gfloat)1.0 ); /* img->real_height); */
    gtk_widget_set_size_request(gui_pbar, 75, 0);
    gtk_box_pack_end(GTK_BOX(box1), gui_pbar, FALSE, FALSE, 0);

    gtk_widget_show(startbutton);
    gtk_widget_show(stopbutton);
    gtk_widget_show(box1);
    gtk_widget_show(window);

    printf("starting GUI\n");

    gtk_main ();

    printf("************************\n"
           "GUI shutdown\n"
           "************************\n");

    printf("setting all_quit\n");

    pthread_mutex_lock(&all_quit_mutex);
    img->all_quit = 1;
    pthread_mutex_unlock(&all_quit_mutex);

    printf("signalling watch render start thread to wakeup...\n");

    pthread_mutex_lock(&img_start_mutex);
    pthread_cond_signal(&img_start_cond);
    pthread_mutex_unlock(&img_start_mutex);

    printf("waiting for watch render start thread to quit...\n");

    pthread_join(thread_start, NULL);

    printf("done\n");

    printf("freeing memory\n");

    free(img->arr);
    free(img);

    printf("goodbye!\n");

    exit(0);
}

void gui_start_render(GtkWidget* widget, gpointer ptr)
{
    image_info* img = (image_info*)ptr;

    printf("************\n"
           "GUI signalling to start render...\n"
           "************\n");

    pthread_mutex_lock(&img_start_mutex);
    img->start = 1;
    pthread_cond_signal(&img_start_cond);
    pthread_mutex_unlock(&img_start_mutex);
}

void gui_stop_render(GtkWidget* widget, gpointer ptr)
{
    image_info* img = (image_info*)ptr;

    printf("************\n"
           "GUI signalling to stop render...\n"
           "************\n");

    pthread_mutex_lock(&img_stop_mutex);
    img->stop = 1;
    pthread_mutex_unlock(&img_stop_mutex);
}

void* watch_render_start(void* ptr)
{
    image_info* img = (image_info*)ptr;

    int rc_err;
    pthread_t render_thread;
    pthread_attr_t attr;
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

    int r;

    int quit = 0;

    for(;;)
    {
        printf("watch_render_start: waiting for img_start_cond\n");
        pthread_mutex_lock(&img_start_mutex);
        if (!img->start)
            pthread_cond_wait(&img_start_cond, &img_start_mutex);
        img->start = 0;
        pthread_mutex_unlock(&img_start_mutex);
        printf("watch_render_start: recieved img_start_cond\n");

        pthread_mutex_lock(&img_rendering_mutex);
        r = img->rendering;
        pthread_mutex_unlock(&img_rendering_mutex);

        printf("checking if we are rendering... ");

        if (r)
        {
            printf("yes\nStopping render...\n");
            pthread_mutex_lock(&img_stop_mutex);
            img->stop = 1;
            pthread_cond_signal(&img_stop_cond);
            pthread_mutex_unlock(&img_stop_mutex);
            pthread_join(render_thread, NULL);
            printf("render stopped\n");
        }
        else
            printf("no\n");

        pthread_mutex_lock(&all_quit_mutex);
        quit = img->all_quit;
        pthread_mutex_unlock(&all_quit_mutex);

        if (quit)
        {
            printf("exiting watch render start thread\n");
            pthread_exit(0);
        }

        printf("creating render thread...\n");
        rc_err = pthread_create(&render_thread, &attr,
                                &threads_render_create, (void*)img);
        if (rc_err)
            pthread_exit(0);
    }
}

void* threads_render_create(void* ptr)
{
    Timer timing_info;

    printf("initializing render thread\n");

    image_info* img = (image_info*)ptr;

    pthread_mutex_lock(&img_rendering_mutex);

    img->rendering = 1;
    pthread_mutex_unlock(&img_rendering_mutex);

    pthread_mutex_lock(&lines_done_mutex);
    img->lines_done = 0;
    pthread_mutex_unlock(&lines_done_mutex);

    pthread_mutex_lock(&img_stop_mutex);
    img->stop = 0;
    pthread_mutex_unlock(&img_stop_mutex);

    pthread_mutex_lock(&next_line_mutex);
    img->next_line = 0;
    pthread_mutex_unlock(&next_line_mutex);

    int rc_err, i;
    pthread_attr_t attr;
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

    timer_start(&timing_info);

    for (i = 0; i < NTHREADS; ++i)
    {
        printf("creating renderer thread #%d...\n", i);
        rc_err = pthread_create(&img->rend[i], &attr,
                                &render, (void*)img);
        if (rc_err)
        {
            fprintf(stderr, "\nrender thread #%d creation failed: %d\n",
                            i, rc_err);
            return 0;
        }
    }

    for (i = 0; i < NTHREADS; ++i)
    {
        printf("joining renderer thread #%d...\n", i);
        pthread_join(img->rend[i], NULL);
    }

    timer_stop(&timing_info);
    printf("render-time %.3fs\n\n",
            timer_get_elapsed(&timing_info) / (double)1e6);

    printf("all renderer threads finished\n");

    pthread_mutex_lock(&img_stop_mutex);
    img->stop = 0;
    pthread_mutex_unlock(&img_stop_mutex);

    pthread_mutex_lock(&img_rendering_mutex);
    img->rendering = 0;
    pthread_mutex_unlock(&img_rendering_mutex);

    printf("at end of threads_render_create\n");
    pthread_mutex_lock(&lines_done_mutex);
    if (img->lines_done >= img->height)
        printf("image complete\n");
    else
        printf("image interuppted\n");
    pthread_mutex_unlock(&lines_done_mutex);

    pthread_mutex_lock(&img_start_mutex);
    img->start = 0;
    pthread_mutex_unlock(&img_start_mutex);
    printf("exiting render thread\n");
    pthread_exit(NULL);
}

void* render(void* ptr)
{
    image_info* img = (image_info*)ptr;
    int quit = 0;
    printf("starting render..\n");
    while(next_line(img) && !quit)
    {
        pthread_mutex_lock(&img_stop_mutex);
        quit = img->stop;
        pthread_mutex_unlock(&img_stop_mutex);
        pthread_mutex_lock(&all_quit_mutex);
        quit |= img->all_quit;
        pthread_mutex_unlock(&all_quit_mutex);
    }
    printf("exiting render thread\n");
    pthread_exit(0);
}

int next_line(image_info* img)
{
    int line;

    pthread_mutex_lock(&next_line_mutex);
    line = img->next_line++;
    pthread_mutex_unlock(&next_line_mutex);

    if (line >= img->height)
        return 0;

    int ix,wz;
    int img_width = img->width;
    long double x,y,x2,y2,wre=0,wim=0,wre2=0,wim2=0;
    long double xmin = img->xmin, xmax = img->xmax, ymax = img->ymax;
    long double xdiff = xmax - xmin;
    int depth = img->depth;
    long double c_im = 0, c_re = 0;

    y = ymax - (xdiff / (long double)img_width)
                * (long double)line;
    y2 = y * y;

    for (ix = 0; ix < img_width; ++ix)
    {
        x = ((long double)ix / (long double)img_width) * xdiff + xmin;
        x2 = x * x;
        wre = x;
        wim = y;
        wre2 = x2;
        wim2 = y2;
        for (wz = 0; wz < depth; ++wz)
        {
            wim = 2.0 * wre * wim + c_im;
            wre = wre2 - wim2 + c_re;
            wim2 = wim * wim;
            wre2 = wre * wre;
            if (wim2 + wre2 > 4.0F)
                break;
        }
        if (wz == depth + 1)
            wz = 0;
        img->arr[line * img_width + ix] = wz;
    }

    printf("line %d complete\n", line);

    pthread_mutex_lock(&lines_done_mutex);
    img->lines_done++;
    if (img->lines_done == img->height)
    {
        pthread_mutex_unlock(&lines_done_mutex);
        return 0;
    }
    pthread_mutex_unlock(&lines_done_mutex);

    return 1;
}

static gboolean delete_event(GtkWidget *widget,
                             GdkEvent  *event,
                             gpointer   data)
{
   return FALSE;
}

static void destroy(GtkWidget *widget, gpointer data)
{
    gtk_main_quit ();
}

Я дошел до этого и мне нужно несколько указаний о том, как действовать.Для каждой проблемы, с которой я сталкиваюсь, я просто вижу запутанный лабиринт решения, ведущего в тупик!

Сначала я хотел взяться за индикатор выполнения.Графический интерфейс должен будет поставить блокировки на lines_done.Но как узнать, когда это сделать?Как часто это будет выглядеть на lines_done?Я думаю, я мог бы использовать для этого g_idle_add.

Тогда настоящая мясистая проблема - фактически рендеринг данных, которые генерируют все эти счастливые потоки.Как уже говорилось в другом вопросе, у меня будет массив флагов, чтобы указать, какие строки фактически отображаются (потому что они будут отображаться в произвольном порядке из-за природы потоков и планировщиков ОС).Но как GUI проверит это?В том же режиме ожидания обратного вызова, что и индикатор выполнения?И скажем, генерируется большое изображение высотой 8000 пикселей, то есть 8000 мьютексов блокирует и разблокирует каждые столько миллисекунд - это должно стоить правильно?

Так как мне действовать дальше?Эта модель, которую я использую, какой бы она ни была, способна делать то, что я хочу?

Ответы [ 3 ]

0 голосов
/ 10 января 2010

Используйте условную переменную вместе с вашим next_line_mutex.Функция рендеринга в GUI может хранить переменную с последней строкой, которую она отображала, и сравнивать ее с переменной next_line при каждом выполнении условия, чтобы она могла видеть, какие строки нужно отобразить.Функция next_line может вызвать условие.

Как указывалось выше, приведенная выше переменная условия может привести к блокировке графического интерфейса, поэтому это не очень хорошая идея.Вместо этого я полагаю, что графический интерфейс должен проверять переменную lines на интервале времени, возможно, один раз в секунду.

Если выполнение 8000 операций блокировки / разблокировки слишком медленное, я бы рекомендовал делать строки в пакетах3, 5, 7 или даже 8 (для 8 потоков).Если вы назначаете каждому потоку различное количество строк для обработки, и каждая строка занимает примерно одинаковое количество времени обработки, то блокировка с большей вероятностью будет оставаться необработанной, когда она будет взята.Неконтролируемые блокировки очень дешевы, но все же дороже, чем обычные операции с процессором (он должен вытягивать строку кэша из последнего процессора, который его использовал).Это было бы легко сделать, установив next_line в next_lines (img, 8)

0 голосов
/ 10 января 2010

Если у вас есть доступ к атомарному чтению и атомарной записи на вашей платформе (ах), то создайте рабочую таблицу распределения (прочитайте примечания по архитектуре для ваших платформ - может быть, а может и нет, что обычные операции чтения и записи достаточно хороши, вы может или не нужно добавлять барьеры памяти):

Один байт на строку, изначально ноль, ненулевое значение означает поток, которому выделена строка

... и создать атомарно обновленное количество строк, выполненных в поле для рабочего потока. Таблицу следует обновлять и читать с использованием атомарных инструкций чтения / записи (например, кусками по 8, 16, 32 или 64 бита в зависимости от доступных инструкций на платформе).

Логика верхнего уровня должна работать, будь то выполнение всей работы сразу в главном потоке (если изображение действительно маленькое), или запуск одного рабочего потока, или запуск N рабочих потоков.

Координационный поток (или если бы это был я, возможно, я бы отбросил координационный поток и сделал бы это в главном потоке), выделяя потокам в циклическом цикле половину заданий (или всю работу, если их меньше определенной суммы) ). Если он выделяет меньше всей работы, он отслеживает другие потоки и измеряет производительность среднего потока и лучшего потока. Он гарантирует, что потоки в целом не заканчиваются без работы, но старается не оставлять потоки без присмотра.

Внешний интерфейс хранит указатель для каждого работника на то, где в таблице размещения, за которую работал работник, и когда работник увеличивает свое целочисленное поле для количества выполненных строк, внешний интерфейс выполняет поиск вперед через Таблица распределения работ, в которой указываются индексы строк для заданий, выделенных этому рабочему, которые в настоящее время завершены, и обновляется битовый буфер, для которого выполнены определенные строки, а также обновляется поле итого выполненных работ.

-

Это общий алгоритм для динамического распределения работы между потоками, так как другой автор предложил альтернативно статически распределить работу, сделав номера строк рабочим потоком, который должен обработать функцию числа рабочих потоков и рабочего номер потока, а затем просто передайте количество строк, выполненных каждым рабочим через атомарное поле, во внешний интерфейс.

0 голосов
/ 10 января 2010

Чтобы уменьшить количество мьютексов: -

  • Наличие одного мьютекса для доступа к битовому буферу строк, сигнализируемых как выполненные (8000/8 бит = 1000-байтовый буфер).

  • Второй временный битовый буфер.

  • Рабочий поток блокирует мьютекс, устанавливает бит в первом буфере и разблокирует мьютекс.

  • Основной цикл блокирует мьютекс, копирует первый буфер во второй и разблокирует мьютекс.

  • Затем сканирует второй буфер на ненулевое значение и для каждого установленного бита копирует данные для этой строки на выход / экран.

  • Чтобы уменьшить конкуренцию за первый битовый буфер, вы можете разделить первый битовый буфер на 8 или даже 16 сегментов (какой сегмент искать в нас, основываясь на номере строки mod 8 или mod 16) и иметь мьютекс за каждый сегмент.

-

Вероятно, для этого нужно использовать предложенный мною дизайн, но "try_lock" (а не ждать) блокировок, сделать пару NOP и повторить попытку, пока они не станут доступны, а не уступят. Возможно, стоит использовать атомарные inc / dec напрямую, а не мьютексы pthread для более высокой производительности.

Наконец, не стоит иметь 8 потоков, если у вас нет 8 процессоров, и я не знаю насчет get_time_of_day.

Редактировать : Возможно, есть недостаток в том, что я предполагаю, что, если основной поток прерван, пока он заблокировал мьютекс буфера битов, другие потоки тратят кучу времени. Частота этого может быть уменьшена за счет снижения приоритетов других потоков, но я думаю, что лучшая общая стратегия - использовать массив из 8000 типов atomic_t с инструкциями atomic inc / dec, чтобы сигнализировать о завершении строки из рабочих потоков в основной поток. Эти 8000 atomic_t могут быть найдены основным потоком. Я также предположил, что вы уменьшите количество рабочих потоков на единицу меньше, чем количество процессоров.

Редактировать : Восемь потоков кажутся немного произвольными. Откуда вы взяли этот номер? Очевидно, вам нужен хотя бы один рабочий поток.

Редактировать : еще быстрее было бы использовать atomic_set_mask для установки битов в 1000-байтовом буфере, который внешний интерфейс просматривает в цикле.

Редактировать : Предполагая, что на вашей платформе есть atomic_set_mask.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...