PySpark Mocking: тест исключений успешен, но исключение не обрабатывается - PullRequest
0 голосов
/ 15 октября 2019

Я использую python 2.7 (не спрашивайте меня, почему, я подрядчик, я просто работаю с тем, что мне дают).

Я пытаюсь реализовать функцию pyspark, которая используетspark-bigquery соединитель для отправки простого запроса с использованием API-источника данных Spark SQL.

Я переживаю самое странное;Я написал функцию и подтвердил, что она действительно работает с сервером, когда я на самом деле запускаю ее. Я хотел убедиться, что если пользователь предоставит имя таблицы, которая не существует, будет сгенерировано исключение в соответствии с обработкой, возвращенной сервером, и я сделал (я знаю, что это не TDD, но просто воспользуйтесь этим). Затем я приступил к написанию теста для него, и мне, очевидно, пришлось сгенерировать фиктивное исключение, которое я сделал следующим образом:

module / query_bq

from py4j.protocol import Py4JJavaError
from pyspark.sql import SparkSession

def submit_bq_query(spark, table, filter_string):
    try:
        df = spark.read.format('bigquery').option('table', table).option('filter', filter_string).load()
        return df
    except Py4JJavaError as e:
        java_error_msg = str(e).split('\n')[1]
        if "java.lang.RuntimeException" in java_error_msg and ("{} not found".format(table)) in java_error_msg:
            raise Exception("RuntimeException: Table {} not found!".format(table))

Как ясказал, это работает как шарм. Теперь тест для него выглядит следующим образом:

module / test_query_bq

import pytest
from mock import patch, mock
from py4j.java_gateway import GatewayProperty, GatewayClient, JavaObject
from py4j.protocol import Py4JJavaError
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.types import StructType


def mock_p4j_java_error_generator(msg):
    gateway_property = GatewayProperty(auto_field="Mock", pool="Mock")
    client = GatewayClient(gateway_property=gateway_property)
    java_object = JavaObject("RunTimeError", client)
    exception = Py4JJavaError(msg, java_exception=java_object)
    return Exception(exception)


def test_exception_is_thrown_if_table_not_present():

    # Given
    mock_table_name = 'spark_bq_test.false_table_name'
    mock_filter = "word is 'V'"
    mock_errmsg = "Table {} not found".format(mock_table_name)

    # Mocking
    mock_spark = mock.Mock()
    mock_spark_reader = mock.Mock()

    # Mocking return-values setup
    mock_spark.read.format.return_value = mock_spark_reader
    mock_spark_reader.option.return_value = mock_spark_reader
    mock_spark_reader.load.side_effect = mock_p4j_java_error_generator(mock_errmsg)

    # When
    with pytest.raises(Exception) as exception:
        submit_bq_query(mock_spark, mock_table_name, mock_filter)
    assert exception.value.message.errmsg == mock_errmsg

Запуск теста выполнен успешно, , но , когда я пытаюсьотладка, просто чтобы проследить за выполнением, я замечаю, что код сразу после того, как ловится исключение:

module / query_bq

...
    except Py4JJavaError as e:
        java_error_msg = str(e).split('\n')[1] .  # This line is never reached!
        if "java.lang.RuntimeException" in java_error_msg and ("{} not found".format(table)) in java_error_msg:
            raise Exception("RuntimeException: Table {} not found!".format(table))
...

никогда не достигается. Тем не менее, тест, тем не менее, все-таки успешно .

Короче говоря, исключение высмеивается и выдается как следует в тесте. Это также поймано, но это не обработано. Утверждение теста прошло, и тест прошел успешно, как если бы он был обработан, когда это не так, но я никогда не проверяю внутренности ложного исключения. Еще раз позвольте мне заметить, что module / query_bq работает на сервере просто отлично;возвращает dataframes и прекрасно обрабатывает исключения, когда таблицы нет! Суть здесь в тестировании.

Мне нужно сделать дополнительные вещи для обработки части исключения в module / query_bq , но я не могу, потому что я не знаю, чтопроисходит. Кто-нибудь может объяснить?

1 Ответ

0 голосов
/ 21 октября 2019

После 3 дней борьбы я разобрался. Основная проблема заключалась в том, что:

  • Я не правильно издевался над сигнатурой процесса spark.read, а;
  • Я неправильно создавал экземпляр ложного экземпляра Py4JJavaError.

Вот как я это сделал:

... / utils / bigquery_util.py

import logging

from py4j.protocol import Py4JJavaError, Py4JNetworkError


def load_bq_table(spark, table, filter_string):
    tries = 3
    for i in range(tries):
        try:
            logging.info("SQL statement being executed...")
            df = get_df(spark, table, filter_string)
            logging.info("Table-ID: {}, Rows:{} x Cols:{}".format(table, df.count(), len(df.columns)))
            logging.debug("Table-ID: {}, Schema: {}".format(table, df.schema))
            return df
        except Py4JJavaError as e:
            java_exception_str = get_exception_msg(e)
            is_runtime_exception = "java.lang.RuntimeException" in java_exception_str
            table_not_found = ("{} not found".format(table)) in java_exception_str
            if is_runtime_exception and table_not_found:
                logging.error(java_exception_str)
                raise RuntimeError("Table {} not found!".format(table))
        except Py4JNetworkError as ne:
            if i is tries-1:
                java_exception_str = ne.cause
                runtime_error_str = "Error while trying to reach server... {}"
                logging.error(java_exception_str)
                raise EnvironmentError(runtime_error_str.format(java_exception_str))
            continue


def get_exception_msg(e):
    return str(e.java_exception)


def get_df(spark, table, filter_string):
    return (spark.read
            .format('bigquery')
            .option('table', table)
            .option('filter', filter_string)
            .load())

Что касаетсяtesting: ... / test / utils / test_bigquery_util.py

    import pytest
    from mock import patch, mock

    from <...>.utils.bigquery_util import load_bq_table
    from <...>.test.utils.mock_py4jerror import *

    def test_runtime_error_exception_is_thrown_if_table_not_present():

        # Given
        mock_table_name = 'spark_bq_test.false_table_name'
        mock_filter = "word is 'V'"

        # Mocking
        py4j_error_exception = get_mock_py4j_error_exception(get_mock_gateway_client(), mock_target_id="o123")
        mock_errmsg = "java.lang.RuntimeException: Table {} not found".format(mock_table_name)

        # When
        with mock.patch('red_agent.common.utils.bigquery_util.get_exception_msg', return_value=mock_errmsg):
            with mock.patch('red_agent.common.utils.bigquery_util.get_df', side_effect=py4j_error_exception):
                with pytest.raises(RuntimeError):
                    mock_spark = mock.Mock()
                    df = load_bq_table(mock_spark, mock_table_name, mock_filter)

.. и, наконец, для насмешки над Py4JJavaError: ... / test / utils / mock_py4jerror. py

import mock
from py4j.protocol import Py4JJavaError, Py4JNetworkError


def get_mock_gateway_client():
    mock_client = mock.Mock()
    mock_client.send_command.return_value = "0"
    mock_client.converters = []
    mock_client.is_connected.return_value = True
    mock_client.deque = mock.Mock()
    return mock_client


def get_mock_java_object(mock_client, mock_target_id):
    mock_java_object = mock.Mock()
    mock_java_object._target_id = mock_target_id
    mock_java_object._gateway_client = mock_client
    return mock_java_object


def get_mock_py4j_error_exception(mock_client, mock_target_id):
    mock_java_object = get_mock_java_object(mock_client, mock_target_id)
    mock_errmsg = "An error occurred while calling {}.load.".format(mock_target_id)
    return Py4JJavaError(mock_errmsg, java_exception=mock_java_object)


def get_mock_py4j_network_exception(mock_target_id):
    mock_errmsg = "An error occurred while calling {}.load.".format(mock_target_id)
    return Py4JNetworkError(mock_errmsg)

Надеюсь, это кому-нибудь поможет ...

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...