Я пытаюсь реализовать поиск в ширину в pyopencl.
Итак, работа с одним рабочим элементом проходит гладко, но когда я пытаюсь ускорить его, предоставляя различные точки для поиска для каждого рабочего элементатак что, когда один из них завершает поиск, он может остановить все другие рабочие элементы и сэкономить время.Я не могу найти способ заставить рабочие элементы взаимодействовать друг с другом.
import pyopencl as cl
import numpy as np
import os
TASKS = 0
os.environ['PYOPENCL_COMPILER_OUTPUT'] = '1'
os.environ['PYOPENCL_CTX'] = '1'
# platform, device, context & queue setup
platform = cl.get_platforms()[0]
device = platform.get_devices()[0]
ctx = cl.Context([device])
queue = cl.CommandQueue(ctx)
# graph attributes
keys = np.array([0, 2, 3, 4, 5, 6, 7]) # vertex ids
keys = keys.astype(np.int32)
graph = np.array([
2, 7, -1,
0, 3, -1,
2, 6, -1,
5, 6, 7,
4, 6, 7,
3, 4, -1,
0, 5, -1
])
graph = graph.astype(np.int32)
neighbours = np.array([2, 2, 2, 3, 3, 2, 2]) # of neighbours of each vertex
neighbours = neighbours.astype(np.int32)
visited = np.empty(keys.size) # visited vertices
frontier = np.full(keys.size, -1) # next in line to be traversed with arbitrary source
src = 4
goal = np.array([6]) # vertex to find
vertices = np.array([7]) # number of vertices
# dividing task among neighbours of starting vertex
threads = []
first_neighbours = []
for g in range(0, vertices[0]):
if keys[g] == src:
for j in range(0, neighbours[g]):
threads.append(graph[g*3 + j])
threads = np.array(threads)
threads = threads.astype(np.int32)
break
TASKS = len(threads)
# buffer allocation
mf = cl.mem_flags
graph_buf = cl.Buffer(ctx, mf.READ_ONLY | mf.COPY_HOST_PTR, hostbuf=graph)
keys_buf = cl.Buffer(ctx, mf.READ_ONLY | mf.COPY_HOST_PTR, hostbuf=keys)
neighbours_buf = cl.Buffer(ctx, mf.READ_ONLY | mf.COPY_HOST_PTR, hostbuf=neighbours)
visited_buf = cl.Buffer(ctx, mf.READ_WRITE | mf.COPY_HOST_PTR, hostbuf=visited)
frontier_buf = cl.Buffer(ctx, mf.READ_WRITE | mf.COPY_HOST_PTR, hostbuf=frontier)
goal_buf = cl.Buffer(ctx, mf.READ_ONLY| mf.COPY_HOST_PTR, hostbuf=goal)
vertices_buf = cl.Buffer(ctx, mf.READ_ONLY| mf.COPY_HOST_PTR, hostbuf=vertices)
threads_buf = cl.Buffer(ctx, mf.READ_ONLY| mf.COPY_HOST_PTR, hostbuf=threads)
# kernel program
prg = cl.Program(ctx, '''
__kernel void bfs (__global int graph[], __global int keys[], __global int neighbours[], __global int visited[], __global int frontier[], __global int goal[], __global int vertices[], __global int threads[]) {
int gid = get_global_id(0);
frontier[0] = threads[gid];
int fsize = 1;
int vsize = 0;
while (true) {
// printf("%d Frontier:", gid);
for (int i=0; i<fsize; i++) {
// printf(" %d", frontier[i]);
}
// printf("\\n");
if (fsize == 0) {
printf("Not found %d\\n", goal[0]);
return;
}
int current = *(frontier + 0);
// printf("Current: %d\\n", current);
if (current == goal[0]) {
printf("Found %d\\n", current);
return;
}
for (int fr=0; fr<fsize-1; fr++) {
frontier[fr] = frontier[fr+1];
}
*(frontier + fsize - 1) = -1;
fsize--;
*(visited + vsize) = current;
vsize++;
// printf("Visited:");
for (int i=0; i<vsize; i++) {
// printf(" %d", visited[i]);
}
// printf("\\n");
for (int s=0; s<vertices[0]; s++) {
if (keys[s] == current) {
// printf("Neighbours: %d:", neighbours[s]);
for (int n=0; n<neighbours[s]; n++) {
int nb = graph[s*3 + n];
int already = 0;
// printf(" %d", nb);
for(int v=0; v<vsize; v++) {
if (nb == *(visited+v)) {
already = 1;
break;
}
}
if (already == 1) {
continue;
}
for (int f=0; f<fsize; f++) {
if (nb == *(frontier+f)) {
already = 1;
break;
}
}
if (already == 0) {
*(frontier + fsize) = nb;
fsize++;
}
}
// printf("\\n");
break;
}
}
}
}
''').build()
prg.bfs(queue, (TASKS,), (1,), graph_buf, keys_buf, neighbours_buf, visited_buf, frontier_buf, goal_buf, vertices_buf, threads_buf)
Я знаю, что каждый рабочий элемент проходит весь поиск по сообщению, которое я печатаю, когда поиск успешен.Только один из них должен сделать это.Только если бы я мог найти способ остановить все другие рабочие элементы из одного.Заранее спасибо.