В Java, как я могу создать эквивалент файла контейнера Apache Avro без необходимости использовать файл в качестве носителя? - PullRequest
18 голосов
/ 24 сентября 2011

Это что-то вроде удара в темноте, если кто-нибудь, кто разбирается в Java-реализации Apache Avro, читает это.

Моя цель высокого уровня - найти какой-то способ передачи некоторых серий данных avro.по сети (скажем, HTTP, например, но конкретный протокол не так важен для этой цели).В моем контексте у меня есть HttpServletResponse, в который мне нужно каким-то образом записать эти данные.

Сначала я попытался записать данные как виртуальную версию файла контейнера avro (предположим, что «response» имеет типHttpServletResponse):

response.setContentType("application/octet-stream");
response.setHeader("Content-transfer-encoding", "binary");
ServletOutputStream outStream = response.getOutputStream();
BufferedOutputStream bos = new BufferedOutputStream(outStream);

Schema someSchema = Schema.parse(".....some valid avro schema....");
GenericRecord someRecord = new GenericData.Record(someSchema);
someRecord.put("somefield", someData);
...

GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(someSchema);
DataFileWriter<GenericRecord> fileWriter = new DataFileWriter<GenericRecord>(datumWriter);
fileWriter.create(someSchema, bos);
fileWriter.append(someRecord);
fileWriter.close();
bos.flush();

Все было хорошо и здорово, за исключением того, что оказалось, что Avro действительно не предоставляет способ чтения файла контейнера отдельно от реального файла: DataFileReader имеет только два конструктора:

public DataFileReader(File file, DatumReader<D> reader);

и

public DataFileReader(SeekableInput sin, DatumReader<D> reader);

, где SeekableInput - это настраиваемая форма, специфичная для avro, создание которой также заканчивается чтением из файла.Теперь, учитывая, что, если нет какого-либо способа каким-либо образом принудительно ввести InputStream в файл (/544143/sozdaite-obekt-java-file-ili-ekvivalentnyi-ispolzuya-baitovyi-massiv-v-pamyati-bez-fizicheskogo-faila предполагает, что это не так, и я также попытался просмотреть документацию по Java), этот подход не будет работать, если читатель надругой конец OutputStream получает этот файл контейнера avro (я не уверен, почему они позволили одному выводить файлы двоичного контейнера avro в произвольный OutputStream, не предоставляя способ прочитать их из соответствующего InputStream на другом конце, но это не так.точка).Похоже, что реализация программы чтения контейнерных файлов требует функции «поиска», которую предоставляет конкретный файл.

Хорошо, так что не похоже, что этот подход будет делать то, что я хочу.Как насчет создания ответа JSON, который имитирует файл контейнера avro?

public static Schema WRAPPER_SCHEMA = Schema.parse(
  "{\"type\": \"record\", " +
   "\"name\": \"AvroContainer\", " +
   "\"doc\": \"a JSON avro container file\", " +
   "\"namespace\": \"org.bar.foo\", " +
   "\"fields\": [" +
     "{\"name\": \"schema\", \"type\": \"string\", \"doc\": \"schema representing the included data\"}, " +
     "{\"name\": \"data\", \"type\": \"bytes\", \"doc\": \"packet of data represented by the schema\"}]}"
  );

Я не уверен, что это лучший способ приблизиться к этому, учитывая вышеуказанные ограничения, но похоже, что это может сработать,Я помещу схему (например, "Schema someSchema" сверху) как строку в поле "схема", а затем вставлю в серию avro-binary-serialized форму записи, соответствующей этой схеме (т. Е. "GenericRecordsomeRecord ") внутри поля" data ".

Я на самом деле хотел узнать о конкретной детали того, что описано ниже, но я подумал, что было бы также целесообразно дать больший контекст, так что еслиесть лучший высокоуровневый подход, который я мог бы использовать (этот подход работает, но он не кажется оптимальным), пожалуйста, дайте мне знать.

Мой вопрос в том, что я предполагаю, что я использую этот подход на основе JSON,как записать двоичное представление моей записи avro в поле «data» схемы AvroContainer?Например, я встал здесь:

ByteArrayOutputStream baos = new ByteArrayOutputStream();
GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(someSchema);
Encoder e = new BinaryEncoder(baos);
datumWriter.write(resultsRecord, e);
e.flush();

GenericRecord someRecord = new GenericData.Record(someSchema);
someRecord.put("schema", someSchema.toString());
someRecord.put("data", ByteBuffer.wrap(baos.toByteArray()));
datumWriter = new GenericDatumWriter<GenericRecord>(WRAPPER_SCHEMA);
JsonGenerator jsonGenerator = new JsonFactory().createJsonGenerator(baos, JsonEncoding.UTF8);
e = new JsonEncoder(WRAPPER_SCHEMA, jsonGenerator);
datumWriter.write(someRecord, e);
e.flush();

PrintWriter printWriter = response.getWriter(); // recall that response is the HttpServletResponse
response.setContentType("text/plain");
response.setCharacterEncoding("UTF-8");
printWriter.print(baos.toString("UTF-8"));

Сначала я попытался опустить предложение ByteBuffer.wrap, но затем строка

datumWriter.write(someRecord, e);

вызвала исключение, которое я не смогприведите массив байтов в ByteBuffer.Справедливо, похоже, что когда класс Encoder (из которых JsonEncoder является подклассом) вызывается для записи объекта Avro Bytes, он требует, чтобы в качестве аргумента был задан ByteBuffer.Таким образом, я попытался инкапсулировать byte [] с помощью java.nio.ByteBuffer.wrap, но когда данные были распечатаны, они были напечатаны в виде прямой серии байтов без прохождения через шестнадцатеричное представление avro:

"data": {"bytes": ".....some gibberish other than the expected format...}

Это не кажется правильным.Согласно документации avro, пример объекта байтов, который они дают, говорит, что мне нужно вставить объект json, пример которого выглядит как "\ u00FF", и то, что я вставил туда, явно не в этом формате.Теперь я хочу узнать следующее:

  • Что является примером формата байтов avro?Это выглядит как "\ uDEADBEEFDEADBEEF ..."?
  • Как мне привести мои двоичные avro-данные (как выводимые BinaryEncoder в массив byte []) в формат, который я могу вставить в объект GenericRecord и правильно ли он печатать в JSON?Например, я хочу Object DATA, для которого я могу вызвать некоторые GenericRecord "someRecord.put (" data ", DATA);"с моими сериализованными данными avro?
  • Как бы я затем считал эти данные обратно в байтовый массив на другом (потребительском) конце, когда ему дается текстовое представление JSON, и он хочет воссоздать GenericRecord, как представленоJSON в формате AvroContainer?
  • (повторяя вопрос ранее) Есть ли лучший способ сделать все это?

Ответы [ 3 ]

2 голосов
/ 20 июня 2012

Как сказал Кнут, если вы хотите использовать что-то, кроме файла, вы можете:

  • используйте SeekableByteArrayInput, как сказал Кнут, для всего, что вы можете вставить в массив байтов
  • Реализуйте SeekablInput по-своему - например, если вы выводили его из странной структуры базы данных.
  • Или просто используйте файл. Почему нет?

Это ваши ответы.

0 голосов
/ 16 апреля 2017

В Java и Scala мы пытались использовать начало с помощью кода, сгенерированного с использованием нитро-кодекса Scala. Начало работы - это то, как библиотека Javascript mtth / avsc решила эту проблему . Однако мы столкнулись с несколькими проблемами сериализации, используя библиотеку Java, в которой были ошибочно введены ошибочные байты в поток байтов, и мы могли не выяснить, откуда эти байты поступают.

Конечно, это означало создание собственной реализации Varint с кодировкой ZigZag. Мех.

Вот оно:

package com.terradatum.query

import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer
import java.security.MessageDigest
import java.util.UUID

import akka.actor.ActorSystem
import akka.stream.stage._
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import com.nitro.scalaAvro.runtime.GeneratedMessage
import com.terradatum.diagnostics.AkkaLogging
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
import org.apache.avro.io.EncoderFactory
import org.elasticsearch.search.SearchHit

import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

/*
* The original implementation of this helper relied exclusively on using the Header Avro record and inception to create
* the header. That didn't work for us because somehow erroneous bytes were injected into the output.
*
* Specifically:
* 1. 0x08 prepended to the magic
* 2. 0x0020 between the header and the sync marker
*
* Rather than continue to spend a large number of hours trying to troubleshoot why the Avro library was producing such
* erroneous output, we build the Avro Container File using a combination of our own code and Avro library code.
*
* This means that Terradatum code is responsible for the Avro Container File header (including magic, file metadata and
* sync marker) and building the blocks. We only use the Avro library code to build the binary encoding of the Avro
* records.
*
* @see https://avro.apache.org/docs/1.8.1/spec.html#Object+Container+Files
*/
object AvroContainerFileHelpers {

  val magic: ByteBuffer = {
    val magicBytes = "Obj".getBytes ++ Array[Byte](1.toByte)
    val mg = ByteBuffer.allocate(magicBytes.length).put(magicBytes)
    mg.position(0)
    mg
  }

  def makeSyncMarker(): Array[Byte] = {
    val digester = MessageDigest.getInstance("MD5")
    digester.update(s"${UUID.randomUUID}@${System.currentTimeMillis()}".getBytes)
    val marker = ByteBuffer.allocate(16).put(digester.digest()).compact()
    marker.position(0)
    marker.array()
  }

  /*
  * Note that other implementations of avro container files, such as the javascript library
  * mtth/avsc uses "inception" to encode the header, that is, a datum following a header
  * schema should produce valid headers. We originally had attempted to do the same but for
  * an unknown reason two bytes wore being inserted into our header, one at the very beginning
  * of the header before the MAGIC marker, and one right before the syncmarker of the header.
  * We were unable to determine why this wasn't working, and so this solution was used instead
  * where the record/map is encoded per the avro spec manually without the use of "inception."
  */
  def header(schema: Schema, syncMarker: Array[Byte]): Array[Byte] = {
    def avroMap(map: Map[String, ByteBuffer]): Array[Byte] = {
      val mapBytes = map.flatMap {
        case (k, vBuff) =>
          val v = vBuff.array()
          val byteStr = k.getBytes()
          Varint.encodeLong(byteStr.length) ++ byteStr ++ Varint.encodeLong(v.length) ++ v
      }
      Varint.encodeLong(map.size.toLong) ++ mapBytes ++ Varint.encodeLong(0)
    }

    val schemaBytes = schema.toString.getBytes
    val schemaBuffer = ByteBuffer.allocate(schemaBytes.length).put(schemaBytes)
    schemaBuffer.position(0)
    val metadata = Map("avro.schema" -> schemaBuffer)
    magic.array() ++ avroMap(metadata) ++ syncMarker
  }

  def block(binaryRecords: Seq[Array[Byte]], syncMarker: Array[Byte]): Array[Byte] = {
    val countBytes = Varint.encodeLong(binaryRecords.length.toLong)
    val sizeBytes = Varint.encodeLong(binaryRecords.foldLeft(0)(_+_.length).toLong)

    val buff: ArrayBuffer[Byte] = new scala.collection.mutable.ArrayBuffer[Byte]()

    buff.append(countBytes:_*)
    buff.append(sizeBytes:_*)
    binaryRecords.foreach { rec =>
      buff.append(rec:_*)
    }
    buff.append(syncMarker:_*)

    buff.toArray
  }

  def encodeBlock[T](schema: Schema, records: Seq[GenericRecord], syncMarker: Array[Byte]): Array[Byte] = {
    //block(records.map(encodeRecord(schema, _)), syncMarker)
    val writer = new GenericDatumWriter[GenericRecord](schema)
    val out = new ByteArrayOutputStream()
    val binaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
    records.foreach(record => writer.write(record, binaryEncoder))
    binaryEncoder.flush()
    val flattenedRecords = out.toByteArray
    out.close()

    val buff: ArrayBuffer[Byte] = new scala.collection.mutable.ArrayBuffer[Byte]()

    val countBytes = Varint.encodeLong(records.length.toLong)
    val sizeBytes = Varint.encodeLong(flattenedRecords.length.toLong)

    buff.append(countBytes:_*)
    buff.append(sizeBytes:_*)
    buff.append(flattenedRecords:_*)
    buff.append(syncMarker:_*)

    buff.toArray
  }

  def encodeRecord[R <: GeneratedMessage with com.nitro.scalaAvro.runtime.Message[R]: ClassTag](
      entity: R
  ): Array[Byte] =
    encodeRecord(entity.companion.schema, entity.toMutable)

  def encodeRecord(schema: Schema, record: GenericRecord): Array[Byte] = {
    val writer = new GenericDatumWriter[GenericRecord](schema)
    val out = new ByteArrayOutputStream()
    val binaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
    writer.write(record, binaryEncoder)
    binaryEncoder.flush()
    val bytes = out.toByteArray
    out.close()
    bytes
  }
}

/**
  * Encoding of integers with variable-length encoding.
  *
  * The avro specification uses a variable length encoding for integers and longs.
  * If the most significant bit in a integer or long byte is 0 then it knows that no
  * more bytes are needed, if the most significant bit is 1 then it knows that at least one
  * more byte is needed. In signed ints and longs the most significant bit is traditionally
  * used to represent the sign of the integer or long, but for us it's used to encode whether
  * more bytes are needed. To get around this limitation we zig-zag through whole numbers such that
  * negatives are odd numbers and positives are even numbers:
  *
  * i.e. -1, -2, -3 would be encoded as 1, 3, 5, and so on
  * while 1,  2,  3 would be encoded as 2, 4, 6, and so on.
  *
  * More information is available in the avro specification here:
  * @see http://lucene.apache.org/core/3_5_0/fileformats.html#VInt
  *      https://developers.google.com/protocol-buffers/docs/encoding?csw=1#types
  */
object Varint {

  import scala.collection.mutable

  def encodeLong(longVal: Long): Array[Byte] = {
    val buff = new ArrayBuffer[Byte]()
    Varint.zigZagSignedLong(longVal, buff)
    buff.toArray[Byte]
  }

  def encodeInt(intVal: Int): Array[Byte] = {
    val buff = new ArrayBuffer[Byte]()
    Varint.zigZagSignedInt(intVal, buff)
    buff.toArray[Byte]
  }

  def zigZagSignedLong[T <: mutable.Buffer[Byte]](x: Long, dest: T): Unit = {
    // sign to even/odd mapping: http://code.google.com/apis/protocolbuffers/docs/encoding.html#types
    writeUnsignedLong((x << 1) ^ (x >> 63), dest)
  }

  def writeUnsignedLong[T <: mutable.Buffer[Byte]](v: Long, dest: T): Unit = {
    var x = v
    while ((x & 0xFFFFFFFFFFFFFF80L) != 0L) {
      dest += ((x & 0x7F) | 0x80).toByte
      x >>>= 7
    }
    dest += (x & 0x7F).toByte
  }

  def zigZagSignedInt[T <: mutable.Buffer[Byte]](x: Int, dest: T): Unit = {
    writeUnsignedInt((x << 1) ^ (x >> 31), dest)
  }

  def writeUnsignedInt[T <: mutable.Buffer[Byte]](v: Int, dest: T): Unit = {
    var x = v
    while ((x & 0xFFFFF80) != 0L) {
      dest += ((x & 0x7F) | 0x80).toByte
      x >>>= 7
    }
    dest += (x & 0x7F).toByte
  }
}
0 голосов
/ 08 февраля 2016

Способ, которым я решил это, состоял в том, чтобы отправлять схемы отдельно от данных. Я установил рукопожатие соединения, которое передает схемы вниз с сервера, затем я отправляю закодированные данные туда и обратно. Вы должны создать внешний объект-обертку, как этот:

{'name':'Wrapper','type':'record','fields':[
  {'name':'schemaName','type':'string'},
  {'name':'records','type':{'type':'array','items':'bytes'}}
]}

Где вы сначала кодируете свой массив записей, один за другим, в массив кодированных байтовых массивов. Все в одном массиве должно иметь одинаковую схему. Затем вы кодируете объект-оболочку с помощью приведенной выше схемы - установите «schemaName» в качестве имени схемы, которую вы использовали для кодирования массива.

На сервере вы сначала расшифруете объект-оболочку. После того, как вы декодируете объект-обертку, вы знаете schemaName, и у вас есть массив объектов, которые вы знаете, как декодировать - используйте как хотите!

Обратите внимание, что вы можете обойтись без использования объекта-оболочки, если вы используете протокол, такой как WebSockets, и механизм, такой как Socket.IO (для Node.js) Socket. io предоставляет вам канал связи между браузером и сервером. В этом случае просто используйте конкретную схему для каждого канала, кодируйте каждое сообщение перед его отправкой. Вы все еще должны поделиться схемами, когда соединение инициируется, но если вы используете WebSockets, это легко реализовать. И когда вы закончите, у вас будет произвольное количество строго типизированных двунаправленных потоков между клиентом и сервером.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...