Я использую python 2.7
(не спрашивайте меня, почему, я подрядчик, я просто работаю с тем, что мне дают).
Я пытаюсь реализовать функцию pyspark
, которая используетspark-bigquery
соединитель для отправки простого запроса с использованием API-источника данных Spark SQL.
Я переживаю самое странное;Я написал функцию и подтвердил, что она действительно работает с сервером, когда я на самом деле запускаю ее. Я хотел убедиться, что если пользователь предоставит имя таблицы, которая не существует, будет сгенерировано исключение в соответствии с обработкой, возвращенной сервером, и я сделал (я знаю, что это не TDD, но просто воспользуйтесь этим). Затем я приступил к написанию теста для него, и мне, очевидно, пришлось сгенерировать фиктивное исключение, которое я сделал следующим образом:
module / query_bq
from py4j.protocol import Py4JJavaError
from pyspark.sql import SparkSession
def submit_bq_query(spark, table, filter_string):
try:
df = spark.read.format('bigquery').option('table', table).option('filter', filter_string).load()
return df
except Py4JJavaError as e:
java_error_msg = str(e).split('\n')[1]
if "java.lang.RuntimeException" in java_error_msg and ("{} not found".format(table)) in java_error_msg:
raise Exception("RuntimeException: Table {} not found!".format(table))
Как ясказал, это работает как шарм. Теперь тест для него выглядит следующим образом:
module / test_query_bq
import pytest
from mock import patch, mock
from py4j.java_gateway import GatewayProperty, GatewayClient, JavaObject
from py4j.protocol import Py4JJavaError
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.types import StructType
def mock_p4j_java_error_generator(msg):
gateway_property = GatewayProperty(auto_field="Mock", pool="Mock")
client = GatewayClient(gateway_property=gateway_property)
java_object = JavaObject("RunTimeError", client)
exception = Py4JJavaError(msg, java_exception=java_object)
return Exception(exception)
def test_exception_is_thrown_if_table_not_present():
# Given
mock_table_name = 'spark_bq_test.false_table_name'
mock_filter = "word is 'V'"
mock_errmsg = "Table {} not found".format(mock_table_name)
# Mocking
mock_spark = mock.Mock()
mock_spark_reader = mock.Mock()
# Mocking return-values setup
mock_spark.read.format.return_value = mock_spark_reader
mock_spark_reader.option.return_value = mock_spark_reader
mock_spark_reader.load.side_effect = mock_p4j_java_error_generator(mock_errmsg)
# When
with pytest.raises(Exception) as exception:
submit_bq_query(mock_spark, mock_table_name, mock_filter)
assert exception.value.message.errmsg == mock_errmsg
Запуск теста выполнен успешно, , но , когда я пытаюсьотладка, просто чтобы проследить за выполнением, я замечаю, что код сразу после того, как ловится исключение:
module / query_bq
...
except Py4JJavaError as e:
java_error_msg = str(e).split('\n')[1] . # This line is never reached!
if "java.lang.RuntimeException" in java_error_msg and ("{} not found".format(table)) in java_error_msg:
raise Exception("RuntimeException: Table {} not found!".format(table))
...
никогда не достигается. Тем не менее, тест, тем не менее, все-таки успешно .
Короче говоря, исключение высмеивается и выдается как следует в тесте. Это также поймано, но это не обработано. Утверждение теста прошло, и тест прошел успешно, как если бы он был обработан, когда это не так, но я никогда не проверяю внутренности ложного исключения. Еще раз позвольте мне заметить, что module / query_bq работает на сервере просто отлично;возвращает dataframes
и прекрасно обрабатывает исключения, когда таблицы нет! Суть здесь в тестировании.
Мне нужно сделать дополнительные вещи для обработки части исключения в module / query_bq , но я не могу, потому что я не знаю, чтопроисходит. Кто-нибудь может объяснить?