Объединение нескольких потоков и запись в отсортированный выходной поток - PullRequest
1 голос
/ 04 июня 2019

Я недавно наткнулся на этот вопрос в нескольких интервью. Это выглядит следующим образом:

У вас есть список потоков чисел, которые вы можете читать из асинхронно. Учитывая поток записи для потребителя, как бы вы прочитали числа из потоков, объединили и отсортировали их и, наконец, записали в выходной поток?

Input:

 1. stream 1: 1, 2, 3, 4...
 2. stream 2: 1, 2, 3, 4, 5...

Output: 1, 1, 2, 2, 3, 3, 4, 4, 5....

Мы можем предположить, что контракт выглядит следующим образом:

final class Stream {
   public interface boolean isClosed();
   public interface int read();
}

// utility method to write numbers to consumer stream
public void write(Integer number);

Мои первые мысли об этом вопросе заключались в том, что он похож на буфер кэша LRU . Однако есть 2 проблемы с этим:

  • Как объединять и поддерживать порядок и синхронизацию потоков чтения?
  • Как убедиться, что числа записаны без каких-либо задержек? Как только запись будет выполнена, порядок записи больше не будет гарантирован для дальнейших чисел в потоке?

Я уверен, что здесь есть оговорка, которую я неправильно истолковал или полностью упустил. Любая помощь в этом была бы отличной. Спасибо.

1 Ответ

1 голос
/ 04 июня 2019

Я предполагаю, что существует много потоков, и каждый из них предоставляет данные в порядке возрастания.

Теперь у вашего потокового интерфейса есть небольшая проблема.Кроме того, вы можете создать класс, состоящий из пар (lastValue, stream), который имеет методы peek (возвращает lastValue) и readNext (если stream.isClosed() возвращает null, в противном случае возвращается пара (stream.read(), stream).И еще одна вещь, мы можем добавить compareTo метод, который сначала сравнивает lastValue, а затем сравнивает stream.hashCode().

Что эти пары покупают нам, так это то, что мы можем поместить их в PriorityQueue . Что позволяет нам реализовать что-то вроде этой логики:

construct initial pairs from streams
put them into a priority queue named pq
while 0 < pq.size()
    take the smallest pair p
    print p.peek()
    pNext = p.readNext()
    if pNext != null
        add pNext to pq

Если n - это общий объем данных между потоками, а m - это количество потоков, этот алгоритм будет приниматьвремя O(n log(m) + m). Бит + m отображается только в том случае, если вы начали с большого количества закрытых потоков.

...