Я пытаюсь выполнить какое-то модульное тестирование в приложении Spark Streaming, которое включает DStreams.
Я считаю очень полезным следующее решение: StreamingSuiteBase .Он содержит метод с именем testOperation , в который можно передать входные данные, операцию для проверки и ожидаемый вывод.Он проверит, совпадает ли ожидаемый результат с фактическим.
Проблема, с которой я сталкиваюсь, заключается в том, что во время проверки равенства я получаю действительно один и тот же объект, но упакованный в разные коллекции:
- ожидается: список (myObject)
- актуальный: массив (myObject)
testOperation определяется следующим образом:
def testOperation[U: ClassTag, V: ClassTag](
input: Seq[Seq[U]],
operation: DStream[U] => DStream[V],
expectedOutput: Seq[Seq[V]],
ordered: Boolean
) (implicit equality: Equality[V]): Unit = {
val numBatches = input.size
withOutputAndStreamingContext(setupStreams[U, V](input, operation)) {
(outputStream, ssc) =>
val output: Seq[Seq[V]] = runStreams[V](
outputStream, ssc, numBatches, expectedOutput.size)
verifyOutput[V](output, expectedOutput, ordered)
}
}
Что непозвольте мне использовать в качестве ожидаемого ввода List(Array(myObject))
Так что мой второй вариант заключался в изменении метода verifyOutput
.Я планировал переопределить его из моего кода, просто добавив несколько строк для генерации списка (Array (myObject)).Вот так ( Обновлено ):
override def verifyOutput[V](output: Seq[Seq[V]],
expectedOutput: Seq[Seq[V]],
ordered: Boolean)
(implicit evidence$1: ClassTag[V], equality: Equality[V]): Unit = {
super.verifyOutput(output, expectedOutput, ordered)
}
//These three lines is what I am planning to add
val sq = expectedOutput(0)
val ssq = sq(0)
val newOutput = Seq(Array(ssq))
logInfo("--------------------------------")
logInfo("output.size = " + output.size)
logInfo("output")
output.foreach(x => logInfo("[" + x.mkString(",") + "]"))
logInfo("expected output.size = " + expectedOutput.size)
logInfo("expected output")
expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
logInfo("--------------------------------")
// Match the output with the expected output
assert(output.size === expectedOutput.size, "Number of outputs do not match")
if (ordered) {
for (i <- output.indices)
equalsOrdered(output(i), expectedOutput(i))
} else {
for (i <- output.indices)
equalsUnordered(output(i), expectedOutput(i))
}
logInfo("Output verified successfully")
}
Весь StreamingSuiteBase можно найти здесь
Но я получаю следующую ошибку в Eclipse:
метод verifyOutput ничего не переопределяет.Примечание: суперклассы класса myClass содержат следующие, не финальные члены с именем verifyOutput: def verifyOutput [V] (выход: Seq [Seq [V]], ожидается, что Output: Seq [Seq [V]], order: Boolean) (неявныйдоказательство $ 1: scala.reflect.ClassTag [V], неявное равенство: org.scalactic.Equality [V]): Единица
Это упрощенная версия моего теста:
import org.scalatest.FunSuite
class myClass extends StreamingSuiteBase with FunSuite {
test("ExtCustProfileHbaseAPI") {
//Here I would be generating my input and expected output
val inputData = new myInitialObject()
val expOutput = new myFinalObject()
testOperation(inputData, processTest _, expOutput, ordered = false)
}
def processTest(input: DStream[myInitialObject]): DStream[(String,myFinalObject)] = {
//Operation undertes
val result = operation(input)
result
}
//Here I added the override def verifyOutput[V: ClassTag]...
}
Что я делаю не так?