istream и ostream с разделяемым потоком, взаимно поточно-ориентированным для дуплексного ввода-вывода? - PullRequest
7 голосов
/ 01 апреля 2012

Я получил собственный потоковый буфер для буферизованного сетевого сокета ввода-вывода, переопределяя переполнение, переполнение и синхронизацию, так что переполнение является взаимно поточно-ориентированным с набором двух других (у меня есть отдельные входные и выходные внутренние буферы).Это прекрасно работает, но я хочу использовать это для полнодуплексного ввода-вывода, когда один поток может вводить, в то время как другой выводит, поэтому я хотел бы использовать istream для принимающего потока и ostream для отправляющего, при совместном использовании сетиStreambuf, как это абстрагирует все вещи сокета.Мой вопрос заключается в том, в какой степени члены streambuf затрагиваются операциями ввода в istream, не пересекаются с членами streambuf, затронутыми операциями вывода в ostream, если входные и выходные буферы разделены?

Было бы лучшебыть в состоянии сделать это, вместо того, чтобы отделять сокеты от моей абстракции streambuf, чтобы сокет можно было разделить между istream и ostream с отдельными streambuf - тогда мне также понадобятся две версии streambuf - одна с одним внутреннимбуфер (для использования только в istream или только в ostream) и один с двумя внутренними буферами, как у меня сейчас, для использования в iostream ... отстой, так как это дополнительные классы и дублирование кода.

Ответы [ 3 ]

2 голосов
/ 01 апреля 2012

Специальной гарантии для std::streambuf (или std::basic_streambuf<...>) не дается, которая дает больше гарантий, чем обычно предоставляется. То есть вы можете иметь несколько потоков, считывающих состояние объекта в любое время, но если есть один поток, изменяющий состояние объекта, не должно быть другого потока, обращающегося к объекту. И чтение, и запись символов изменяют состояние буфера потока, то есть с формальной точки зрения их нельзя использовать без внешней синхронизации.

Внутренне два буфера полностью разделены и не имеют ничего общего друг с другом. Операции над потоковыми буферами изменяют их довольно структурированным образом, и я не могу себе представить, что любая реализация имела бы явное взаимодействие между двумя наборами указателей. То есть в практическом плане я не думаю, что между чтением и письмом необходима какая-либо синхронизация. Однако раньше я не осознавал, что два набора указателей буфера могут фактически использовать одни и те же строки кэша, что может, по крайней мере, вызвать проблемы с производительностью. Я не думаю, что это должно вызвать какие-либо проблемы с корректностью.

Единственный ресурс, который может совместно использоваться двумя потоковыми буферами, - это объект std::locale, который, как предполагается, не имеет состояния. Кроме того, std::streambuf не использует сам этот объект: это ваш потоковый буфер, который может использовать некоторые из аспектов (например, std::codecvt<...> фасет). Поскольку языковой стандарт изменяется с помощью вызова виртуальной функции imbue(), вы сможете перехватить это изменение и выполнить любую необходимую синхронизацию, если ваш потоковый буфер использует языковой стандарт.

Таким образом, стандарт не дает никаких гарантий того, что он будет работать с использованием параллельных потоков для чтения и записи с использованием одного и того же потокового буфера. На практике DS9k, вероятно, является единственной системой, в которой происходит сбой, и два потока могут в итоге эффективно синхронизироваться из-за того, что указатели буфера заканчиваются в строках общего кэша.

2 голосов
/ 01 апреля 2012

Входные и выходные последовательности по существу независимы. Хорошая диаграмма на cppreference.com :

Diagram of streambuf members

Единственная вещь, совместно используемая между последовательностями ввода и вывода, это объект locale, который содержит фасет codecvt, используемый для выполнения перевода кодировки текста.

Теоретически, изменение текстовой кодировки в середине потока было бы небезопасным, но на практике библиотеки в любом случае вообще не поддерживают эту операцию!

Тебе должно быть хорошо идти.

0 голосов
/ 22 декабря 2018

Для полнодуплексного режима вам понадобятся два буфера.Если вы используете интерфейсы streambuf для обоих, так что вы можете подключиться к обычному интерфейсу ostream и istream, тогда полная картина выглядит примерно так:

A cross linked two streambuf interface

Два буфера, очевидно, полностью независимы и симметричны, поэтому мы можем игнорировать одну сторону и сосредоточиться только на одном буфере.

Более того, можно с уверенностью предположить, что существует только два потока: чтениенить и пишущая нить.Если будет задействовано больше потоков, то два потока будут читать одновременно или писать одновременно;что привело бы к нежелательным условиям гонки и, следовательно, не имеет смысла.Мы можем предположить, что у пользователя будет какой-то механизм, обеспечивающий запись только одного потока за раз в потоковый буфер, а также только один поток за раз читает из него.

В самом общем случае фактическийбуфер существует из нескольких смежных блоков памяти.Каждая область размещения и получения полностью находится внутри одного такого блока.Пока они находятся в разных блоках памяти, они, опять же, не связаны.

Каждая область get / put существует из трех указателей: один указатель, указывающий на начало области (eback / pbase),один указатель, который указывает на один байт после конца области (например, eptr / epptr), и указатель, который указывает на текущую позицию в области (gptr / pptr).Каждый из этих указателей может быть доступен напрямую из класса, производного от std::streambuf, через защищенные средства доступа с тем же именем (eback(), pbase(), egptr(), epptr(), gptr() и pptr()). Обратите внимание, что здесь мы имеем в виду eback(), egptr() and gptr() одного streambuf и pbase(), epptr() and pptr() другого streambuf (см. Изображение выше).

std::streambuf имеет публичные функции, которые обращаются или изменяютсяэти шесть указателей.Это:

table, th, td {
  border: 1px solid black;
  border-collapse: collapse;
}
th, td {
  padding: 5px;
}
<table style="width:100%">
<caption>Public member functions of <code>std::streambuf</code></caption>
<tr>
<th>Method</th><th>Changes and/or accesses</th>
</tr>
<tr>
<td><code>pubsetbuf()</code></td><td>Calls <code>setbuf()</code> of the most derived class</td>
<tr></tr>
<td><code>pubseekoff()</code></td><td>Calls <code>seekoff()</code> of the most derived class</td>
<tr></tr>
<td><code>pubseekpos()</code></td><td>Calls <code>seekpos()</code> of the most derived class</td>
<tr></tr>
<td><code>pubsync()</code></td><td>Calls <code>sync()</code> of the most derived class</td>
</tr><tr>
<td><code>in_avail()</code></td><td>Get area</td>
</tr><tr>
<td><code>snextc()</code></td><td>Calls <code>sbumpc()</code>, <code>uflow()</code> and/or <code>sgetc()</code></td>
</tr><tr>
<td><code>sbumpc()</code></td><td><code>gptr</code>, possibly calls <code>uflow()</code></td>
</tr><tr>
<td><code>sgetc()</code></td><td><code>gptr</code>, possibly calls <code>underflow()</code></td>
</tr><tr>
<td><code>sgetn()</code></td><td>Calls <code>xgetn()</code> of the most derived class.</td>
</tr><tr>
<td><code>sputc()</code></td><td><code>pptr</code>, possibly calls <code>overflow()</code></td>
</tr><tr>
<td><code>sputn()</code></td><td>Calls <code>xsputn()</code> of the most derived class</td>
</tr><tr>
<td><code>sputbackc()</code></td><td><code>gptr</code>, possibly calls <code>pbackfail()</code></td>
</tr><tr>
<td><code>sungetc()</code></td><td><code>gptr</code>, possibly calls <code>pbackfail()</code></td>
</tr>
</table>

Функции защищенных элементов:

table, th, td {
  border: 1px solid black;
  border-collapse: collapse;
}
th, td {
  padding: 5px;
}
<table style="width:100%">
<caption>Protected member functions of <code>std::streambuf</code></caption>
<tr>
<th>Method</th><th>Changes and/or accesses</th>
</tr>
<tr>
<td><code>setbuf()</code></td><td>User defined (could be used for single array buffers)</td>
<tr></tr>
<td><code>seekoff()</code></td><td>User defined (repositions get area)</td>
<tr></tr>
<td><code>seekpos()</code></td><td>User defined (repositions get area)</td>
<tr></tr>
<td><code>sync()</code></td><td>User defined (could do anything, depending on which buffer this is, could change either get area or put area)</td>
</tr><tr>
<td><code>showmanyc()</code></td><td>User defined (get area; if put area uses the same allocated memory block, can also accesses pptr)</td>
</tr><tr>
<td><code>underflow()</code></td><td>User defined (get area; but also strongly coupled to put ares)</td>
</tr><tr>
<td><code>uflow()</code></td><td>Calls underflow() and advances gptr</td>
</tr><tr>
<td><code>xsgetn()</code></td><td>get area (as if calling <code>sbumpc()</code> repeatedly), might call <code>uflow()</code></td>
</tr><tr>
<td><code>gbump()</code></td><td>gptr</td>
</tr><tr>
<td><code>setg()</code></td><td>get area</td>
</tr><tr>
<td><code>xsputn()</code></td><td>put area (as if calling <code>sputc()</code> repeatedly), might call <code>overflow()</code> or do something similar)</td>
</tr><tr>
<td><code>overflow()</code></td><td>put area</td>
</tr><tr>
<td><code>pbump()</code></td><td>pptr</td>
</tr><tr>
<td><code>setp()</code></td><td>put area</td>
</tr><tr>
<td><code>pbackfail()</code></td><td>User defined (might be pure horror; aka, get and put area)</td>
</tr>
</table>

Мы должны разделить действия чтения и записи на действия для каждого (непрерывного) блока памяти.Конечно, возможно, что один вызов -say- sputn() записывает в несколько блоков, но мы можем блокировать и разблокировать для каждого действия блока.

Есть несколько существенных состояний буфера, изображенных вкартинка ниже.Зеленые стрелки представляют переходы между состояниями, выполняемыми потоком (ами), которые считывают данные из области получения, в то время как синие стрелки представляют переходы между состояниями, выполняемыми потоком (ами), которые записывают данные в область размещения.Другими словами, два зеленых действия не могут происходить одновременно;не может два синих действия.Но действие зеленого и синего может происходить одновременно.

Read and write streambuf transitions

Мне все еще нужно написать реализацию для этого, но myподход будет состоять в том, чтобы использовать один мьютекс на буфер и блокировать его только в начале каждого действия, чтобы получить необходимую информацию для выполнения действия чтения и / или записи.Затем в конце этого действия снова заблокируйте мьютекс, чтобы посмотреть, не было ли что-то изменено другим потоком, и / или завершите чтение / запись административным действием.

Каждый раз, когда поток записи поднимает pptr,egptr обновляется атомарно, если только в начале действия записи eback! = pbase;в этом случае egptr не нуждается в обновлении, конечно.Для этого необходимо заблокировать мьютекс перед ударом и разблокировать после обновления egptr.Таким образом, тот же мьютекс блокируется при перемещении областей get или put.Мы не могли бы заблокировать мьютекс при поднятии самого gptr, но если мы это сделаем, то в начале соответствующего действия чтения в буфере были данные, и одновременное действие записи не изменило бы это, поэтому нет опасности, чтоwrite thread (s) попытается переместить область получения одновременно.

Я отредактирую этот ответ, когда выясню больше деталей.

...