Я пытаюсь проверить поведение класса, который ест и обрабатывает DataFrames.
Следуя этим предыдущим вопросам: Как писать модульные тесты в Spark 2.0 +? Я пытался использовать шаблон ссуды для запуска своих тестов следующим образом: У меня есть черта поставщика SparkSession:
/**
* This trait allows to use spark in Unit tests
* https://stackoverflow.com/questions/43729262/how-to-write-unit-tests-in-spark-2-0
*/
trait SparkSetup {
def withSparkSession(testMethod: SparkSession => Any) {
val conf = new SparkConf()
.setMaster("local")
.setAppName("Spark test")
val sparkSession = SparkSession
.builder()
.config(conf)
.enableHiveSupport()
.getOrCreate()
try {
testMethod(sparkSession)
}
// finally sparkSession.stop()
}
}
Который я использую в своем тестовом классе:
class InnerNormalizationStrategySpec
extends WordSpec
with Matchers
with BeforeAndAfterAll
with SparkSetup {
...
"A correct contact message" should {
"be normalized without errors" in withSparkSession{ ss => {
import ss.implicits._
val df = ss.createDataFrame(
ss.sparkContext.parallelize(Seq[Row](Row(validContact))),
StructType(List(StructField("value", StringType, nullable = false))))
val result = target.innerTransform(df)
val collectedResult: Array[NormalizedContactHistoryMessage] = result
.where(result.col("contact").isNotNull)
.as[NormalizedContactHistoryMessage]
.collect()
collectedResult.isEmpty should be(false) // There should be something
collectedResult.length should be(1) // There should be exactly 1 message...
collectedResult.head.contact.isDefined should be(true) // ... of type contact.
}}
}
...
}
При попытке запустить мои тесты с использованием средства IntelliJ все тесты, написанные таким образом, работают (запуск класса Spec сразу), однако, команда sbt test
от терминала делает все тесты неудачными.
Я тоже думал, что это из-за параллелизма, поэтому я добавил
concurrentRestrictions in Global += Tags.limit(Tags.Test, 1)
в мои настройки sbt, но не работал.
Вот трассировка стека Iполучить: https://pastebin.com/LNTd3KGW
Любая помощь?
Спасибо