LZ最近離職,閑著也是閑著,打算梳理下 公司做的是電商,CTO打算把2.0系統用java 語言開發,LZ目前不打算做java,所以 選擇離職。離職前,在公司負責的最後一個項目 供應鏈系統。 系統分為 3套子系統: 1 供應鏈工作平臺(即用戶操作平臺):採用CS架構,Sqlite做緩存。 2 消息中心 ...
LZ最近離職,閑著也是閑著,打算梳理下
公司做的是電商,CTO打算把2.0系統用java 語言開發,LZ目前不打算做java,所以 選擇離職。離職前,在公司負責的最後一個項目 供應鏈系統。
系統分為 3套子系統:
1 供應鏈工作平臺(即用戶操作平臺):採用CS架構,Sqlite做緩存。
2 消息中心: 後臺程式,採用mina.net,scoket 長連接 保證服務消息的 推送,後臺消息的提醒,和 系統對最新訂單的緩存。
3 WindowsService 監控消息中心,保證消息中心 隨系統的開啟而啟動
mina 簡介:
Apache Mina Server 是一個網路通信應用框架,它主要是對基於TCP/IP、UDP/IP協議棧的通信框架,Mina 可以幫助我們快速開發高性能、高擴展性的網路通信應用,Mina 提供了事件驅動、非同步(Mina 的非同步IO 預設使用的是Java NIO 作為底層支持)操作的編程模型。
mina.net 是Apache Mina Server 的.net 版本 主要用於系統的長連接通信
安裝:
PM> Install-Package Mina
mina.net 主要對象:
1.AsyncSocketConnector: 發起鏈接
2.IoSession:mina 鏈接創建成功之後 客戶端,服務的的數據傳送
3.IoHandlerAdapter:適配器類。可以擴展
4.DemuxingProtocolCodecFactory:構建協議編碼工廠
適配器主要方法:
1 MessageReceived:收到消息時觸發
2.MessageSent 發送消息後觸發
3.SessionClosed 關閉Session時 觸發
4.SessionCreated 創建Session時 觸發
5.ExceptionCaught 發生異常時 觸發
6.SessionIdleSession 空閑時 觸發
實現思路:
創建mina鏈接:
public void StartProcess(LoginContext config) { AsyncSocketConnector connector = new Mina.Transport.Socket.AsyncSocketConnector(); //註冊協議編解碼器工廠 connector.FilterChain.AddLast("encoding", new ProtocolCodecFilter(new MyMinaCodecFactory())); //指定服務端IP 和埠號 connector.DefaultRemoteEndPoint = new IPEndPoint(IPAddress.Parse(MinaConfig.Ip), MinaConfig.Port); //初始化 消息處理類 var headerDic = CreateHeader(); //繼承IoHandlerAdapter構建適配器 MinaMessageHandler headler = new MinaMessageHandler(config, connector, headerDic); connector.Handler = headler; while (true) { try { //ClientHandler //建立鏈接 session = connector.Connect().Await().Session; break; } catch (Exception ex) { _Log.Error(ex.Message, ex); Thread.Sleep(1000); } } }
鏈接建立成功之後,觸發mina.net 內部機制的SessionCreated 方法,登錄用戶
public override void SessionCreated(Mina.Core.Session.IoSession session) { try { MyBaseMessage message = new LoginRequestMessage(ClientConfig.ClientAddr,ClientConfig.SharedSecret); (message as LoginRequestMessage).SetAutherString(); session.Write(message); } catch (Exception ex) { _Log.Error(ex.Message, ex); } finally { base.SessionCreated(session); } }
重寫MessageReceived方法,收到伺服器消息之後,處理相應事件
/// <summary> /// 收到消息時 觸發--處理消息,給伺服器發送處理結果 /// </summary> /// <param name="session"></param> /// <param name="message"></param> public override void MessageReceived(Mina.Core.Session.IoSession session, object message) { try { if (message is MyBaseMessage) { var m = message as MyBaseMessage; if (HeaderDic.Keys.Any(p=>p==m.GetCommandType())) { var messageHeader = HeaderDic[m.GetCommandType()]; messageHeader.Handle(session,m); } } } catch (Exception ex) { _Log.Error(ex.Message, ex); } finally { base.MessageReceived(session, message); } }
重寫 SessionClosed 事件,關閉session時,通知伺服器,客戶端已關閉鏈接
/// <summary> /// 關閉Session時 觸發-發送關閉消息 /// </summary> /// <param name="session"></param> public override void SessionClosed(Mina.Core.Session.IoSession session) { try { while (true) { try { if (Connector != null) { if (!Connector.Disposed) { session = Connector.Connect().Await().Session; break; } else { break; } } } catch (Exception ex) { Thread.Sleep(1000); } } } catch (Exception ex) { _Log.Error(ex.Message, ex); } finally { base.SessionClosed(session); } }
重寫 ExceptionCaught 方法,發生異常時,關閉鏈接
/// <summary> /// 發生異常時 觸發,關閉session 重新登錄 /// </summary> /// <param name="session"></param> /// <param name="cause"></param> public override void ExceptionCaught(Mina.Core.Session.IoSession session, Exception cause) { try { session.Close(true); _Log.Error(cause.Message, cause); } catch (Exception ex) { _Log.Error(ex.Message, ex); } finally { base.ExceptionCaught(session, cause); } }
重寫 SessionIdle 方法,session空閑時,測試心跳
/// <summary> /// Session 空閑時 發生 /// </summary> /// <param name="session"></param> /// <param name="status"></param> public override void SessionIdle(Mina.Core.Session.IoSession session, Mina.Core.Session.IdleStatus status) { try { MyBaseMessage message = new DetectionMessage(); session.Write(message); } catch (Exception ex) { _Log.Error(ex.Message, ex); } finally { base.SessionIdle(session, status); } }
構建協議編解碼器工廠
public class MyMinaCodecFactory : DemuxingProtocolCodecFactory { public MyMinaCodecFactory() { AddMessageEncoder(new MyMinaEncoder()); AddMessageDecoder(new MyMinaDecoder()); } }
編碼器工廠,將對象 序列號成 bytes 數據
public class MyMinaEncoder : IMessageEncoder<MyBaseMessage> { public void Encode(IoSession session, MyBaseMessage message, IProtocolEncoderOutput output) { IoBuffer buf = IoBuffer.Allocate(12); buf.AutoExpand = true; var messageBytes = message.EncodeMessage(); buf.Put(messageBytes); buf.Flip(); session.Write(buf); } public void Encode(IoSession session, object message, IProtocolEncoderOutput output) { IoBuffer buf = IoBuffer.Allocate(12); buf.AutoExpand = true; if (message is MyBaseMessage) { var m = message as MyBaseMessage; var messageBytes = m.EncodeMessage(); buf.Put(messageBytes); buf.Flip(); } session.Write(buf); } }
解碼器工廠,將位元組轉換為對象
public class MyMinaDecoder : IMessageDecoder { public ILog _Log = LogManager.GetLogger("MessageHandler"); public MessageDecoderResult Decodable(IoSession session,IoBuffer input) { try { if (input.Remaining < CommandConfig.messageHeaderLength) { return MessageDecoderResult.NeedData; } var headerBytes = new byte[CommandConfig.messageHeaderLength]; for (int i = 0; i < CommandConfig.messageHeaderLength; i++) { headerBytes[i] = input.Get(i); } var lengthBytes = new byte[4]; var commandIdBytes = new byte[4]; var sequenceBytes = new byte[4]; Array.Copy(headerBytes, 0, lengthBytes, 0, 4); Array.Copy(headerBytes, 4, commandIdBytes, 0, 4); Array.Copy(headerBytes, 8, sequenceBytes, 0, 4); var messageLength = lengthBytes.ByteToUint();//Convert.ToInt32(Encoding.Default.GetString(headerBytes, 0, 4)); var messageCommand = commandIdBytes.ByteToUint();//(uint)Convert.ToInt32(Encoding.Default.GetString(headerBytes, 4, 4)); if (messageCommand==CommandConfig.connect || messageCommand == CommandConfig.connectResp || messageCommand == CommandConfig.terminate || messageCommand == CommandConfig.terminateResp || messageCommand == CommandConfig.notify || messageCommand == CommandConfig.notifyResp || messageCommand == CommandConfig.cmppActiveTest || messageCommand == CommandConfig.cmppActiveTestResp) { return MessageDecoderResult.OK; } return MessageDecoderResult.NotOK; } catch (Exception ex) { _Log.Error(ex.Message, ex); return MessageDecoderResult.NeedData; } } }
結語:
博客寫的不多,不喜勿碰,謝謝
歡迎指點和糾正