Алгоритм N-way слияния - PullRequest
       33

Алгоритм N-way слияния

76 голосов
/ 20 февраля 2011

Двустороннее слияние широко изучается как часть алгоритма слияния. Но мне интересно узнать, как лучше всего выполнить N-way слияние?

Допустим, у меня есть N файлы, каждый из которых отсортирован по 1 миллиону целых чисел. Я должен объединить их в один файл, который будет иметь эти 100 миллионов отсортированных целых чисел.

Пожалуйста, имейте в виду, что сценарий использования для этой проблемы - фактически внешняя сортировка, которая основана на диске. Следовательно, в реальных сценариях также может быть ограничение памяти. Поэтому наивный подход к объединению 2 файлов за раз (99 раз) не сработает. Допустим, у нас есть только небольшое скользящее окно памяти, доступное для каждого массива.

Я не уверен, существует ли уже стандартизированное решение для этого N-way слияния. (Гугл мне мало что сказал) .

Но если вы знаете, хороший ли алгоритм n-way merge, пожалуйста, напишите algo / link.

Сложность времени: Если мы значительно увеличим количество файлов (N) для объединения, как это повлияет на временную сложность вашего алгоритма?

Спасибо за ваши ответы.

Меня нигде не спрашивали, но я чувствовал, что это может быть интересный вопрос для интервью. Поэтому помечены.

Ответы [ 8 ]

78 голосов
/ 20 февраля 2011

Как насчет следующей идеи:

  1. Создать приоритетную очередь

  2. Итерация по каждому файлу f
    1. поставить в очередь пару (nextNumberIn (f), f) , используя первое значение в качестве ключа приоритета

  3. Пока очередь не пуста
    1. головка очереди (м, ж) очереди
    2. выход м
    3. , если f не исчерпан
      1. в очереди (nextNumberIn (f), f)

Поскольку добавление элементов в приоритетную очередь может быть выполнено за логарифмическое время, элемент 2 имеет значение O (N & times; log N) . Поскольку (почти все) итерации цикла while добавляет элемент, весь цикл while равен O (M & times; log N) , где M - общее количество чисел для сортировки .

Предполагая, что все файлы имеют непустую последовательность чисел, мы имеем M> N и, следовательно, весь алгоритм должен быть O (M & times; log N) .

12 голосов
/ 20 февраля 2011

Ищите «Многофазное слияние», посмотрите классику - Donald Knuth & E.H.Friend.

Также вы можете взглянуть на предлагаемое Умное объединение блоков от Seyedafsari & Hasanzadeh, которое, как и в предыдущих предложениях, использует очереди с приоритетами.

Еще одним интересным аргументом является Алгоритм слияния на месте от Kim & Kutzner.

Я также рекомендую эту статью Vitter: Алгоритмы внешней памяти и структуры данных: работа с массивными данными .

6 голосов
/ 20 февраля 2011

Одной простой идеей является сохранение приоритетной очереди объединяемых диапазонов, сохраняемой таким образом, чтобы диапазон с наименьшим первым элементом удалялся первым из очереди.Затем вы можете выполнить N-way слияние следующим образом:

  1. Вставьте все диапазоны в приоритетную очередь, за исключением пустых диапазонов.
  2. Пока приоритетная очередь не пуста:
    1. Выведите из очереди самый маленький элемент из очереди.
    2. Добавьте первый элемент этого диапазона к выходной последовательности.
    3. Если он не пуст, вставьте оставшуюся часть последовательности обратно вочередь приоритетов.

Правильность этого алгоритма по сути является обобщением доказательства того, что двустороннее слияние работает правильно - если вы всегда добавляете наименьший элемент из любого диапазона,и все диапазоны отсортированы, вы получите последовательность в целом.

Сложность этого алгоритма во время выполнения может быть найдена следующим образом.Пусть M будет общим количеством элементов во всех последовательностях.Если мы используем двоичную кучу, то мы делаем не более O (M) вставок и O (M) удалений из очереди приоритетов, поскольку для каждого элемента, записанного в выходную последовательность, есть очередь для удаления наименьшей последовательности, за которой следуетпоставьте в очередь оставшуюся часть последовательности обратно в очередь.Каждый из этих шагов требует O (lg N) операций, потому что вставка или удаление из двоичной кучи с N элементами в нем занимает O (lg N) времени.Это дает чистое время выполнения O (M lg N), которое растет менее чем линейно с числом входных последовательностей.

Может быть способ получить это еще быстрее, но это кажется довольно хорошим решением,Использование памяти составляет O (N), потому что нам нужны O (N) накладные расходы для двоичной кучи.Если мы реализуем двоичную кучу, сохраняя указатели на последовательности, а не на сами последовательности, это не должно быть слишком большой проблемой, если у вас нет действительно смешного количества последовательностей для слияния.В этом случае просто объедините их в группы, которые вписываются в память, а затем объедините все результаты.

Надеюсь, это поможет!

2 голосов
/ 16 декабря 2012

Простой подход к объединению k отсортированных массивов (каждый длиной n) требует времени O (n k ^ 2), а не времени O (nk). Так как когда вы объединяете первые 2 массива, это занимает 2n времени, затем, когда вы объединяете третий с выходом, это занимает 3n времени, так как теперь мы объединяем два массива длиной 2n и n. Теперь, когда мы объединяем этот вывод с четвертым, это слияние требует 4n времени. Таким образом, последнее слияние (когда мы добавляем k-й массив в наш уже отсортированный массив) требует k * n времени. Таким образом, общее требуемое время составляет 2n + 3n + 4n + ... k * n, который является O (nk ^ 2).

Похоже, что мы можем сделать это за O (kn), но это не так, потому что каждый раз, когда наш массив, который мы объединяем, увеличивается в размере.
Хотя мы можем достичь лучшего предела, используя разделяй и властвуй. Я все еще работаю над этим и опубликую решение, если найду его.

1 голос
/ 04 октября 2013
Here is my implementation using MinHeap...

package merging;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;


public class N_Way_Merge {

int No_of_files=0;
String[] listString;
int[] listIndex;
PrintWriter pw;
private String fileDir = "D:\\XMLParsing_Files\\Extracted_Data";
private File[] fileList;
private BufferedReader[] readers;

public static void main(String[] args) throws IOException {

    N_Way_Merge nwm=new N_Way_Merge();

    long start= System.currentTimeMillis();

    try {

        nwm.createFileList();

        nwm.createReaders();
        nwm.createMinHeap();
    }
    finally {
        nwm.pw.flush();
        nwm.pw.close();
        for (BufferedReader readers : nwm.readers) {

            readers.close();

        }
    }
    long end = System.currentTimeMillis();
    System.out.println("Files merged into a single file.\nTime taken: "+((end-start)/1000)+"secs");
}

public void createFileList() throws IOException {
    //creates a list of sorted files present in a particular directory
    File folder = new File(fileDir);
    fileList = folder.listFiles();
    No_of_files=fileList.length;
    assign();
    System.out.println("No. of files - "+ No_of_files);

}

public void assign() throws IOException
{
    listString = new String[No_of_files];
    listIndex = new int[No_of_files];
    pw = new PrintWriter(new BufferedWriter(new FileWriter("D:\\XMLParsing_Files\\Final.txt", true)));
}

public void createReaders() throws IOException {
    //creates array of BufferedReaders to read the files
    readers = new BufferedReader[No_of_files];

    for(int i=0;i<No_of_files;++i)
    {
        readers[i]=new BufferedReader(new FileReader(fileList[i]));
    }
}

public void createMinHeap() throws IOException {

    for(int i=0;i<No_of_files;i++)
    {
        listString[i]=readers[i].readLine();
        listIndex[i]=i;
    }

    WriteToFile(listString,listIndex);

}

public void WriteToFile(String[] listString,int[] listIndex) throws IOException{

    BuildHeap_forFirstTime(listString, listIndex);
    while(!(listString[0].equals("zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz")))
    {
        pw.println(listString[0]);
        listString[0]=readers[listIndex[0]].readLine();

        MinHeapify(listString,listIndex,0);
    }

}
public void BuildHeap_forFirstTime(String[] listString,int[] listIndex){

    for(int i=(No_of_files/2)-1;i>=0;--i)
        MinHeapify(listString,listIndex,i);

}

public void MinHeapify(String[] listString,int[] listIndex,int index){

    int left=index*2 + 1;
    int right=left + 1;
    int smallest=index;
    int HeapSize=No_of_files;
    if(left <= HeapSize-1  && listString[left]!=null &&  (listString[left].compareTo(listString[index])) < 0)
        smallest = left;

    if(right <= HeapSize-1 && listString[right]!=null &&  (listString[right].compareTo(listString[smallest])) < 0)
        smallest=right;



    if(smallest!=index)
    {
        String temp=listString[index];
        listString[index]=listString[smallest];
        listString[smallest]=temp;

        listIndex[smallest]^=listIndex[index];
        listIndex[index]^=listIndex[smallest];
        listIndex[smallest]^=listIndex[index];

        MinHeapify(listString,listIndex,smallest);
    }

}

}

1 голос
/ 05 января 2013

Я написал этот фрагмент кода в стиле STL, который объединяет N-way, и подумал, что я опубликую его здесь, чтобы не дать другим изобретать велосипед заново. :)

Предупреждение: это только мягко проверено. Проверьте перед использованием. :)

Вы можете использовать это так:

#include <vector>

int main()
{
    std::vector<std::vector<int> > v;
    std::vector<std::vector<int>::iterator> vout;
    std::vector<int> v1;
    std::vector<int> v2;
    v1.push_back(1);
    v1.push_back(2);
    v1.push_back(3);
    v2.push_back(0);
    v2.push_back(1);
    v2.push_back(2);
    v.push_back(v1);
    v.push_back(v2);
    multiway_merge(v.begin(), v.end(), std::back_inserter(vout), false);
}

Также позволяет использовать пары итераторов вместо самих контейнеров.

Если вы используете Boost.Range, вы можете удалить часть стандартного кода.

код:

#include <algorithm>
#include <functional>  // std::less
#include <iterator>
#include <queue>  // std::priority_queue
#include <utility>  // std::pair
#include <vector>

template<class OutIt>
struct multiway_merge_value_insert_iterator : public std::iterator<
    std::output_iterator_tag, OutIt, ptrdiff_t
>
{
    OutIt it;
    multiway_merge_value_insert_iterator(OutIt const it = OutIt())
        : it(it) { }

    multiway_merge_value_insert_iterator &operator++(int)
    { return *this; }

    multiway_merge_value_insert_iterator &operator++()
    { return *this; }

    multiway_merge_value_insert_iterator &operator *()
    { return *this; }

    template<class It>
    multiway_merge_value_insert_iterator &operator =(It const i)
    {
        *this->it = *i;
        ++this->it;
        return *this;
    }
};

template<class OutIt>
multiway_merge_value_insert_iterator<OutIt>
    multiway_merge_value_inserter(OutIt const it)
{ return multiway_merge_value_insert_iterator<OutIt>(it); };

template<class Less>
struct multiway_merge_value_less : private Less
{
    multiway_merge_value_less(Less const &less) : Less(less) { }
    template<class It1, class It2>
    bool operator()(
        std::pair<It1, It1> const &b /* inverted */,
        std::pair<It2, It2> const &a) const
    {
        return b.first != b.second && (
            a.first == a.second ||
            this->Less::operator()(*a.first, *b.first));
    }
};

struct multiway_merge_default_less
{
    template<class T>
    bool operator()(T const &a, T const &b) const
    { return std::less<T>()(a, b); }
};

template<class R>
struct multiway_merge_range_iterator
{ typedef typename R::iterator type; };

template<class R>
struct multiway_merge_range_iterator<R const>
{ typedef typename R::const_iterator type; };

template<class It>
struct multiway_merge_range_iterator<std::pair<It, It> >
{ typedef It type; };

template<class R>
typename R::iterator multiway_merge_range_begin(R &r)
{ return r.begin(); }

template<class R>
typename R::iterator multiway_merge_range_end(R &r)
{ return r.end(); }

template<class R>
typename R::const_iterator multiway_merge_range_begin(R const &r)
{ return r.begin(); }

template<class R>
typename R::const_iterator multiway_merge_range_end(R const &r)
{ return r.end(); }

template<class It>
It multiway_merge_range_begin(std::pair<It, It> const &r)
{ return r.first; }

template<class It>
It multiway_merge_range_end(std::pair<It, It> const &r)
{ return r.second; }

template<class It, class OutIt, class Less, class PQ>
OutIt multiway_merge(
    It begin, It const end, OutIt out, Less const &less,
    PQ &pq, bool const distinct = false)
{
    while (begin != end)
    {
        pq.push(typename PQ::value_type(
            multiway_merge_range_begin(*begin),
            multiway_merge_range_end(*begin)));
        ++begin;
    }
    while (!pq.empty())
    {
        typename PQ::value_type top = pq.top();
        pq.pop();
        if (top.first != top.second)
        {
            while (!pq.empty() && pq.top().first == pq.top().second)
            { pq.pop(); }
            if (!distinct ||
                pq.empty() ||
                less(*pq.top().first, *top.first) ||
                less(*top.first, *pq.top().first))
            {
                *out = top.first;
                ++out;
            }

            ++top.first;
            pq.push(top);
        }
    }
    return out;
}

template<class It, class OutIt, class Less>
OutIt multiway_merge(
    It const begin, It const end, OutIt out, Less const &less,
    bool const distinct = false)
{
    typedef typename multiway_merge_range_iterator<
        typename std::iterator_traits<It>::value_type
    >::type SubIt;
    if (std::distance(begin, end) < 16)
    {
        typedef std::vector<std::pair<SubIt, SubIt> > Remaining;
        Remaining remaining;
        remaining.reserve(
            static_cast<size_t>(std::distance(begin, end)));
        for (It i = begin; i != end; ++i)
        {
            if (multiway_merge_range_begin(*i) !=
                multiway_merge_range_end(*i))
            {
                remaining.push_back(std::make_pair(
                    multiway_merge_range_begin(*i),
                    multiway_merge_range_end(*i)));
            }
        }
        while (!remaining.empty())
        {
            typename Remaining::iterator smallest =
                remaining.begin();
            for (typename Remaining::iterator
                i = remaining.begin();
                i != remaining.end();
            )
            {
                if (less(*i->first, *smallest->first))
                {
                    smallest = i;
                    ++i;
                }
                else if (distinct && i != smallest &&
                    !less(
                        *smallest->first,
                        *i->first))
                {
                    i = remaining.erase(i);
                }
                else { ++i; }
            }
            *out = smallest->first;
            ++out;
            ++smallest->first;
            if (smallest->first == smallest->second)
            { smallest = remaining.erase(smallest); }
        }
        return out;
    }
    else
    {
        std::priority_queue<
            std::pair<SubIt, SubIt>,
            std::vector<std::pair<SubIt, SubIt> >,
            multiway_merge_value_less<Less>
        > q((multiway_merge_value_less<Less>(less)));
        return multiway_merge(begin, end, out, less, q, distinct);
    }
}

template<class It, class OutIt>
OutIt multiway_merge(
    It const begin, It const end, OutIt const out,
    bool const distinct = false)
{
    return multiway_merge(
        begin, end, out,
        multiway_merge_default_less(), distinct);
}
1 голос
/ 08 декабря 2012

См. http://en.wikipedia.org/wiki/External_sorting. Вот мой взгляд на k-way merge на основе кучи, использующий буферизованное чтение из источников для эмуляции сокращения ввода / вывода:

public class KWayMerger<T>
{
    private readonly IList<T[]> _sources;
    private readonly int _bufferSize;
    private readonly MinHeap<MergeValue<T>> _mergeHeap;
    private readonly int[] _indices;

    public KWayMerger(IList<T[]> sources, int bufferSize, Comparer<T> comparer = null)
    {
        if (sources == null) throw new ArgumentNullException("sources");

        _sources = sources;
        _bufferSize = bufferSize;

        _mergeHeap = new MinHeap<MergeValue<T>>(
                      new MergeComparer<T>(comparer ?? Comparer<T>.Default));
        _indices = new int[sources.Count];
    }

    public T[] Merge()
    {
        for (int i = 0; i <= _sources.Count - 1; i++)
            AddToMergeHeap(i);

        var merged = new T[_sources.Sum(s => s.Length)];
        int mergeIndex = 0;

        while (_mergeHeap.Count > 0)
        {
            var min = _mergeHeap.ExtractDominating();
            merged[mergeIndex++] = min.Value;
            if (min.Source != -1) //the last item of the source was extracted
                AddToMergeHeap(min.Source);
        }

        return merged;
    }

    private void AddToMergeHeap(int sourceIndex)
    {
        var source = _sources[sourceIndex];
        var start = _indices[sourceIndex];
        var end = Math.Min(start + _bufferSize - 1, source.Length - 1);

        if (start > source.Length - 1)
            return; //we're done with this source

        for (int i = start; i <= end - 1; i++)
            _mergeHeap.Add(new MergeValue<T>(-1, source[i]));   

        //only the last item should trigger the next buffered read
        _mergeHeap.Add(new MergeValue<T>(sourceIndex, source[end]));

        _indices[sourceIndex] += _bufferSize; //we may have added less items, 
        //but if we did we've reached the end of the source so it doesn't matter
    } 
}

internal class MergeValue<T>
{
    public int Source { get; private set; }
    public T Value { get; private set; }

    public MergeValue(int source, T value)
    {
        Value = value;
        Source = source;
    }
}

internal class MergeComparer<T> : IComparer<MergeValue<T>>
{
    public Comparer<T> Comparer { get; private set; }

    public MergeComparer(Comparer<T> comparer)
    {
        if (comparer == null) throw new ArgumentNullException("comparer");
        Comparer = comparer;
    }

    public int Compare(MergeValue<T> x, MergeValue<T> y)
    {
        Debug.Assert(x != null && y != null);
        return Comparer.Compare(x.Value, y.Value);
    }
}

Вот одна из возможных реализаций MinHeap<T>. Некоторые тесты:

* * 1010
0 голосов
/ 18 июля 2016

Java-реализация алгоритма min heap для объединения k отсортированных массивов:

public class MergeKSorted {

/**
 * helper object to store min value of each array in a priority queue, 
 * the kth array and the index into kth array
 *
 */
static class PQNode implements Comparable<PQNode>{
    int value;
    int kth = 0;
    int indexKth = 0;

    public PQNode(int value, int kth, int indexKth) {
        this.value = value;
        this.kth = kth;
        this.indexKth = indexKth;
    }
    @Override
    public int compareTo(PQNode o) {
        if(o != null) {
            return Integer.valueOf(value).compareTo(Integer.valueOf(o.value));
        }
        else return 0;
    }

    @Override
    public String toString() {
        return value+" "+kth+" "+indexKth;
    }
}
public static void mergeKSorted(int[][] sortedArrays) {
    int k = sortedArrays.length;
    int resultCtr = 0;
    int totalSize = 0;
    PriorityQueue<PQNode> pq = new PriorityQueue<>();
    for(int i=0; i<k; i++) {
        int[] kthArray = sortedArrays[i];
        totalSize+=kthArray.length;
        if(kthArray.length > 0) {
            PQNode temp = new PQNode(kthArray[0], i, 0);
            pq.add(temp); 
        }
    }
    int[] result = new int[totalSize];
    while(!pq.isEmpty()) {
        PQNode temp = pq.poll();
        int[] kthArray = sortedArrays[temp.kth];
        result[resultCtr] = temp.value;
        resultCtr++;            
        temp.indexKth++;
        if(temp.indexKth < kthArray.length) {
            temp = new PQNode(kthArray[temp.indexKth], temp.kth, temp.indexKth);
            pq.add(temp);
        }

    }
    print(result);
}

public static void print(int[] a) {
    StringBuilder sb = new StringBuilder();
    for(int v : a) {
        sb.append(v).append(" ");
    }
    System.out.println(sb);
}

public static void main(String[] args) {
     int[][] sortedA = {
         {3,4,6,9},
         {4,6,8,9,12},
         {3,4,9},
         {1,4,9}    
     };
     mergeKSorted(sortedA);
}

}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...