Как запустить многопроцессорный питон с распределенным тензорным потоком на слерме - PullRequest
1 голос
/ 14 мая 2019

Я хочу запустить многопроцессорную программу распределенного тензорного потока на slurm. Скрипт должен использовать многопроцессорную библиотеку python для параллельного открытия различных сеансов на разных узлах. Этот подход работает при тестировании с использованием интерактивных сеансов slurm, но, похоже, не работает при использовании заданий sbatch.

Сценарий корректно работает с slurm, если не используется многопроцессорная обработка

Мой bash-скрипт:

#!/bin/bash
#SBATCH -N 2
#SBATCH -n 2 
#SBATCH -c 8
#SBATCH --mem-per-cpu 8000
#SBATCH --exclusive
#SBATCH -t 01:00:00
NPROCS=$(( $SLURM_NNODES * $SLURM_CPUS_PER_TASK ))
export OMP_NUM_THREADS=$NPROCS
export MKL_NUM_THREADS=8
module load TensorFlow/1.8.0-foss-2018a-Python-3.6.4-CUDA-9.2.88
# Execute jobs in parallel
srun -N 1 -n 1  python slurmpythonparallel.py &
srun -N 1 -n 1  python slurmpythonparallel.py 
wait 

Мой скрипт на Python:


def run(worker_hosts,task_index,results, train_x, train_y,val_x,val_y,fold):
    with tf.device("/job:worker/task:%d" % task_index):
        with tf.container("Process%d"%task_index):
            global x
            global y
            global prediction
            global cost
            global optimizer
            global correct
            global accuracy

            x = tf.placeholder('float',[None, 784],name="x")
            y = tf.placeholder('float',name="y_pred")


            prediction=neural_network_model(x)
            cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits_v2(logits=prediction,labels=y))
            optimizer = tf.train.AdamOptimizer().minimize(cost)
            correct = tf.equal(tf.argmax(prediction,1),tf.argmax(y,1))
            accuracy = tf.reduce_mean(tf.cast(correct,'float'))
        with tf.Session("grpc://"+worker_hosts[task_index]) as sess:
            run_train(sess, train_x, train_y,task_index,fold)
            results[fold]=sess.run(accuracy, feed_dict={x: val_x, y: val_y})




sys.path.append(os.getcwd())
folds=2
global worker_hosts
start_time = time.time()
cluster, job_name, task_index,worker_hosts = slurm()
cluster_spec = tf.train.ClusterSpec(cluster)
server = tf.train.Server(cluster_spec,job_name=job_name,task_index=task_index)
for i in range(len(worker_hosts)):
    print("Worker Host:",worker_hosts[i])
if task_index != 0:
    server.join()
else:
    multiprocessing.set_start_method('forkserver', force=True)
    results = multiprocessing.Array('d',folds)
    p=[]
    num_of_workers=len(worker_hosts)
    index=0 
    i=0
    for i in range(2):
        p.append(Process(target=run, args=(worker_hosts,index,results,train_x_all,train_y_all,train_x_all,train_y_all,i)))
        i=i+1
        if i%(num_of_workers)==0: # i=2
            for j in range(num_of_workers):
                p[j].daemon = False
                p[j].start()
            for j in range(num_of_workers):
                p[j].join()
                #p[j].exit()




Когда я использую многопроцессорность, я получаю ошибку:

Адрес уже используется

и

Не удалось запустить сервер gRPC

Итак, проблема в том, что я предполагаю, что когда python разветвляет новый процесс, он пытается использовать тот же адрес, который уже назначен родительским процессом или другим запуском скрипта. Я не понимаю, почему эта проблема возникает в slurm, но не при запуске его на моем компьютере или в интерактивных сеансах

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...