У меня такое же поведение при использовании HDFS из нескольких (многих!) Потоков, и я не знаю ответа на вопрос «почему?», Но сохранение количества потоков, одновременно обращающихся к HDFS, кажется, помогает.
В вашем случае я бы рекомендовал использовать ExecutorService с ограниченным количеством потоков и точно настроить это количество до предела, когда вы не получаете исключений.
Итак, создайте ExecutorService (с 10 потоков в качестве отправной точки):
final ExecutorService executorService = Executors.newFixedThreadPool(10);
и вместо вашего
new Thread(new FetchData(queue, file.getPath(), hadoopConf)).start();
do
executorService.submit(new FetchData(queue, file.getPath(), hadoopConf));
Еще одно улучшение, так как org.apache.hadoop.fs.FileSystem
реализует Closeable
, вам следует закрыть его. В вашем коде каждый поток создает новый экземпляр FileSystem
, но не закрывает его. Поэтому я бы извлек его в переменную внутри вашего try
:
try (FileSystem fileSystem = FileSystem.get(hadoopConf);
BufferedReader bufferedReader =
new BufferedReader(new InputStreamReader
(fileSystem.open(file), StandardCharsets.UTF_8))) {
UPDATE:
Хотя приведенный выше код кажется правильным подходом для объектов Closeable
, по умолчанию FileSystem.get
вернет кешированные экземпляры из
/** FileSystem cache */
static final Cache CACHE = new Cache();
, и, таким образом, все будет ужасно сломано, когда close()
будет вызван на них.
Вы можете отключить кеш файловой системы, установив fs.hdfs.impl.disable.cache
config для true
, или убедитесь, что экземпляры FileSystem
закрываются только после завершения всех рабочих процессов. Также кажется, что вы могли бы просто использовать один экземпляр FileSystem для всех ваших рабочих, хотя я не могу найти никаких подтверждений в javadocs, что это будет работать правильно без дополнительной синхронизации.