Spark Streaming: сколько пакетов обрабатывается после корректного завершения работы - PullRequest
0 голосов
/ 23 февраля 2020

Я успешно реализовал пример кода этого сообщения в блоге от Bartosz Konieczny:

class GracefulShutdownTest extends FlatSpec with Matchers with BeforeAndAfter {

  val NotGracefulStopKey = "notGracefulStop"
  val GracefulStopKey = "gracefulStop"

  val conf = new SparkConf().setAppName("Spark graceful shutdown test").setMaster("local[*]")
  var streamingContext: StreamingContext = null
  var wasStopped = false
  val dataQueue: mutable.Queue[RDD[Int]] = new mutable.Queue[RDD[Int]]()

  before {
    wasStopped = false
    streamingContext = new StreamingContext(conf, Durations.seconds(1))
  }

  after {
    if (!wasStopped) {
      streamingContext.stop(stopSparkContext = true, stopGracefully = true)
    }
  }

  "remaining data" should "be treated when context is stopped gracefully" in {
    consumeData(true, GracefulStopKey)
    // sleep - gives some time to accumulate remaining processed data
    Thread.sleep(6000)

    // all remaining tasks will be processed
    NumbersTracker.collectedData(GracefulStopKey) should contain theSameElementsAs(0 to 49)
  }

  "remaining data" should "not be processed when the context is not stopped gracefully" in {
    consumeData(false, NotGracefulStopKey)
    // sleep - gives some time to accumulate remaining processed data
    Thread.sleep(6000)

    // when the context is not stopped gracefully, remaining data and pending tasks
    // won't be executed, so automatically some of the last RDDs to consume
    // aren't  processed
    NumbersTracker.collectedData(NotGracefulStopKey) should contain noneOf(30, 31, 
      32, 33, 34, 35, 36, 37, 38, 39, 40, 
      41, 42, 43, 44, 45, 46, 47, 48, 49)
  }

  private def consumeData(stopGracefully: Boolean, trackedNumbersKey: String): Unit = {
    val itemsInBatch = 10
    for (index <- 0 until 5) {
      val start = itemsInBatch*index
      val data = start to start+itemsInBatch-1
      dataQueue += streamingContext.sparkContext.parallelize(data)
    }
    streamingContext.queueStream(dataQueue, oneAtATime = true)
      .foreachRDD(rdd => {
        rdd.foreach(number => {
          // after some tests, Spark's accumulator doesn't contain
          // data processed after graceful stop
          NumbersTracker.collectedData(trackedNumbersKey) += number
        })
      })

    new Thread(new Runnable() {
      override def run(): Unit = {
        Thread.sleep(3000)
        val stopSparkContext = true
        streamingContext.stop(stopSparkContext, stopGracefully)
        wasStopped = true
      }
    }).start()

    streamingContext.start()
    streamingContext.awaitTermination()
  }

}

object NumbersTracker {

  val collectedData: mutable.Map[String, mutable.ListBuffer[Int]] =
    mutable.HashMap.empty[String, mutable.ListBuffer[Int]]

  collectedData += ("notGracefulStop" -> mutable.ListBuffer.empty[Int])
  collectedData += ("gracefulStop"  -> mutable.ListBuffer.empty[Int])

}

Однако я не понимаю, почему проходит первый (положительный) тест. Всего в очереди 5 RDD, и длительность пакета установлена ​​на одну секунду, а для oneAtATime установлено значение true. Будучи в состоянии ожидания потока в течение 3 секунд перед остановкой streamingContext, я ожидал бы, что задания Spark обрабатывают только числа до 39, а не 49.

Играя с числами, которые я заметил, что задание обрабатывается точно еще две партии после постепенного отключения. Почему два, а не один (или даже три)?

...