Конвертировать объект Scala из Flink в JSON - PullRequest
0 голосов
/ 14 февраля 2019

Я пытаюсь использовать некоторые вычисления для данных из цепочки биткойнов, которые я установил в flink env.

(Some(List(Map(witness -> 00, sequence -> 4.294967295E9, address -> null, script -> 03a39c030400002330124d696e656420627920425443204775696c6408000418b30b020000, prevout -> Map(hash -> 0000000000000000000000000000000000000000000000000000000000000000, index -> 4.294967295E9)))),Some(236707.0))

Ниже приведен код для того же самого:

class MapFunction extends RichMapFunction[String, (Any)] {
 override def map(in: String): (Any) = {
   val fieldArray = JSON.parseFull(in)

   ((findValueForKeyDeep(fieldArray,"inputs")).toString(), 
    findValueForKeyDeep(fieldArray,"height"))
   }

   def findValueForKeyDeep[Key](container: Any, key: Key): Option[Any] 
    = {
   @tailrec
   def findValueForKeyInTraversable(tr: Traversable[_], key: Key): 
   Option[Any] = {
     if (tr.isEmpty) None
     else {
       val headRes = findValueForKeyDeep(tr.head, key)
       if (headRes.isDefined) headRes else 
        findValueForKeyInTraversable(tr.tail, key)
     }
   }

   container match {
     case m: scala.collection.Map[Key, _] => m.get(key).orElse(findValueForKeyInTraversable(m.values, key))
     case o: Option[_] => o.flatMap(findValueForKeyDeep(_, key))
     case tr: Traversable[_] => findValueForKeyInTraversable(tr, key)
     case _ => None
   }
 }
 }

   object psenvsub {

   def main(args : Array[String]) {

     // Prepare our context and subscriber
      val context = ZMQ.context(1)
      val subscriber = context.socket(ZMQ.SUB)
      val params: ParameterTool = ParameterTool.fromArgs(args)

       // set up execution environment
      val env = ExecutionEnvironment.getExecutionEnvironment

       // make parameters available in the web interface
       env.getConfig.setGlobalJobParameters(params)

     var x = 0
     subscriber.connect("tcp://localhost:5563")
     subscriber.subscribe("B".getBytes())

     while (x>=0) {
       x = x + 1
       // Read envelope with address
       val address = new String(subscriber.recv(0))
       // Read message contents

       val contents = env.fromElements(new String(subscriber.recv(0)))
       // contents.print("print ---------------")

       val output_file = "./flinkOut/out"+x+".json"
       contents.map(new MapFunction()).writeAsText(output_file)      
       env.execute("Scala Example zeromq")
     }
   }
 }

Iхотите получить значения из ввода транзакции (значения из вложенного JSON).Ссылка: Scala - извлечение значения из файла, содержащего данные в формате JSON

...