У меня есть pandas df, из которого я делаю командную строку из каждого ряда и запускаю их параллельно, используя os.system()
. Вы заметите, что я жестко запрограммировал мой cores = 64
, потому что в моей системе всего 14 ядер, а в CSV у меня 63 строки, а поскольку ядер всего 14, то запускается только 14 из них, поэтому я передал ядро как 64. Теперь Я должен увеличить количество процессов примерно до 150. Поскольку процессы действительно малы, я хочу переключиться на многопоточность и добиться того же.
В основном я хочу выполнить следующее: -
1.) Чтение строк из файла CSV или любого источника данных в pandas или даже в словарь.
2.) Создание командной строки из всех строк.
3 .) Запуск всех командных строк параллельно с использованием multithreading.
Самый важный вопрос: - Сколько потоков я могу \ использовать за один раз.
def processor(dff):
for index,row in dff.iterrows():
dir = str(row['dir'])
script = str(row['script'])
c = str(row['c'])
o = str(row['o'])
mph5xx = str(row['mph5xx'])
check = str(row['check'])
ser = str(row['ser'])
www = str(row['www'])
influx_host = str(row['influx_host'])
es_host = str(row['es_host'])
xymon = str(row['xymon'])
uri = str(row['uri'])
yellow = str(row['yellow'])
red = str(row['red'])
interval = str(row['interval'])
window = str(row['window'])
anomalous = str(row['anomalous'])
no_page = str(row['no_page'])
nopage_auto = str(row['nopage_auto'])
cmda = dir + '/' + script + ' ' + '-c' + ' ' + c + ' ' + '-o' + ' ' + o + ' ' + '--check' + ' '+ check + ' ' + '--mph5xx' + ' ' + mph5xx + ' ' + '--ser' + ' ' + ser + ' ' + '--www' + ' ' + www + ' ' +'--influx_host' + ' ' + influx_host + ' ' + '--es_host' + ' ' + es_host + ' ' + '--xymon' + ' ' + xymon + ' ' + '--yellow' + ' ' + yellow + ' ' + '--red' + ' ' + red + ' ' + '--interval' + ' ' + interval + ' ' + '--window'+ ' ' + window + ' ' + '--anomalous' + ' ' + anomalous + ' '+ '--no_page' + ' ' + no_page + ' '+ '--nopage_auto' + ' ' + nopage_auto
os.system(cmda)
def main(infile, mdebug):
global debug
debug = mdebug
try:
lines = sum(1 for line in open(infile))
except Exception as err:
print "Error {} opening file: {}".format(err, infile)
sys.exit(2000)
if debug >= 2:
print(infile)
try:
dff = pd.read_csv(infile)
except Exception as err:
print "Error {}, opening file: {}".format(err, infile)
sys.exit(2000)
df_split = np.array_split(dff, (lines+1))
cores = multiprocessing.cpu_count()
cores = 64
if debug >= 1:
print "RCM runs is: {}".format(lines - 1)
print "cores is: {}".format(cores)
if debug >= 5:
sys.exit(2000)
# pool = Pool(cores)
pool = Pool(lines-1)
if debug >= 5:
sys.exit(2000)
for n, frame in enumerate(pool.imap(processor, df_split), start=1):
if frame is not None:
frame.to_csv('{}'.format(n))
pool.close()
pool.join()
if __name__ == "__main__":
args = parse_args()