У меня есть код, который выполняется внутри контейнера докера в кластере kubernetes. Проблема заключается в том, что независимо от установленного предела памяти, иногда, код просто терпит неудачу, то есть задача воздушного потока, выполняющая код, терпит неудачу.
Я попытался проверить использование памяти и процессора. Оба они находятся в пределах, и модуль никогда не перезапускается. Примечание: в моей капсуле работает только один контейнер, и это работник воздушного потока. Следует отметить, что загрузка ЦП достигает «стремящегося к 1» и задача не выполняется.
Так как набор данных преобразован правильно, я не публикую образцы данных. Я ищу эффективность с точки зрения ресурсов.
Мой код:
# Below code is required at the start of each script
from athena.api import get_pandas_df
import pandas as pd
last_correct_constant = 11
def remove_unwanted_cols(df, col_name):
unwanted_cols = []
for _col in df.columns:
if _col.startswith("unnamed"):
if int(_col.split("unnamed__")[-1]) > last_correct_constant:
unwanted_cols.append(_col)
else:
if not _col.startswith(col_name):
unwanted_cols.append(_col)
df = df.drop(columns=unwanted_cols)
return df
def sanitize_column_names(df):
corrected_columns = []
for column in df.columns:
corrected_columns.append(
column
.replace("(", "_")
.replace(")", "_")
.replace(" ", "_")
.replace("/", "_")
.replace(".", "_")
.replace(":", "_")
.lower())
df.columns = corrected_columns
return df
def get_first_row_as_header(df):
df.columns = df.iloc[0]
print("Columns are: ")
print(df.columns)
print("Head is: ")
df = df.iloc[1:]
print(df.head(1))
return df
def remove_cols_for_join(df, col_name):
unwanted_cols = []
for _col in df.columns:
if _col != 'period' and (not _col.startswith(col_name)) and _col != 'Markets':
unwanted_cols.append(_col)
print("Unwanted cols are: ")
print(unwanted_cols)
df = df.drop(columns=unwanted_cols)
return df
def main(*args, **kwargs):
""" Put your main logic here.
Help:
To get pandas dataframe of upstream nodes.
data_frame = get_pandas_df("<upstream_node_name>")
Example: data_frame = get_pandas_df("S3")
Return:
output_data_frame {pandas.DataFrame}
This data frame will be transferred to downstream nodes.
"""
# read dataframes
df = get_pandas_df("CSV")
df = sanitize_column_names(df)
df_sales = df
df_gr_vol = df
df_gr_val = df
print("remove unwanted cols for individual melts")
df = remove_unwanted_cols(df, 'value_offtake_000_rs__')
df_sales = remove_unwanted_cols(df_sales, 'sales_volume__volume_litres__')
df_gr_vol = remove_unwanted_cols(df_gr_vol, 'gr_vol_ya')
df_gr_val = remove_unwanted_cols(df_gr_val, 'gr_val_ya')
df = get_first_row_as_header(df)
df_sales = get_first_row_as_header(df_sales)
df_gr_vol = get_first_row_as_header(df_gr_vol)
df_gr_val = get_first_row_as_header(df_gr_val)
print("melting dataframes")
table_columns = df.columns
df = pd.melt(
df, id_vars=table_columns[:last_correct_constant+1],
value_vars=table_columns[last_correct_constant+1:], var_name='period',
value_name='value_offtake_000_rs__')
df = df[(df["Markets"] != '')]
table_columns = df_sales.columns
df_sales = pd.melt(
df_sales, id_vars=table_columns[:last_correct_constant+1],
value_vars=table_columns[last_correct_constant+1:], var_name='period',
value_name='sales_volume__volume_litres__')
df_sales = df_sales[(df_sales["Markets"] != '')]
df_sales = remove_cols_for_join(df_sales, 'sales_volume__volume_litres__')
table_columns = df_gr_vol.columns
df_gr_vol = pd.melt(
df_gr_vol, id_vars=table_columns[:last_correct_constant+1],
value_vars=table_columns[last_correct_constant+1:], var_name='period',
value_name='gr_vol_ya')
df_gr_vol = df_gr_vol[(df_gr_vol["Markets"] != '')]
df_gr_vol = remove_cols_for_join(df_gr_vol, 'gr_vol_ya')
table_columns = df_gr_val.columns
df_gr_val = pd.melt(
df_gr_val, id_vars=table_columns[:last_correct_constant+1],
value_vars=table_columns[last_correct_constant+1:], var_name='period',
value_name='gr_val_ya')
df_gr_val = df_gr_val[(df_gr_val["Markets"] != '')]
df_gr_val = remove_cols_for_join(df_gr_val, 'gr_val_ya')
print("Before merge: ")
for _col in df.columns :
print(_col)
print("==================")
for _col in df_sales.columns :
print(_col)
df = pd.merge(df, df_sales, on=['Markets', 'period'])
df = pd.merge(df, df_gr_val, on=['Markets', 'period'])
df = pd.merge(df, df_gr_vol, on=['Markets', 'period'])
df = sanitize_column_names(df)
return df
Я ожидаю, что этот код будет эффективно работать с использованием памяти и процессора. Моя текущая память настроена на 32 ГБ и 10CPU ядер.
Данные содержат 14 строк и 637 столбцов, которые я преобразую вышеупомянутым способом.