最近在學WebSocket,服務端需要監聽多個WebSocket客戶端發送的消息。 開始的解決方法是每個WebSocket客戶端都添加一個線程進行監聽,代碼如下: /// <summary> /// 監聽埠 創建WebSocket /// </summary> /// <param name="h ...
最近在學WebSocket,服務端需要監聽多個WebSocket客戶端發送的消息。
開始的解決方法是每個WebSocket客戶端都添加一個線程進行監聽,代碼如下:
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
/// <summary> /// 監聽埠 創建WebSocket /// </summary> /// <param name="httpListener"></param> private void CreateWebSocket(HttpListener httpListener) { if (!httpListener.IsListening) throw new Exception("HttpListener未啟動"); HttpListenerContext listenerContext = httpListener.GetContextAsync().Result; if (!listenerContext.Request.IsWebSocketRequest) { CreateWebSocket(httpListener); return; } WebSocketContext webSocket = null; try { webSocket = new WebSocketContext(listenerContext, SubProtocol); } catch (Exception ex) { log.Error(ex); CreateWebSocket(HttpListener); return; } log.Info($"成功創建WebSocket:{webSocket.ID}"); int workerThreads = 0, completionPortThreads = 0; ThreadPool.GetAvailableThreads(out workerThreads, out completionPortThreads); if (workerThreads <= ReservedThreadsCount + 1 || completionPortThreads <= ReservedThreadsCount + 1) { /** * 可用線程小於預留線程數量 * 通知客戶端關閉連接 * */ webSocket.CloseAsync(WebSocketCloseStatus.InternalServerError, "可用線程不足,無法連接").Wait(); } else { if (OnReceiveMessage != null) webSocket.OnReceiveMessage += OnReceiveMessage; webSocket.OnCloseWebSocket += WebSocket_OnCloseWebSocket; webSocketContexts.Add(webSocket); // 線上程中監聽客戶端發送的消息 ThreadPool.QueueUserWorkItem(new WaitCallback(p => { (p as WebSocketContext).ReceiveMessageAsync().Wait(); }), webSocket); } CreateWebSocket(HttpListener); }線上程中添加監聽代碼
但是可用線程數量是有限的,先連接的客戶端一直遞歸接收消息,導致線程無限占用,後連接上的客戶端就沒有線程用於監聽接受消息了。
接受消息方法如下:
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
/// <summary> /// 遞歸 同步接收消息 /// </summary> /// <returns></returns> public void ReceiveMessage() { WebSocket webSocket = HttpListenerWebSocketContext.WebSocket; if (webSocket.State != WebSocketState.Open) throw new Exception("Http未握手成功,不能接受消息!"); var byteBuffer = WebSocket.CreateServerBuffer(ReceiveBufferSize); WebSocketReceiveResult receiveResult = null; try { receiveResult = webSocket.ReceiveAsync(byteBuffer, cancellationToken).Result; } catch (WebSocketException ex) { if (ex.InnerException is HttpListenerException) { log.Error(ex); CloseAsync(WebSocketCloseStatus.ProtocolError, "客戶端斷開連接" + ex.Message).Wait(TimeSpan.FromSeconds(20)); return; } else { log.Error(ex); CloseAsync(WebSocketCloseStatus.ProtocolError, "WebSocket 連接異常" + ex.Message).Wait(TimeSpan.FromSeconds(20)); return; } } catch (Exception ex) { log.Error(ex); CloseAsync(WebSocketCloseStatus.ProtocolError, "客戶端斷開連接" + ex.Message).Wait(TimeSpan.FromSeconds(20)); return; } if (receiveResult.CloseStatus.HasValue) { log.Info("接受到關閉消息!"); CloseAsync(receiveResult.CloseStatus.Value, receiveResult.CloseStatusDescription).Wait(TimeSpan.FromSeconds(20)); return; } byte[] bytes = new byte[receiveResult.Count]; Array.Copy(byteBuffer.Array, bytes, bytes.Length); string message = Encoding.GetString(bytes); log.Info($"{ID}接收到消息:{message}"); if (OnReceiveMessage != null) OnReceiveMessage.Invoke(this, message); if (!cancellationToken.IsCancellationRequested) ReceiveMessage(); }接受消息方法
這是不能接受的。
後來在Task中看到,在創建Task時可以設置TaskCreationOptions參數
該枚舉有個欄位LongRunning
LongRunning | 2 |
指定任務將是長時間運行的、粗粒度的操作,涉及比細化的系統更少、更大的組件。 它會向 TaskScheduler 提示,過度訂閱可能是合理的。 可以通過過度訂閱創建比可用硬體線程數更多的線程。 它還將提示任務計劃程式:該任務需要附加線程,以使任務不阻塞本地線程池隊列中其他線程或工作項的向前推動。 |
經過測試,可同時運行的任務數量的確可以超出可用線程數量。
測試如下:
沒有設置 TaskCreationOptions.LongRunning 代碼如下:
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
/// <summary> /// 測試任務 /// 只運行了9個任務 /// </summary> [TestMethod] public void TestTask1() { var cts = new CancellationTokenSource(); int MaxWorkerThreads = 0, MaxCompletionPortThreads = 0; ThreadPool.GetMaxThreads(out MaxWorkerThreads, out MaxCompletionPortThreads); Console.WriteLine($"最大可用輔助線程數目為{MaxCompletionPortThreads},最大可用非同步 I/O 線程數目為{MaxCompletionPortThreads}"); MaxWorkerThreads = 10; MaxCompletionPortThreads = 10; Console.WriteLine(@"設置線程池中輔助線程的最大數目為{0}, 線程池中非同步 I/O 線程的最大數目為{1} 同時運行30個長時運行線程,每個線程中運行一個同步方法,看是否30個線程是否都能運行。", MaxWorkerThreads, MaxCompletionPortThreads); ThreadPool.SetMaxThreads(10, 10); ThreadPool.GetMaxThreads(out MaxWorkerThreads, out MaxCompletionPortThreads); Console.WriteLine($"最大可用輔助線程數目為{MaxCompletionPortThreads},最大可用非同步 I/O 線程數目為{MaxCompletionPortThreads}"); int count = 0; while (count++ < 30) { Task.Factory.StartNew(p => { int index = (int)p; int runCount = 0; LongRunningTask($"線程{index}", runCount, cts.Token); }, count, cts.Token, TaskCreationOptions.None, TaskScheduler.Default); } Task.Delay(TimeSpan.FromSeconds(10)).Wait(TimeSpan.FromSeconds(20)); // 等待超時,等待任務沒有執行 cts.Cancel(); } /// <summary> /// 長時運行任務 /// 遞歸運行 /// </summary> /// <param name="taskName">任務名稱</param> /// <param name="runCount">運行次數</param> /// <param name="token">傳播有關取消操作的通知</param> private void LongRunningTask(string taskName, int runCount, CancellationToken token) { PrintTask($"任務【{taskName}】線程ID【{Environment.CurrentManagedThreadId}】第【{++runCount}】次運行").Wait(); if (!token.IsCancellationRequested) LongRunningTask(taskName, runCount, token); } /// <summary> /// 非同步列印任務 等待1秒後列印消息 /// </summary> /// <param name="message">消息</param> /// <returns></returns> private Task PrintTask(string message) { return Task.Factory.StartNew(() => { Thread.Sleep(1000); Console.WriteLine(message); }); }測試代碼
測試結果
測試用了20秒才完成
主線程創建了一個等待10秒後完成的任務,任務等待超時20秒
說明主程式創建的任務沒有執行,而是等待超時了。
設置了 TaskCreationOptions.LongRunning 代碼如下:
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
/// <summary> /// 測試長時運行任務 /// 30個任務全部都運行了 /// </summary> [TestMethod] public void TestTaskLongRunning() { var cts = new CancellationTokenSource(); int MaxWorkerThreads = 0, MaxCompletionPortThreads = 0; ThreadPool.GetMaxThreads(out MaxWorkerThreads, out MaxCompletionPortThreads); MaxWorkerThreads = 10; MaxCompletionPortThreads = 10; Console.WriteLine($"最大可用輔助線程數目為{MaxCompletionPortThreads},最大可用非同步 I/O 線程數目為{MaxCompletionPortThreads}"); Console.WriteLine(@"設置線程池中輔助線程的最大數目為{0}, 線程池中非同步 I/O 線程的最大數目為{1} 同時運行30個長時運行線程,每個線程中運行一個同步方法,看是否30個線程是否都能運行。", MaxWorkerThreads, MaxCompletionPortThreads); ThreadPool.SetMaxThreads(10, 10); ThreadPool.GetMaxThreads(out MaxWorkerThreads, out MaxCompletionPortThreads); Console.WriteLine($"最大可用輔助線程數目為{MaxCompletionPortThreads},最大可用非同步 I/O 線程數目為{MaxCompletionPortThreads}"); int count = 0; while (count++ < 30) { Task.Factory.StartNew(p => { int index = (int)p; int runCount = 0; LongRunningTask($"線程{index}", runCount, cts.Token); }, count, cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default); } Task.Delay(TimeSpan.FromSeconds(10)).Wait(TimeSpan.FromSeconds(20)); // 等待沒有超時,等待任務有執行 cts.Cancel(); }測試代碼
測試結果:
測試用了10秒完成
主線程創建了一個等待10秒後完成的任務,任務等待超時20秒
說明主程式創建的任務立即執行了,程式等待了10秒完成。
使用TaskCreationOptions.LongRunning 需要註意的是Action必須是同步方法同時運行任務書才能超出可以用線程數量,否則不能。
例如:
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
/// <summary> /// 測試長時運行任務 /// 只運行了前9個任務 /// </summary> [TestMethod] public void TestTaskLongRunning2() { var cts = new CancellationTokenSource(); int MaxWorkerThreads = 0, MaxCompletionPortThreads = 0; ThreadPool.GetMaxThreads(out MaxWorkerThreads, out MaxCompletionPortThreads); Console.WriteLine($"最大可用輔助線程數目為{MaxCompletionPortThreads},最大可用非同步 I/O 線程數目為{MaxCompletionPortThreads}"); MaxWorkerThreads = 10; MaxCompletionPortThreads = 10; Console.WriteLine(@"設置線程池中輔助線程的最大數目為{0}, 線程池中非同步 I/O 線程的最大數目為{1} 同時運行30個長時運行線程,每個線程中運行一個非同步方法,看是否30個線程是否都能運行。", MaxWorkerThreads, MaxCompletionPortThreads); ThreadPool.SetMaxThreads(10, 10); ThreadPool.GetMaxThreads(out MaxWorkerThreads, out MaxCompletionPortThreads); Console.WriteLine($"最大可用輔助線程數目為{MaxCompletionPortThreads},最大可用非同步 I/O 線程數目為{MaxCompletionPortThreads}"); int count = 0; while (count++ < 30) { Task.Factory.StartNew(async p => { int index = (int)p; int runCount = 0; await LongRunningTaskAsync($"線程{index}", runCount, cts.Token); }, count, cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default); } Task.Delay(TimeSpan.FromSeconds(10)).Wait(TimeSpan.FromSeconds(20)); // 等待沒有超時,等待任務有執行 cts.Cancel(); } /// <summary> /// 非同步長時運行任務 /// </summary> /// <param name="taskName">任務名稱</param> /// <param name="runCount">運行次數</param> /// <param name="token">傳播有關取消操作的通知</param> /// <returns></returns> private async Task LongRunningTaskAsync(string taskName, int runCount, CancellationToken token) { await PrintTask($"任務【{taskName}】線程ID【{Environment.CurrentManagedThreadId}】第【{++runCount}】次運行"); if (!token.IsCancellationRequested) await LongRunningTaskAsync(taskName, runCount, token); }測試代碼
測試結果
測試用了10秒完成
主線程創建了一個等待10秒後完成的任務,任務等待超時20秒
說明主程式創建的任務立即執行了,程式等待了10秒完成。
WebSocket修改後的監聽方法:
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
/// <summary> /// 監聽埠 創建WebSocket /// </summary> /// <param name="httpListener"></param> private void CreateWebSocket(HttpListener httpListener) { if (!httpListener.IsListening) throw new Exception("HttpListener未啟動"); HttpListenerContext listenerContext = httpListener.GetContext(); if (!listenerContext.Request.IsWebSocketRequest) { CreateWebSocket(httpListener); return; } WebSocketContext webSocket = null; try { webSocket = new WebSocketContext(listenerContext, SubProtocol); } catch (Exception ex) { log.Error(ex); CreateWebSocket(HttpListener); return; } log.Info($"成功創建WebSocket:{webSocket.ID}"); int workerThreads = 0, completionPortThreads = 0; ThreadPool.GetAvailableThreads(out workerThreads, out completionPortThreads); if (OnReceiveMessage != null) webSocket.OnReceiveMessage += OnReceiveMessage; webSocket.OnCloseWebSocket += WebSocket_OnCloseWebSocket; Task.Factory.StartNew(() => { webSocket.ReceiveMessage(); }, cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default); CreateWebSocket(HttpListener); }View Code
修改後的WebSocket服務可以監聽超過可用線程數量的客戶端