Как прочитать двоичный файл avro fileData, с источником в akka? - PullRequest
2 голосов
/ 18 июня 2019

Я пытаюсь прочитать файл avro с Source из akka Streams.

Источник в потоках akka читает данные, такие как FileIO.FromPath (File), которые будут считывать и разделять строки на основе символа (\ n), где, как для avro, как это работает?

Поток:

    object AvroFlow  {
    def apply(jobDate: String): Flow[GenericRecord, GenericRecord, NotUsed] = {
            Flow[GenericRecord].map { rec => rec.put("date", "20190812") rec}       
    }
  }

График:

object AvroRunner {
    def build (src: Source[GenericRecord, NotUsed],
                                     flw: Flow[GenericRecord, GenericRecord, NotUsed],
                                     snk:Flow[GenericRecord, Future[Done])
    : AvroRunner = {
      new AvroRunner(srtc,flw,snk)
    }
  }
class AvroRunner private(src: Source[GenericRecord, NotUsed],
                                     flw: Flow[GenericRecord, GenericRecord, NotUsed],
                                     snk:Flow[GenericRecord, Future[Done]){
  import scala.concurrent.ExecutionContext.Implicits.global
  val GraphRunner = RunnableGraph.fromGraph(GraphDSL.create() {implicit builder =>
    import GraphDSL.Implicits._
    src ~> flw ~> snk
    ClosedShape
  })
}

1 Ответ

0 голосов
/ 19 июня 2019

Самый простой способ создать akka Source объектов данных avro - это не исходный двоичный файл. Скорее, создайте источник из DataFileReader, предоставленного библиотекой avro.

Из документации мы сначала создаем программу чтения файлов из генератора java.io.File:

def createFileReader[T : ClassTag](fileGenerator : () => File) : DataFileReader[T] = 
  new DataFileReader[T](file(), new SpecificDatumReader[T](classTag[T].runtimeClass))

Затем его можно использовать для создания scala Iterator:

def dataFileReaderToIterator[T](dataFileReader : DataFileReader[T]) : Iterator[T] = 
  new Iterator[T] {
    override def hasNext : Boolean = dataFileReader.hasNext

    override def next() : T = dataFileReader.next
  }

Теперь мы можем построить источник потока из генератора файлов:

def fileToAvroSource[T](fileGenerator : () => File) : Source[T, _] = 
  Source.fromIterator[T](() => dataFileReaderToIterator[T](createFileReader(fileGenerator)))

противодавление

Похоже, что avro использует стандартные методы BufferedReader / OutputStream для чтения File. Следовательно, вышеприведенная реализация должна обеспечивать обратное давление вплоть до источника файла. Однако я не подтвердил, что это так ...

...