Как размер пакета влияет на производительность потокового чтения и записи в Apache Arrow? - PullRequest
0 голосов
/ 21 апреля 2020

Я использую Apache Arrow для передачи данных между Spark (scala) и Tensorflow (python).

В Spark я получил массив [Row] и отправил массив [Строка] через сокет через следующий код:

val rows_array: Array[Row] = df.rdd.collect()
val ss = ServerSocket = new ServerSocket(port)
val iter = rows_array.iterator
// Grouped a iterator into batches, batchIter is a Iterator[Iterator[Row]]
val batchIter = new BatchIterator(iter, arrowRecordBatchSize)
val root = new VectorSchemaRoot(new Schema(fields), vectors, arrowRecordBatchSize)
val out = new DataOutputStream(socket.getOutputStream)
val writer = new ArrowStreamWriter(root, null, out)
writer.start()

sendRecordIterator(root, arrowStreamWriter, batchIter)

sendRecordIterator определяется следующим образом:

  def sendRecordIterator(root: VectorSchemaRoot,
                         writer: ArrowStreamWriter,
                         inputIterator: Iterator[Iterator[Row]]): Unit = { //The Iterator split data to batch
    try {
      val arrowWriter = MyArrowWriter.create(root)
      while (inputIterator.hasNext) {
        val nextBatch = inputIterator.next()

        while (nextBatch.hasNext) {
          arrowWriter.write(nextBatch.next())
        }

        arrowWriter.finish()
        writer.writeBatch()
        arrowWriter.reset()
      }
      writer.end()
    } finally {
      root.close()
    }
  }

В python я получаю ds по следующему коду:

    ds = arrow_io.ArrowStreamDataset(
        [endpoint],
        columns=my_columns,
        output_types=my_types,
        output_shapes=my_shapes,
        batch_size=arrowRecordBatchSize,
        batch_omde='keep_remainder')

Затем я преобразовал ds в numpy и прошел его:

    ds_numpy = tfds.as_numpy(ds)
    my_list = list()
    for x in ds_numpy:
        my_list.append(x)

Я установил другой batch_size (arrowRecordBatchSize) для отправки данных, результаты показывают, что существуют разные коэффициенты эффективности передачи :

arrowRecordBatchSize = 1000 (около 8 КБ) =======> 161 с
arrowRecordBatchSize = 50 000 (около 400 КБ) ======> 63 с
arrowRecordBatchSize = 100 000 (около 800 КБ) =======> 65 с
arrowRecordBatchSize = 1 000 000 (около 8 МБ) =======> 108 с
arrowRecordBatchSize = 10 000 000 (около 80 МБ) ======== > 113 с
arrowRecordBatchSize = 50 000 000 (около 400 МБ) =======> 112 с

Итак, как размер пакета влияет на потоковую передачу Производительность чтения и записи в Apache Arrow? Есть ли лучшие практики для размера партии?

Ps: Я заметил, что отправитель, кажется, использует nio. Это ключ к этому явлению?

...