Я хочу запустить многопроцессорную программу распределенного тензорного потока на slurm. Скрипт должен использовать многопроцессорную библиотеку python для параллельного открытия различных сеансов на разных узлах. Этот подход работает при тестировании с использованием интерактивных сеансов slurm, но, похоже, не работает при использовании заданий sbatch.
Сценарий корректно работает с slurm, если не используется многопроцессорная обработка
Мой bash-скрипт:
#!/bin/bash
#SBATCH -N 2
#SBATCH -n 2
#SBATCH -c 8
#SBATCH --mem-per-cpu 8000
#SBATCH --exclusive
#SBATCH -t 01:00:00
NPROCS=$(( $SLURM_NNODES * $SLURM_CPUS_PER_TASK ))
export OMP_NUM_THREADS=$NPROCS
export MKL_NUM_THREADS=8
module load TensorFlow/1.8.0-foss-2018a-Python-3.6.4-CUDA-9.2.88
# Execute jobs in parallel
srun -N 1 -n 1 python slurmpythonparallel.py &
srun -N 1 -n 1 python slurmpythonparallel.py
wait
Мой скрипт на Python:
def run(worker_hosts,task_index,results, train_x, train_y,val_x,val_y,fold):
with tf.device("/job:worker/task:%d" % task_index):
with tf.container("Process%d"%task_index):
global x
global y
global prediction
global cost
global optimizer
global correct
global accuracy
x = tf.placeholder('float',[None, 784],name="x")
y = tf.placeholder('float',name="y_pred")
prediction=neural_network_model(x)
cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits_v2(logits=prediction,labels=y))
optimizer = tf.train.AdamOptimizer().minimize(cost)
correct = tf.equal(tf.argmax(prediction,1),tf.argmax(y,1))
accuracy = tf.reduce_mean(tf.cast(correct,'float'))
with tf.Session("grpc://"+worker_hosts[task_index]) as sess:
run_train(sess, train_x, train_y,task_index,fold)
results[fold]=sess.run(accuracy, feed_dict={x: val_x, y: val_y})
sys.path.append(os.getcwd())
folds=2
global worker_hosts
start_time = time.time()
cluster, job_name, task_index,worker_hosts = slurm()
cluster_spec = tf.train.ClusterSpec(cluster)
server = tf.train.Server(cluster_spec,job_name=job_name,task_index=task_index)
for i in range(len(worker_hosts)):
print("Worker Host:",worker_hosts[i])
if task_index != 0:
server.join()
else:
multiprocessing.set_start_method('forkserver', force=True)
results = multiprocessing.Array('d',folds)
p=[]
num_of_workers=len(worker_hosts)
index=0
i=0
for i in range(2):
p.append(Process(target=run, args=(worker_hosts,index,results,train_x_all,train_y_all,train_x_all,train_y_all,i)))
i=i+1
if i%(num_of_workers)==0: # i=2
for j in range(num_of_workers):
p[j].daemon = False
p[j].start()
for j in range(num_of_workers):
p[j].join()
#p[j].exit()
Когда я использую многопроцессорность, я получаю ошибку:
Адрес уже используется
и
Не удалось запустить сервер gRPC
Итак, проблема в том, что я предполагаю, что когда python разветвляет новый процесс, он пытается использовать тот же адрес, который уже назначен родительским процессом или другим запуском скрипта. Я не понимаю, почему эта проблема возникает в slurm, но не при запуске его на моем компьютере или в интерактивных сеансах