Быстрый разбор JSON с большими пропускаемыми областями - PullRequest
3 голосов
/ 03 июля 2019

У меня есть JSON со следующей структурой (пример из реального мира здесь https://gist.github.com/PavelPenkov/3432fe522e02aa3a8a597020d4ee7361):

{
  "metadata": { /* Huge TYPED object */ },
  "payload": { /* Small flat UNTYPED object */
    "field_1": 1
    "field_2": "Alice"
  }
}

Я хочу извлечь payload часть как можно быстрее, файл огромен и анализирует егов случае с кейсом довольно медленно (5000 операций в секунду на моем ноутбуке). До сих пор я пробовал

  1. Разобрать весь документ в классе кейсов с Джексоном.

  2. Разбор в AST с Джексоном и извлечение только поля payload - немного быстрее.

  3. scala-jsoniter, хотя он, вероятно, может анализировать набранную часть быстрее, он не можетразбирать нетипизированные поля по дизайну.

Есть ли другие варианты, доступные из Java или (предпочтительно) Scala?

1 Ответ

1 голос
/ 22 июля 2019

Пропуск нежелательного значения JSON - это, где светит jsoniter-scala. Да, он не предоставляет модель AST для JSON, но вы можете создать ее для себя или использовать предоставленную какой-либо сторонней библиотекой. Вот пример пользовательского кодека для Circe AST:

package io.circe

import java.util

import com.github.plokhotnyuk.jsoniter_scala.core._
import io.circe.Json._

object CirceJsoniter {
  implicit val codec: JsonValueCodec[Json] = new JsonValueCodec[Json] {
    override def decodeValue(in: JsonReader, default: Json): Json = {
      var b = in.nextToken()
      if (b == 'n') in.readNullOrError(default, "expected `null` value")
      else if (b == '"') {
        in.rollbackToken()
        new JString(in.readString(null))
      } else if (b == 'f' || b == 't') {
        in.rollbackToken()
        if (in.readBoolean()) Json.True
        else Json.False
      } else if ((b >= '0' && b <= '9') || b == '-') {
        new JNumber({
          in.rollbackToken()
          in.setMark() // TODO: add in.readNumberAsString() to Core API of jsoniter-scala
          try {
            do b = in.nextByte()
            while (b >= '0' && b <= '9')
          } catch { case _: JsonReaderException => /* ignore end of input error */} finally in.rollbackToMark()
          if (b == '.' || b == 'e' || b == 'E') new JsonDouble(in.readDouble())
          else new JsonLong(in.readLong())
        })
      } else if (b == '[') {
        new JArray(if (in.isNextToken(']')) Vector.empty
        else {
          in.rollbackToken()
          var x = new Array[Json](4)
          var i = 0
          do {
            if (i == x.length) x = java.util.Arrays.copyOf(x, i << 1)
            x(i) = decodeValue(in, default)
            i += 1
          } while (in.isNextToken(','))
          (if (in.isCurrentToken(']'))
            if (i == x.length) x
            else java.util.Arrays.copyOf(x, i)
          else in.arrayEndOrCommaError()).to[Vector]
        })
      } else if (b == '{') {
        new JObject(if (in.isNextToken('}')) JsonObject.empty
        else {
          val x = new util.LinkedHashMap[String, Json]
          in.rollbackToken()
          do x.put(in.readKeyAsString(), decodeValue(in, default))
          while (in.isNextToken(','))
          if (!in.isCurrentToken('}')) in.objectEndOrCommaError()
          JsonObject.fromLinkedHashMap(x)
        })
      } else in.decodeError("expected JSON value")
    }

    override def encodeValue(x: Json, out: JsonWriter): Unit = x match {
      case JNull => out.writeNull()
      case JString(s) => out.writeVal(s)
      case JBoolean(b) => out.writeVal(b)
      case JNumber(n) => n match {
        case JsonLong(l) => out.writeVal(l)
        case _ => out.writeVal(n.toDouble)
      }
      case JArray(a) =>
        out.writeArrayStart()
        a.foreach(v => encodeValue(v, out))
        out.writeArrayEnd()
      case JObject(o) =>
        out.writeObjectStart()
        o.toIterable.foreach { case (k, v) =>
          out.writeKey(k)
          encodeValue(v, out)
        }
        out.writeObjectEnd()
    }

    override def nullValue: Json = Json.Null
  }
}

Другой вариант, если вам нужно просто извлечь байты значений полезной нагрузки, то вы можете использовать такой код, чтобы сделать это со скоростью ~ 300000 сообщений в секунду для предоставленного образца:

import com.github.plokhotnyuk.jsoniter_scala.core._
import com.github.plokhotnyuk.jsoniter_scala.macros._
import java.nio.charset.StandardCharsets.UTF_8
import java.util.concurrent.TimeUnit
import org.openjdk.jmh.annotations._
import scala.reflect.io.Streamable
import scala.util.hashing.MurmurHash3

case class Payload private(bs: Array[Byte]) {
  def this(s: String) = this(s.getBytes(UTF_8))

  override lazy val hashCode: Int = MurmurHash3.arrayHash(bs)

  override def equals(obj: Any): Boolean = obj match {
    case that: Payload => java.util.Arrays.equals(bs, that.bs)
    case _ => false
  }

  override def toString: String = new String(bs, UTF_8)
}

object Payload {
  def apply(s: String) = new Payload(s.getBytes)

  implicit val codec: JsonValueCodec[Payload] = new JsonValueCodec[Payload] {
    override def decodeValue(in: JsonReader, default: Payload): Payload = new Payload(in.readRawValAsBytes())

    override def encodeValue(x: Payload, out: JsonWriter): Unit = out.writeRawVal(x.bs)

    override val nullValue: Payload = new Payload(new Array[Byte](0))
  }
}

case class MessageWithPayload(payload: Payload)

object MessageWithPayload {
  implicit val codec: JsonValueCodec[MessageWithPayload] = JsonCodecMaker.make(CodecMakerConfig())

  val jsonBytes: Array[Byte] = Streamable.bytes(getClass.getResourceAsStream("debezium.json"))
}

@State(Scope.Thread)
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Fork(value = 1, jvmArgs = Array(
"-server",
"-Xms2g",
"-Xmx2g",
"-XX:NewSize=1g",
"-XX:MaxNewSize=1g",
"-XX:InitialCodeCacheSize=512m",
"-XX:ReservedCodeCacheSize=512m",
"-XX:+UseParallelGC",
"-XX:-UseBiasedLocking",
"-XX:+AlwaysPreTouch"
))
@BenchmarkMode(Array(Mode.Throughput))
@OutputTimeUnit(TimeUnit.SECONDS)
class ExtractPayloadReading {
  @Benchmark
  def jsoniterScala(): MessageWithPayload = readFromArray[MessageWithPayload](MessageWithPayload.jsonBytes)
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...