Мне нужно запустить для l oop, и каждый l oop получит доступ к данным из базы данных, выполнит некоторые манипуляции и запустит алгоритм Дейкстры, затем добавит результаты в окончательный список. Код выглядит следующим образом:
def log_transform(x):
transformed = math.e**(-x)
return transformed
input_region = '1.199'
t1 = '20200101'
t2 = '20200115'
candid_sale_invoices = pd.read_excel('candid_sale_invoices.xlsx')
candid_barcodes = pd.read_excel('candid_barcodes.xlsx')
weights = []
for i in range(int(t1),(int(t2) + 1)):
input_date = str(i)
sql_data = """select trim(cast(p.Barcode as nvarchar(20))) Barcode ,cast(s.invoiceid as nvarchar(20)) invoiceid
from sales s inner join Product_981115 p on s.productid = p.productid
where s.date = """+ input_date +""" and s.qty != 0 and p.sectionid != 1691.199 and s.RegionID = """ + input_region
data = []
for chunk in pd.read_sql(sql_data,conn,chunksize = 1000000):
data.append(chunk)
data = pd.concat(data, ignore_index = True)
data = data.merge(candid_sale_invoices)
data = data.merge(candid_barcodes)
final_edges_df = data.iloc[:,[2,3,4]]
final_edges_tuples = [tuple(x) for x in final_edges_df.values]
Gm = ig.Graph.TupleList(final_edges_tuples, directed = True, edge_attrs = ['weight'])
longest_paths = pd.DataFrame(Gm.shortest_paths_dijkstra(None,None, weights = 'weight'))
longest_paths = longest_paths.swifter.apply(log_transform)
longest_paths["Date"] = input_date
longest_paths["RegionID"] = input_region
weights.append(longest_paths)
weights = pd.concat(weights, ignore_index = True)
Проблема заключается в том, что время процесса может дойти до нескольких часов sh. Так как каждая итерация не зависит от других итераций, я решил запустить эту l oop параллельно с помощью этой ссылки.
import psutil
from multiprocess import Pool
pool = Pool(psutil.cpu_count(logical=False))
def graph_analysis(i):
input_date = str(i)
sql_data = """select trim(cast(p.Barcode as nvarchar(20))) Barcode ,cast(s.invoiceid as
nvarchar(20)) invoiceid
from sales s inner join Product_981115 p on s.productid = p.productid
where s.date = """+ input_date +""" and s.qty != 0 and p.sectionid != 1691.199 and s.RegionID = """ + input_region
data = []
for chunk in pd.read_sql(sql_data,conn,chunksize = 1000000):
data.append(chunk)
data = pd.concat(data, ignore_index = True)
data = data.merge(candid_sale_invoices)
data = data.merge(candid_barcodes)
final_edges_df = data.iloc[:,[2,3,4]]
final_edges_tuples = [tuple(x) for x in final_edges_df.values]
Gm = ig.Graph.TupleList(final_edges_tuples, directed = True, edge_attrs = ['weight'])
longest_paths = pd.DataFrame(Gm.shortest_paths_dijkstra(None,None, weights = 'weight'))
longest_paths = longest_paths.swifter.apply(log_transform)
longest_paths["Date"] = input_date
longest_paths["RegionID"] = input_region
Return longest_paths
results = pool.map(graph_analysis,range(int(t1),(int(t2) + 1)))
pool.close()
Сначала я получил ошибку для " input_region "
NameError: имя 'input_region' не определено
, в котором указано, что функция его не распознала. Я знаю, что это глобальная переменная, но не мог понять, как использовать ее внутри моей функции. Поскольку я впервые хотел применить мультиобработку, я решил вручную ввести «input_region» внутри функции и затем изменить его. Однако вторая проблема заключалась в следующем:
Traceback (последний вызов был последним):
File "", строка 66, в results = pool.map ( graph_analysis, range (int (t1), (int (t2) + 1)))
Файл "C: \ Users \ AppData \ Local \ Continuum \ anaconda3 \ lib \ site-packages \ multiprocess \ pool.py ", строка 268, в карте вернуть self._map_asyn c (забавный c, повторяемый, mapstar, размер фрагмента) .get ()
Файл" C: \ Users \ AppData \ Локальный \ Continuum \ anaconda3 \ lib \ site-packages \ multiprocess \ pool.py ", строка 657, в get get self._value
NameError: имя 'pd' не определено
Является ли весь процесс, который я пытаюсь сделать, ложным? Заранее спасибо.