Я использую zeppelin для реализации записных книжек с использованием искрового и углового интерпретатора MongoDB. У меня проблема с синхронизацией между абзацами. Я использую angularWatch для запуска абзацев.
абзац 0 импортируйте необходимый нагрудник и настройте записную книжку для запуска абзаца, когда переменная chootedKey изменится
%spark
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.{ReadConfig, WriteConfig}
import com.mongodb.spark.sql._
import org.apache.spark.sql.functions.{asc, _}
import org.bson.Document
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
z.angularBind("Choosedkey", "type")
val selectedKeyontext = z.getInterpreterContext()
z.angularWatch("Choosedkey", (before:Object, after:Object) => {
z.run(List(2).asJava.asInstanceOf[java.util.List[Object]],
selectedKeyontext)
z.run(List(3).asJava.asInstanceOf[java.util.List[Object]],
selectedKeyontext)
z.run(List(4).asJava.asInstanceOf[java.util.List[Object]], selectedKeyontext)
z.run(List(5).asJava.asInstanceOf[java.util.List[Object]], selectedKeyontext)
})
parghraph 1: предоставить текстовую область для записи конфигурации для запросов
println("""%angular <h4> Please type the key</h4>
Your key <input ng-model="gg" type="text"></input> <button class="btn btn-
success" ng-click="Choosedkey=gg" >Run !</button>
""")
paraghraph 2, ответственный за сохранение параметра в коллекции MongoDB для передачи этого параметра в интерпретатор mongodb, поскольку этот интерпретатор в настоящее время не может взаимодействовать с искровым и угловым интерпретатором ...
val s=z.angular("Choosedkey")
var TR =new ListBuffer[String]()
TR+= s.toString
val document = sc.parallelize(
Seq(new Document("faits",TR.asJava)))
val writeConfig = WriteConfig(Map("uri" -> "mongodb://127.0.0.1:27017/",
"database" -> "test","collection" -> "Ckey")
MongoSpark.save(document, writeConfig)
пункт 3: получить параметр, записанный пользователем, и выполнить запрос с этим параметром в коллекциях mongodb
%mongodb
q=String(db.Ckey.find({}, { faits: 1 , _id : 0} )[0].faits)
db.Collection_key.insert(db.ColFTTH.find({"d.key":q}).toArray())
db.Collection_key.aggregate([
{ "$unwind": "$c" },
{ "$group": {
"_id": "$c.re",
"count": { "$sum": 1 }
}},
{ $out: "Key_T" }
db.Collection_key.aggregate([
{ "$unwind": "$in" },
{ "$group": {
"_id": "$in.r",
"count": { "$sum": 1 }
}},
{ $out: "Key_I" }
])
параграф 4: показать таблицу с результатом параграфа 3
%mongodb
db.Key_T.find().sort({count:-1}).limit(10).table()
параграф 15: показать стол из параграфа 3
%mongodb
db.Key_I.find().sort({count:-1}).limit(10).table()
проблема здесь, когда я выбираю ключ и отправляю команду запуска всех параграфов в один и тот же момент, и каждый генерирует ошибки, потому что абзац 3 занял 5 минут, а результаты будут использованы в других, а в третьем параграфе используется неправильный ключ, потому что не дождался исполнения параграфа 2 .. Я запутался, помогите, пожалуйста!