Чтение HDFS с использованием многопоточности - PullRequest
0 голосов
/ 03 августа 2020

Я читаю файлы из каталога HDFS, используя многопоточность, используя модель «производитель-потребитель», используя BlockingQueue.

Вот мой код;

класс производителя:

public void readURLS() {
    final int capacity = Integer.MAX_VALUE;

    BlockingQueue<String> queue = new LinkedBlockingQueue<>(capacity);
    try {
        FileSystem hdfs = FileSystem.get(hadoopConf);
        FileStatus[] status = hdfs.listStatus(new Path("MYHDFS_PATH"));

        int i = 0;

       for (FileStatus file : status) {
            LOG.info("Thread {} started: ", i++);
            LOG.info("Reading file {} ", file.getPath().getName());
            new Thread(new FetchData(queue, file.getPath(), hadoopConf)).start();
       }
    } catch (IOException e) {
        LOG.error("IOException occured while listing files from HDFS directory");
    }

}

FetchData:

 @Override
    public void run() {
        LOG.info("Inside reader to start reading the files ");

        try (BufferedReader bufferedReader =
                new BufferedReader(new InputStreamReader
                        (FileSystem.get(hadoopConf).open(file), StandardCharsets.UTF_8))) {


            String line;
            while ((line = bufferedReader.readLine()) != null) {
                if (Thread.interrupted()) {
                    throw new InterruptedException();
                }
                LOG.info("Line is :{}", line);
                queue.put(line);

            }
         
        } catch (IOException e) {
            LOG.error("file : {} ", file.toString());
            throw new IOException(e);
        } catch (InterruptedException e) {
            LOG.error("An error has occurred: ", e);
            Thread.currentThread().interrupt();

        }

При выполнении кода он выдает мне InterruptedIOException:

java.io.IOException: Failed on local exception: java.io.**InterruptedIOException**: Interruped while waiting for IO on channel java.nio.channels.SocketChannel[connected 

Любая идея, почему. Моя идея состоит в том, чтобы l oop по каждому файлу и читать каждый файл, используя отдельный поток.

1 Ответ

1 голос
/ 26 августа 2020

У меня такое же поведение при использовании HDFS из нескольких (многих!) Потоков, и я не знаю ответа на вопрос «почему?», Но сохранение количества потоков, одновременно обращающихся к HDFS, кажется, помогает.

В вашем случае я бы рекомендовал использовать ExecutorService с ограниченным количеством потоков и точно настроить это количество до предела, когда вы не получаете исключений.

Итак, создайте ExecutorService (с 10 потоков в качестве отправной точки):

final ExecutorService executorService = Executors.newFixedThreadPool(10);

и вместо вашего

new Thread(new FetchData(queue, file.getPath(), hadoopConf)).start();

do

executorService.submit(new FetchData(queue, file.getPath(), hadoopConf));

Еще одно улучшение, так как org.apache.hadoop.fs.FileSystem реализует Closeable , вам следует закрыть его. В вашем коде каждый поток создает новый экземпляр FileSystem, но не закрывает его. Поэтому я бы извлек его в переменную внутри вашего try:

try (FileSystem fileSystem = FileSystem.get(hadoopConf);
     BufferedReader bufferedReader =
             new BufferedReader(new InputStreamReader
                     (fileSystem.open(file), StandardCharsets.UTF_8))) {

UPDATE:

Хотя приведенный выше код кажется правильным подходом для объектов Closeable, по умолчанию FileSystem.get вернет кешированные экземпляры из

/** FileSystem cache */
static final Cache CACHE = new Cache();

, и, таким образом, все будет ужасно сломано, когда close() будет вызван на них.

Вы можете отключить кеш файловой системы, установив fs.hdfs.impl.disable.cache config для true, или убедитесь, что экземпляры FileSystem закрываются только после завершения всех рабочих процессов. Также кажется, что вы могли бы просто использовать один экземпляр FileSystem для всех ваших рабочих, хотя я не могу найти никаких подтверждений в javadocs, что это будет работать правильно без дополнительной синхронизации.

...