Java (sdk 2.9.0):
Лучи Считыватели TextIO не дают доступа к самому имени файла, для этих случаев использования нам необходимо использовать FileIO для сопоставления файлов и получения доступа к информации.хранится в имени файла.В отличие от TextIO, чтение файла должно осуществляться пользователем в преобразованиях после чтения FileIO.Результатом чтения FileIO является PCollection. Класс ReadableFile содержит имя файла в качестве метаданных, которое может использоваться вместе с содержимым файла.
FileIO имеет удобный метод readFullyAsUTF8String (), который будет считывать весь файл в объект String, сначала он будет считывать весь файл в память.Если проблема связана с памятью, вы можете напрямую работать с файлом с помощью служебных классов, таких как FileSystems.
От: Ссылка на документ
PCollection<KV<String, String>> filesAndContents = p
.apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
// withCompression can be omitted - by default compression is detected from the filename.
.apply(FileIO.readMatches().withCompression(GZIP))
.apply(MapElements
// uses imports from TypeDescriptors
.into(KVs(strings(), strings()))
.via((ReadableFile f) -> KV.of(
f.getMetadata().resourceId().toString(), f.readFullyAsUTF8String())));
Python (SDK 2.9.0):
Для версии 2.9.0 для python вам потребуется собрать список URI извне конвейера потока данных и передать его в качестве параметра в конвейер.Например, использование FileSystems для чтения в списке файлов с помощью шаблона Glob, а затем передача его в PCollection для обработки.
Как только fileio см. PR https://github.com/apache/beam/pull/7791/, следующий код также будет возможен для Python.
import apache_beam as beam
from apache_beam.io import fileio
with beam.Pipeline() as p:
readable_files = (p
| fileio.MatchFiles(‘hdfs://path/to/*.txt’)
| fileio.ReadMatches()
| beam.Reshuffle())
files_and_contents = (readable_files
| beam.Map(lambda x: (x.metadata.path,
x.read_utf8()))