Как эффективно писать панды плавятся и объединяются, чтобы работать внутри контейнеров, не вызывая исключений SystemOOM? - PullRequest
0 голосов
/ 30 апреля 2019

У меня есть код, который выполняется внутри контейнера докера в кластере kubernetes. Проблема заключается в том, что независимо от установленного предела памяти, иногда, код просто терпит неудачу, то есть задача воздушного потока, выполняющая код, терпит неудачу.

Я попытался проверить использование памяти и процессора. Оба они находятся в пределах, и модуль никогда не перезапускается. Примечание: в моей капсуле работает только один контейнер, и это работник воздушного потока. Следует отметить, что загрузка ЦП достигает «стремящегося к 1» и задача не выполняется.

Так как набор данных преобразован правильно, я не публикую образцы данных. Я ищу эффективность с точки зрения ресурсов.

Мой код:

# Below code is required at the start of each script
from athena.api import get_pandas_df
import pandas as pd

last_correct_constant = 11


def remove_unwanted_cols(df, col_name):
    unwanted_cols = []
    for _col in df.columns:
        if _col.startswith("unnamed"):
            if int(_col.split("unnamed__")[-1]) > last_correct_constant:
                unwanted_cols.append(_col)
        else:
            if not _col.startswith(col_name):
                unwanted_cols.append(_col)
    df = df.drop(columns=unwanted_cols)
    return df


def sanitize_column_names(df):
    corrected_columns = []
    for column in df.columns:
        corrected_columns.append(
            column
            .replace("(", "_")
            .replace(")", "_")
            .replace(" ", "_")
            .replace("/", "_")
            .replace(".", "_")
            .replace(":", "_")
            .lower())
    df.columns = corrected_columns
    return df


def get_first_row_as_header(df):
    df.columns = df.iloc[0]
    print("Columns are: ")
    print(df.columns)
    print("Head is: ")
    df = df.iloc[1:]
    print(df.head(1))
    return df


def remove_cols_for_join(df, col_name):
    unwanted_cols = []
    for _col in df.columns:
        if _col != 'period' and (not _col.startswith(col_name)) and _col != 'Markets':
            unwanted_cols.append(_col)
    print("Unwanted cols are: ")
    print(unwanted_cols)
    df = df.drop(columns=unwanted_cols)
    return df


def main(*args, **kwargs):
    """ Put your main logic here.

    Help:
        To get pandas dataframe of upstream nodes.
        data_frame = get_pandas_df("<upstream_node_name>")

        Example: data_frame = get_pandas_df("S3")

    Return:
        output_data_frame {pandas.DataFrame}
        This data frame will be transferred to downstream nodes.
    """
    # read dataframes
    df = get_pandas_df("CSV")
    df = sanitize_column_names(df)
    df_sales = df
    df_gr_vol = df
    df_gr_val = df

    print("remove unwanted cols for individual melts")
    df = remove_unwanted_cols(df, 'value_offtake_000_rs__')
    df_sales = remove_unwanted_cols(df_sales, 'sales_volume__volume_litres__')
    df_gr_vol = remove_unwanted_cols(df_gr_vol, 'gr_vol_ya')
    df_gr_val = remove_unwanted_cols(df_gr_val, 'gr_val_ya')

    df = get_first_row_as_header(df)
    df_sales = get_first_row_as_header(df_sales)
    df_gr_vol = get_first_row_as_header(df_gr_vol)
    df_gr_val = get_first_row_as_header(df_gr_val)

    print("melting dataframes")
    table_columns = df.columns
    df = pd.melt(
        df, id_vars=table_columns[:last_correct_constant+1],
        value_vars=table_columns[last_correct_constant+1:], var_name='period',
        value_name='value_offtake_000_rs__')
    df = df[(df["Markets"] != '')]


    table_columns = df_sales.columns
    df_sales = pd.melt(
        df_sales, id_vars=table_columns[:last_correct_constant+1],
        value_vars=table_columns[last_correct_constant+1:], var_name='period',
        value_name='sales_volume__volume_litres__')
    df_sales = df_sales[(df_sales["Markets"] != '')]
    df_sales = remove_cols_for_join(df_sales, 'sales_volume__volume_litres__')


    table_columns = df_gr_vol.columns
    df_gr_vol = pd.melt(
        df_gr_vol, id_vars=table_columns[:last_correct_constant+1],
        value_vars=table_columns[last_correct_constant+1:], var_name='period',
        value_name='gr_vol_ya')
    df_gr_vol = df_gr_vol[(df_gr_vol["Markets"] != '')]
    df_gr_vol = remove_cols_for_join(df_gr_vol, 'gr_vol_ya')

    table_columns = df_gr_val.columns
    df_gr_val = pd.melt(
        df_gr_val, id_vars=table_columns[:last_correct_constant+1],
        value_vars=table_columns[last_correct_constant+1:], var_name='period',
        value_name='gr_val_ya')
    df_gr_val = df_gr_val[(df_gr_val["Markets"] != '')]
    df_gr_val = remove_cols_for_join(df_gr_val, 'gr_val_ya')

    print("Before merge: ")
    for _col in df.columns :
        print(_col)
    print("==================")
    for _col in df_sales.columns :
        print(_col)

    df = pd.merge(df, df_sales, on=['Markets', 'period'])
    df = pd.merge(df, df_gr_val, on=['Markets', 'period'])
    df = pd.merge(df, df_gr_vol, on=['Markets', 'period'])
    df = sanitize_column_names(df)
    return df

Я ожидаю, что этот код будет эффективно работать с использованием памяти и процессора. Моя текущая память настроена на 32 ГБ и 10CPU ядер.

Данные содержат 14 строк и 637 столбцов, которые я преобразую вышеупомянутым способом.

...