Улучшите многопоточную индексацию с помощью lucene - PullRequest
13 голосов
/ 16 февраля 2012

Я пытаюсь построить свои индексы в Lucene с несколькими потоками.Итак, я начал свое кодирование и написал следующий код.Сначала я нахожу файлы и для каждого файла создаю поток для его индексации.После этого я присоединяюсь к потокам и оптимизирую индексы.Это работает, но я не уверен ... могу ли я доверять этому в больших масштабах?Есть ли способ улучшить его?

import java.io.File;
import java.io.FileFilter;
import java.io.FileReader;
import java.io.IOException;
import java.io.File;
import java.io.FileReader;
import java.io.BufferedReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.Document;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.analysis.StopAnalyzer;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.Version;
import org.apache.lucene.index.TermFreqVector;

public class mIndexer extends Thread {

    private File ifile;
    private static IndexWriter writer;

    public mIndexer(File f) {
    ifile = f.getAbsoluteFile();
    }

    public static void main(String args[]) throws Exception {
    System.out.println("here...");

    String indexDir;
        String dataDir;
    if (args.length != 2) {
        dataDir = new String("/home/omid/Ranking/docs/");
        indexDir = new String("/home/omid/Ranking/indexes/");
    }
    else {
        dataDir = args[0];
        indexDir = args[1];
    }

    long start = System.currentTimeMillis();

    Directory dir = FSDirectory.open(new File(indexDir));
    writer = new IndexWriter(dir,
    new StopAnalyzer(Version.LUCENE_34, new File("/home/omid/Desktop/stopwords.txt")),
    true,
    IndexWriter.MaxFieldLength.UNLIMITED);
    int numIndexed = 0;
    try {
        numIndexed = index(dataDir, new TextFilesFilter());
    } finally {
        long end = System.currentTimeMillis();
        System.out.println("Indexing " + numIndexed + " files took " + (end - start) + " milliseconds");
        writer.optimize();
        System.out.println("Optimization took place in " + (System.currentTimeMillis() - end) + " milliseconds");
        writer.close();
    }
    System.out.println("Enjoy your day/night");
    }

    public static int index(String dataDir, FileFilter filter) throws Exception {
    File[] dires = new File(dataDir).listFiles();
    for (File d: dires) {
        if (d.isDirectory()) {
        File[] files = new File(d.getAbsolutePath()).listFiles();
        for (File f: files) {
            if (!f.isDirectory() &&
            !f.isHidden() &&
            f.exists() &&
            f.canRead() &&
            (filter == null || filter.accept(f))) {
                Thread t = new mIndexer(f);
                t.start();
                t.join();
            }
        }
        }
    }
    return writer.numDocs();
    }

    private static class TextFilesFilter implements FileFilter {
    public boolean accept(File path) {
        return path.getName().toLowerCase().endsWith(".txt");
    }
    }

    protected Document getDocument() throws Exception {
    Document doc = new Document();
    if (ifile.exists()) {
        doc.add(new Field("contents", new FileReader(ifile), Field.TermVector.YES));
        doc.add(new Field("path", ifile.getAbsolutePath(), Field.Store.YES, Field.Index.NOT_ANALYZED));
        String cat = "WIR";
        cat = ifile.getAbsolutePath().substring(0, ifile.getAbsolutePath().length()-ifile.getName().length()-1);
        cat = cat.substring(cat.lastIndexOf('/')+1, cat.length());
        //doc.add(new Field("category", cat.subSequence(0, cat.length()), Field.Store.YES));
        //System.out.println(cat.subSequence(0, cat.length()));
    }
    return doc;
    }

    public void run() {
    try {
        System.out.println("Indexing " + ifile.getAbsolutePath());
        Document doc = getDocument();
        writer.addDocument(doc);
    } catch (Exception e) {
        System.out.println(e.toString());
    }

    }
}

Любая гепатит считается.

Ответы [ 2 ]

14 голосов
/ 17 февраля 2012

Если вы хотите распараллелить индексирование, вы можете сделать две вещи:

  • распараллеливание вызовов addDocument,
  • увеличение максимального числа потоков вашего планировщика слияния.

Вы находитесь на правильном пути для распараллеливания вызовов addDocuments, но порождение одного потока на документ не будет масштабироваться, поскольку количество документов, которые нужно проиндексировать, будет расти.Вам лучше использовать фиксированный размер ThreadPoolExecutor .Поскольку эта задача в основном требует интенсивной загрузки ЦП (в зависимости от вашего анализатора и способа извлечения данных), установка числа процессоров вашего компьютера в качестве максимального количества потоков может быть хорошим началом.

ОтносительноВ планировщике слияния вы можете увеличить максимальное количество потоков, которое можно использовать с помощью метода setMaxThreadCount из ConcurrentMergeScheduler .Помните, что при последовательном чтении / записи диски гораздо лучше, чем при случайном чтении / записи, поэтому установка слишком большого максимального числа потоков в планировщике слияния скорее приведет к замедлению индексации, чем к ее ускорению.

Но прежде чем пытаться распараллелить процесс индексирования, вам, вероятно, следует попытаться найти узкое место.Если ваш диск слишком медленный, узким местом, вероятно, будут сбрасывание и шаги слияния, вследствие чего распараллеливание вызовов addDocument (который по сути состоит в анализе документа и буферизации результатов анализа в памяти) не улучшит скорость индексациивообще.

Некоторые примечания:

  • В разрабатываемой версии Lucene ведется некоторая работа по улучшению параллельного индексирования (особенно это касается промывочной части, этой запись в блоге объясняет, как это работает).

  • У Lucene есть хорошая вики-страница на , как повысить скорость индексации , где вы найдете другие способыулучшить скорость индексации.

5 голосов
/ 16 февраля 2012

Я думаю, что более современный способ сделать это - использовать ThreadPoolExecutor и отправить Runnable , который выполняет индексацию. Вы можете дождаться завершения всех потоков, используя .awaitTermination или CountdownLatch.

Я не большой поклонник расширения вашего основного класса Thread, просто создайте работающий внутренний класс, который принимает его зависимости в конструкторе. Это делает ваш код более читабельным, поскольку работа, выполняемая потоками, четко отделена от кода установки вашего приложения.

Несколько замечаний по стилю. Я не большой поклонник того, чтобы ваш основной класс генерировал Exception, обычно это просто означает, что у вас нет четкого представления о различных проверенных случаях исключения, которые может генерировать код, который вы используете. Обычно это неправильно, если у вас нет особых причин.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...