Из того, что вы сказали, я понял, что вам нужно разделить веб-сокет между несколькими службами, каждый из которых использует его так, как считает нужным. В этом сценарии предлагаемая реализация имеет Middleware
, который содержит различные службы, для которых требуется сокет. Особое внимание следует уделить при решении, какая служба выполняет запись, а какая - чтение. Веб-сокет является поточно-ориентированным в контексте reading-writing
в то же время, но не в других сценариях. (writing-writing
, reading-reading
)
Запуск
public void ConfigureServices(IServiceCollection colllection){
collection.AddSingleton<Sender>();
collection.AddSingleton<Receiver>();
}
public void Configure(IApplicationBuilder app)
{
app.UseWebsockets();
app.UseMiddleware<DispatcherMiddleware>(); //receives socket here
}
Middleware
public class DispatcherMiddleware{
private Sender sender;
private Receiver receiver;
private RequestDelegate next;
public Dispatcher(RequestDelegate req,Sender _sender,Receiver _receiver)
{
this.sender=_sender;
this.receiver=_receiver;
this.next=req;
}
public async Task Invoke(HttpContext context){
if(!context.WebSockets.IsWebSocketRequest){
return;
}
await DispatchAsync(context.WebSockets);
}
public async Task DispatchAsync(WebsocketManager manager){
WebSocket socket=await manager.AcceptWebSocketAsync();
Task t1=Task.Run(async()=>await this.sender.SendAsync(socket));
Task t2=Task.Run(async()=>await this.receiver.ReceiveAsync(socket));
await Task.WhenAll(t1,t2);
}
}
Службы веб-сокетов (пример)
public class Sender(){
public async Task SendAsync(WebSocket ws){
try{
while(true){
// ws.SendAsync()
}
}
catch(Exception ex){
}
}
}
public class Receiver{
public async Task ReceiveAsync(WebSocket ws){
try
{
while(true){
//ws.ReceiveAsync(buffer)
}
}
catch (System.Exception)
{
throw;
}
}
}
Редактировать
Вы не можете выполнять одновременное чтение / запись в одном и том же сокете. При этом, если вы хотите использовать один и тот же сокет, вы можете сделать его thread-safe
. Поскольку операция async
, я предлагаю использовать класс SemaphoreSlim
.
Ниже приведена реализация shared
сокета:
public class SafeSocket {
private const int BUFFER_SIZE = 1024;
private WebSocket socket { get; set; }
private SemaphoreSlim @lock = new SemaphoreSlim(1);
public SafeSocket(WebSocket socket) {
this.socket = socket;
}
public async Task<byte[]> ReadAsync() {
byte[] buffer = ArrayPool<byte>.Shared.Rent(BUFFER_SIZE);
await @lock.WaitAsync();
try {
await this.socket.SendAsync(buffer, WebSocketMessageType.Text, true, CancellationToken.None);
return buffer;
} catch (Exception) {
throw;
} finally {
@lock.Release();
}
}
}
public class DispatcherMiddleware {
private List<Sender> senders;
private Receiver receiver;
private RequestDelegate next;
public DispatcherMiddleware(RequestDelegate req, List<Sender> _senders, Receiver _receiver) {
this.senders = _senders;
this.receiver = _receiver;
this.next = req;
}
public async Task Invoke(HttpContext context) {
if (!context.WebSockets.IsWebSocketRequest) {
return;
}
await DispatchAsync(context.WebSockets);
}
public async Task DispatchAsync(WebSocketManager manager) {
WebSocket socket = await manager.AcceptWebSocketAsync();
SafeSocket commonSocket = new SafeSocket(socket);
Task[] senderTasks = new Task[this.senders.Count];
for (int senderIndex = 0; senderIndex < senderTasks.Length; senderIndex++) {
int index = senderIndex;// careful at index ! , make copy and use it inside closure !
senderTasks[senderIndex] = Task.Run(async () => {
await commonSocket.ReadAsync();
});
}
}
Имейте в виду, что порядок сообщений не будет сохранен. То же самое можно применить к получателям.
В итоге вы получите N
отправителей и K
получателей, которые в момент времени T
:
1
отправитель напишет
1
получатель будет читать
N-1
отправители будут ждать lock
K-1
получатели будут ждать lock
Таким образом, в конечном итоге будет выполнено всего лишь 2
операций.
Я не знаю, если это то, что вам нужно.