как запустить последовательный режим абзацев, используя watcher и z.run в zeppelin? - PullRequest
0 голосов
/ 14 сентября 2018

Я использую zeppelin для реализации записных книжек с использованием искрового и углового интерпретатора MongoDB. У меня проблема с синхронизацией между абзацами. Я использую angularWatch для запуска абзацев. абзац 0 импортируйте необходимый нагрудник и настройте записную книжку для запуска абзаца, когда переменная chootedKey изменится

    %spark
    import com.mongodb.spark.MongoSpark
    import com.mongodb.spark.config.{ReadConfig, WriteConfig}
    import com.mongodb.spark.sql._
    import org.apache.spark.sql.functions.{asc, _}
    import org.bson.Document
    import scala.collection.JavaConverters._
    import scala.collection.mutable.ListBuffer
    z.angularBind("Choosedkey", "type")
    val selectedKeyontext = z.getInterpreterContext()
    z.angularWatch("Choosedkey", (before:Object, after:Object) => {
    z.run(List(2).asJava.asInstanceOf[java.util.List[Object]], 
    selectedKeyontext)
    z.run(List(3).asJava.asInstanceOf[java.util.List[Object]], 
    selectedKeyontext)
    z.run(List(4).asJava.asInstanceOf[java.util.List[Object]], selectedKeyontext)
    z.run(List(5).asJava.asInstanceOf[java.util.List[Object]], selectedKeyontext)
    })

parghraph 1: предоставить текстовую область для записи конфигурации для запросов

     println("""%angular  <h4> Please type the key</h4>
    Your key <input ng-model="gg" type="text"></input> <button class="btn btn- 
   success" ng-click="Choosedkey=gg" >Run !</button>

     """)

paraghraph 2, ответственный за сохранение параметра в коллекции MongoDB для передачи этого параметра в интерпретатор mongodb, поскольку этот интерпретатор в настоящее время не может взаимодействовать с искровым и угловым интерпретатором ...

   val s=z.angular("Choosedkey")
   var TR =new ListBuffer[String]()
   TR+= s.toString
   val document = sc.parallelize(
   Seq(new Document("faits",TR.asJava)))
   val writeConfig = WriteConfig(Map("uri" -> "mongodb://127.0.0.1:27017/", 
   "database" -> "test","collection" -> "Ckey")
   MongoSpark.save(document, writeConfig)

пункт 3: получить параметр, записанный пользователем, и выполнить запрос с этим параметром в коллекциях mongodb

    %mongodb
    q=String(db.Ckey.find({}, { faits: 1 , _id : 0} )[0].faits)
    db.Collection_key.insert(db.ColFTTH.find({"d.key":q}).toArray())
    db.Collection_key.aggregate([
    { "$unwind": "$c" },
    { "$group": {
    "_id": "$c.re",
    "count": { "$sum": 1 }
    }},
    { $out: "Key_T" }
    db.Collection_key.aggregate([
    { "$unwind": "$in" },
    { "$group": {
     "_id": "$in.r",
      "count": { "$sum": 1 }
     }},
    { $out: "Key_I" }
    ])

параграф 4: показать таблицу с результатом параграфа 3

   %mongodb
   db.Key_T.find().sort({count:-1}).limit(10).table()

параграф 15: показать стол из параграфа 3

    %mongodb
    db.Key_I.find().sort({count:-1}).limit(10).table()

проблема здесь, когда я выбираю ключ и отправляю команду запуска всех параграфов в один и тот же момент, и каждый генерирует ошибки, потому что абзац 3 занял 5 минут, а результаты будут использованы в других, а в третьем параграфе используется неправильный ключ, потому что не дождался исполнения параграфа 2 .. Я запутался, помогите, пожалуйста!

...