Я пытаюсь использовать некоторые вычисления для данных из цепочки биткойнов, которые я установил в flink env.
(Some(List(Map(witness -> 00, sequence -> 4.294967295E9, address -> null, script -> 03a39c030400002330124d696e656420627920425443204775696c6408000418b30b020000, prevout -> Map(hash -> 0000000000000000000000000000000000000000000000000000000000000000, index -> 4.294967295E9)))),Some(236707.0))
Ниже приведен код для того же самого:
class MapFunction extends RichMapFunction[String, (Any)] {
override def map(in: String): (Any) = {
val fieldArray = JSON.parseFull(in)
((findValueForKeyDeep(fieldArray,"inputs")).toString(),
findValueForKeyDeep(fieldArray,"height"))
}
def findValueForKeyDeep[Key](container: Any, key: Key): Option[Any]
= {
@tailrec
def findValueForKeyInTraversable(tr: Traversable[_], key: Key):
Option[Any] = {
if (tr.isEmpty) None
else {
val headRes = findValueForKeyDeep(tr.head, key)
if (headRes.isDefined) headRes else
findValueForKeyInTraversable(tr.tail, key)
}
}
container match {
case m: scala.collection.Map[Key, _] => m.get(key).orElse(findValueForKeyInTraversable(m.values, key))
case o: Option[_] => o.flatMap(findValueForKeyDeep(_, key))
case tr: Traversable[_] => findValueForKeyInTraversable(tr, key)
case _ => None
}
}
}
object psenvsub {
def main(args : Array[String]) {
// Prepare our context and subscriber
val context = ZMQ.context(1)
val subscriber = context.socket(ZMQ.SUB)
val params: ParameterTool = ParameterTool.fromArgs(args)
// set up execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
// make parameters available in the web interface
env.getConfig.setGlobalJobParameters(params)
var x = 0
subscriber.connect("tcp://localhost:5563")
subscriber.subscribe("B".getBytes())
while (x>=0) {
x = x + 1
// Read envelope with address
val address = new String(subscriber.recv(0))
// Read message contents
val contents = env.fromElements(new String(subscriber.recv(0)))
// contents.print("print ---------------")
val output_file = "./flinkOut/out"+x+".json"
contents.map(new MapFunction()).writeAsText(output_file)
env.execute("Scala Example zeromq")
}
}
}
Iхотите получить значения из ввода транзакции (значения из вложенного JSON).Ссылка: Scala - извлечение значения из файла, содержащего данные в формате JSON