Я использую Apache Arrow для передачи данных между Spark (scala) и Tensorflow (python).
В Spark я получил массив [Row] и отправил массив [Строка] через сокет через следующий код:
val rows_array: Array[Row] = df.rdd.collect()
val ss = ServerSocket = new ServerSocket(port)
val iter = rows_array.iterator
// Grouped a iterator into batches, batchIter is a Iterator[Iterator[Row]]
val batchIter = new BatchIterator(iter, arrowRecordBatchSize)
val root = new VectorSchemaRoot(new Schema(fields), vectors, arrowRecordBatchSize)
val out = new DataOutputStream(socket.getOutputStream)
val writer = new ArrowStreamWriter(root, null, out)
writer.start()
sendRecordIterator(root, arrowStreamWriter, batchIter)
sendRecordIterator определяется следующим образом:
def sendRecordIterator(root: VectorSchemaRoot,
writer: ArrowStreamWriter,
inputIterator: Iterator[Iterator[Row]]): Unit = { //The Iterator split data to batch
try {
val arrowWriter = MyArrowWriter.create(root)
while (inputIterator.hasNext) {
val nextBatch = inputIterator.next()
while (nextBatch.hasNext) {
arrowWriter.write(nextBatch.next())
}
arrowWriter.finish()
writer.writeBatch()
arrowWriter.reset()
}
writer.end()
} finally {
root.close()
}
}
В python я получаю ds по следующему коду:
ds = arrow_io.ArrowStreamDataset(
[endpoint],
columns=my_columns,
output_types=my_types,
output_shapes=my_shapes,
batch_size=arrowRecordBatchSize,
batch_omde='keep_remainder')
Затем я преобразовал ds в numpy и прошел его:
ds_numpy = tfds.as_numpy(ds)
my_list = list()
for x in ds_numpy:
my_list.append(x)
Я установил другой batch_size (arrowRecordBatchSize) для отправки данных, результаты показывают, что существуют разные коэффициенты эффективности передачи :
arrowRecordBatchSize = 1000 (около 8 КБ) =======> 161 с
arrowRecordBatchSize = 50 000 (около 400 КБ) ======> 63 с
arrowRecordBatchSize = 100 000 (около 800 КБ) =======> 65 с
arrowRecordBatchSize = 1 000 000 (около 8 МБ) =======> 108 с
arrowRecordBatchSize = 10 000 000 (около 80 МБ) ======== > 113 с
arrowRecordBatchSize = 50 000 000 (около 400 МБ) =======> 112 с
Итак, как размер пакета влияет на потоковую передачу Производительность чтения и записи в Apache Arrow? Есть ли лучшие практики для размера партии?
Ps: Я заметил, что отправитель, кажется, использует nio. Это ключ к этому явлению?