Потоковый файл из HDFS против копирования его на локальный диск - PullRequest
0 голосов
/ 15 мая 2018

В моем приложении Java я использую текстовый файл (размер ~ 300 МБ), который хранится в HDFS. Каждая строка файла содержит строку и целочисленный идентификатор, разделенные запятой. Я читаю файл построчно и создаю из него Hashmaps (String, ID).

Файл выглядит так:

String1,Integer1
String2,Integer2
...

Теперь я сейчас читаю файл из HDFS напрямую, используя конфигурацию Apacha Hadoop и FileSystem Object.

Configuration conf = new Configuration();
conf.addResource("core-site.xml"));
conf.addResource("hdfs-site.xml"));
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());

path= "<some location in HDFS>"
FileSystem fs = FileSystem.get(URI.create(path), conf);
in = fs.open(new Path(path));

Входной поток «in» передается другой функции с именем read (InputStream in) для чтения файла.

  public void init(InputStream is) throws Exception {
    ConcurrentMap<String, String> pageToId = new ConcurrentHashMap();
    ConcurrentMap<String, String> idToPage = new ConcurrentHashMap();
    logger.info("Free memory: " + Runtime.getRuntime().freeMemory());
    InputStreamReader stream = new InputStreamReader(is, StandardCharsets.UTF_8);
    BufferedReader reader = new BufferedReader(stream);
    List<String> pageIdMappingColumns = ServerProperties.getInstance().getIdMappingColumns();
    String line;
    int line_no=0;

    while (true) {
        try {
            line = reader.readLine();

            if (line == null) {
                break;
            }
            line_no++;
            //System.out.println("Free memory: " + Runtime.getRuntime().freeMemory());
            String[] values = line.split(COMMA);
            //System.out.println("Free memory: " + Runtime.getRuntime().freeMemory());
            if (values.length < pageIdMappingColumns.size()) {
                throw new RuntimeException(PAGEMAPPER_INVALID_MAPPING_FILE_FORMAT);
            }

            String id = EMPTY_STR;
            String page = EMPTY_STR;
            for (int i = 0; i < values.length; i++) {
                String s = values[i].trim();
                if (PAGEID.equals(pageIdMappingColumns.get(i))) {
                    id = s;
                    continue;
                }
                if (PAGENAME.equals(pageIdMappingColumns.get(i))) {
                    page = s;
                }
            }
            pageToId.put(page, id);
            idToPage.put(id, page);
        } catch (Exception e) {
            logger.error(PAGEMAPPER_INIT + e.toString() + " on line " + line_no);

        }
    }
    logger.info("Free memory: " + Runtime.getRuntime().freeMemory());
    logger.info("Total number of lines: " + line_no);
    reader.close();
    ConcurrentMap<String, String> oldPageToId = pageToIdRef.get();
    ConcurrentMap<String, String> oldIdToPage = idToPageRef.get();
    idToPage.put(MINUS_1, START);
    idToPage.put(MINUS_2, EXIT);
    pageToId.put(START, MINUS_1);
    pageToId.put(EXIT, MINUS_2);

    /* Update the Atomic reference hashmaps in memory in two conditions
    1. If there was no map in memory(first iteration)
    2. If the number of page-names and page-id pairs in the mappings.txt file are more than the previous iteration
    */

    if (oldPageToId == null || oldIdToPage != null && oldIdToPage.size() <= idToPage.size() && oldPageToId.size() <= pageToId.size()) {
        idToPageRef.set(idToPage);
        pageToIdRef.set(pageToId);
        logger.info(PAGEMAPPER_INIT + " " + PAGEMAPPER_UPDATE_MAPPING);
    } else {
        logger.info(PAGEMAPPER_INIT + " " + PAGEMAPPER_LOG_MSZ);
    }
}

Я закрываю поток, когда работа сделана так:

IOUtils.closeQuietly(is);

Я выполняю вышеуказанный код каждые 1 час, так как файл изменяется в HDFS за это время. Итак, теперь я получаю java.lang.OutOfMemoryError: пространство кучи Java.

Мой вопрос: лучше ли скопировать файл на диск и затем использовать его, а не напрямую обращаться к нему из HDFS, если это касается требований к памяти?

Примечание. Файл имеет> 3200000 строк.

1 Ответ

0 голосов
/ 15 мая 2018

Поток - это всегда способ выбора.

Вы получаете OutOfMemory, потому что вы никогда не закрываете свой поток, что приводит к утечке памяти.

Либо вручную закройте свой поток, либо используйте попытку сресурс

Редактировать

pageToId.put(page, id);
idToPage.put(id, page);

Вы сохраняете по крайней мере в два раза больше вашего файла в памяти.Это примерно 600 МБ.

После этого вы присваиваете это значение некоторой ref переменной:

idToPageRef.set(idToPage);
pageToIdRef.set(pageToId);

Я предполагаю, что у вас все еще есть ссылка на старые ref данные где-тоследовательно, данные внутренней карты не освобождаются.

У вас также есть утечка ресурсов на

throw new RuntimeException(PAGEMAPPER_INVALID_MAPPING_FILE_FORMAT);

Вы должны использовать try-with-resource или вручную закрыть свой поток в блоке finally.

...