NetMQ是ZeroMQ的C#移植版本,它是對標準socket介面的擴展。它提供了一種非同步消息隊列,多消息模式,消息過濾(訂閱),對多種傳輸協議的無縫訪問。本文記錄了NetMQ的源碼進行學習並分析理解。 ...
前言
介紹
[NetMQ](https://github.com/zeromq/netmq.git)是ZeroMQ的C#移植版本,它是對標準socket介面的擴展。它提供了一種非同步消息隊列,多消息模式,消息過濾(訂閱),對多種傳輸協議的無縫訪問。
當前有2個版本正在維護,版本3最新版為3.3.4,版本4最新版本為4.0.1。本文檔是對4.0.1分支代碼進行分析。
目的
對NetMQ的源碼進行學習並分析理解,因此寫下該系列文章,本系列文章暫定編寫計劃如下:
- 消息隊列NetMQ 原理分析1-Context和ZObject
- 消息隊列NetMQ 原理分析2-IO線程和完成埠
- 消息隊列NetMQ 原理分析3-命令產生/處理、創建Socket和回收線程
- 消息隊列NetMQ 原理分析4-Session、Option和Pipe
- 消息隊列NetMQ 原理分析5-Engine
- 消息隊列NetMQ 原理分析6-TCP和Inpoc實現
- 消息隊列NetMQ 原理分析7-Device
- 消息隊列NetMQ 原理分析8-不同類型的Socket
- 消息隊列NetMQ 原理分析9-實戰
友情提示: 看本系列文章時最好獲取源碼,更有助於理解。
命令
命令結構
Command定義如下
internal struct Command
{
public Command([CanBeNull] ZObject destination, CommandType type, [CanBeNull] object arg = null) : this()
{
Destination = destination;
CommandType = type;
Arg = arg;
}
[CanBeNull]
public ZObject Destination { get; }
public CommandType CommandType { get; }
[CanBeNull]
public object Arg { get; private set; }
public override string ToString()
{
return base.ToString() + "[" + CommandType + ", " + Destination + "]";
}
}
其包含了3個信息:調用者,命令類型和命令參數。
命令產生
還記的《消息隊列NetMQ 原理分析1-Context和ZObject》中我們介紹過NetMQ中的命令類型嗎?待處理命令全部會存放著Socket
的信箱中。當Socket
有命令(連接完成、發送完成或接受完成等)需要處理時調用基類ZObject
的SendCommand
方法。
private void SendCommand([NotNull] Command cmd)
{
m_ctx.SendCommand(cmd.Destination.ThreadId, cmd);
}
ZObject
實際調用Context
的SendCommand方法
public void SendCommand(int threadId, [NotNull] Command command)
{
m_slots[threadId].Send(command);
}
m_slots[threadId]
保存的是當前IO線程的IO信箱IOThreadMailbox
,在《消息隊列NetMQ 原理分析2-IO線程和完成埠》
我們簡單介紹了IOThreadMailbox
的結構。
[NotNull] private readonly YPipe<Command> m_commandPipe = new YPipe<Command>(Config.CommandPipeGranularity, "mailbox");
IOThreadMailbox
中維護這一個Command
管道,該管道實際就是一個先進先出隊列,詳細解析會在第四章進行介紹。
public void Send(Command command)
{
bool ok;
lock (m_sync)
{
//向管道寫入命令
m_commandPipe.Write(ref command, false);
//成功寫入會返回false,表示有命令需要處理
ok = m_commandPipe.Flush();
}
if (!ok)
{
//向完成埠傳遞信號
m_proactor.SignalMailbox(this);
}
}
public bool TryRecv(out Command command)
{
return m_commandPipe.TryRead(out command);
}
public void RaiseEvent()
{
if (!m_disposed)
{
m_mailboxEvent.Ready();
}
}
IOThreadMailbox
的主要就是這三個方法
- 當有命令來的時候調用
Send
方法向管道(隊列)寫入命令。寫完時,會向完成埠傳遞信號。 - 當有命令需要處理時調用
TryRecv
方法讀取 - 當完成埠接收到信號需要命令處理時,調用
RaiseEvent
(實際是信箱的IO線程的RaiseEvent
方法)進行處理命令。
public void SignalMailbox(IOThreadMailbox mailbox)
{
//該方法會向完成埠的隊列中插入一個信號狀態
m_completionPort.Signal(mailbox);
}
有關於完成埠介紹請查看《消息隊列NetMQ 原理分析2-IO線程和完成埠》
命令處理
當有命令需要處理時,完成埠會接收到信號。
private void Loop()
{
...
int timeout = ExecuteTimers();
int removed;
if (!m_completionPort.GetMultipleQueuedCompletionStatus(timeout != 0 ? timeout : -1, completionStatuses, out removed))
continue;
for (int i = 0; i < removed; i++)
{
try
{
if (completionStatuses[i].OperationType == OperationType.Signal)
{
var mailbox = (IOThreadMailbox)completionStatuses[i].State;
mailbox.RaiseEvent();
}
...
}
...
}
...
}
線上程輪詢方法Loop
中,當接收到需要處理的數據時,首先會判斷是否是信號,若為信號,則將狀態(參數)轉化為IOThreadMailbox
類型,同時調用RaiseEvent
方法處理命令。
public void Ready()
{
Command command;
while (m_mailbox.TryRecv(out command))
command.Destination.ProcessCommand(command);
}
當有命令需要處理時,會調用IOThreadMailbox
的TryRecv
方法從管道(隊列,先進先出)中獲取第一個命令進行處理。
創建Socket(SocketBase)
在介紹回收線程工作之前,我們先看下創建一個新的Socket
做了哪些工作,這裡的Socket
實際是NetMQ中的SocketBase
。
RequestSocket socket = new RequestSocket();
socket.Connect("tcp://127.0.0.1:12345");
NetMQSocket
是NetMQ的Socket
的基類。
public RequestSocket(string connectionString = null) : base(ZmqSocketType.Req, connectionString, DefaultAction.Connect)
{
}
internal NetMQSocket(ZmqSocketType socketType, string connectionString, DefaultAction defaultAction)
{
m_socketHandle = NetMQConfig.Context.CreateSocket(socketType);
m_netMqSelector = new NetMQSelector();
Options = new SocketOptions(this);
m_socketEventArgs = new NetMQSocketEventArgs(this);
Options.Linger = NetMQConfig.Linger;
if (!string.IsNullOrEmpty(connectionString))
{
var endpoints =
connectionString.Split(new[] {','}, StringSplitOptions.RemoveEmptyEntries)
.Select(a => a.Trim()).Where(a=> !string.IsNullOrEmpty(a));
foreach (string endpoint in endpoints)
{
if (endpoint[0] == '@')
{
Bind(endpoint.Substring(1));
}
else if (endpoint[0] == '>')
{
Connect(endpoint.Substring(1));
}
else if (defaultAction == DefaultAction.Connect)
{
Connect(endpoint);
}
else
{
Bind(endpoint);
}
}
}
}
首先會根據Socket
的類型創建對應的Socket
,調用的是Context
的CreateSocket
方法。具體的請看創建SocketBase。最終創建方法是調用SocketBase
的Create
方法
public static SocketBase Create(ZmqSocketType type, [NotNull] Ctx parent, int threadId, int socketId)
{
switch (type)
{
...
case ZmqSocketType.Req:
return new Req(parent, threadId, socketId);
...
default:
throw new InvalidException("SocketBase.Create called with invalid type of " + type);
}
}
創建完後,就對地址進行解析。若有多個地址,則可用,分隔。
var endpoints =
connectionString.Split(new[] {','}, StringSplitOptions.RemoveEmptyEntries)
.Select(a => a.Trim()).Where(a=> !string.IsNullOrEmpty(a));
解析完成後則用預設的方式進行綁定或連接,如RequestSocket
預設為連接,而ResponseSocket
則為綁定。
創建連接
首先對地址進行解析,判斷當前是tcp還是其他協議。然後會根據協議類型創建對應的Socket,具體的協議類型分析請查看《消息隊列NetMQ 原理分析6-TCP和Inpoc實現》
private static void DecodeAddress([NotNull] string addr, out string address, out string protocol) { const string protocolDelimeter = "://"; int protocolDelimeterIndex = addr.IndexOf(protocolDelimeter, StringComparison.Ordinal); protocol = addr.Substring(0, protocolDelimeterIndex); address = addr.Substring(protocolDelimeterIndex + protocolDelimeter.Length); }
- 負載均衡選擇一個IO線程。
- 創建
Session
,Socket
和Session
的關係如圖所示
創建管道,創建管道會創建一對單向管道,形成“一個”雙向管道。頭尾分別連接
Socket
和Session
,如上圖所示。創建管道完畢後需要設置管道的回調事件,管道1設置回調為Socket
的回調方法,管道2設置為Session
的回調方法。
具體關於
Session
和Pipe
的內容請查看《消息隊列NetMQ 原理分析4-Session、Option和Pipe》。
- 處理
Socket
和Session
的關係
protected void LaunchChild([NotNull] Own obj)
{
// Specify the owner of the object.
obj.SetOwner(this);
// Plug the object into the I/O thread.
SendPlug(obj);
// Take ownership of the object.
SendOwn(this, obj);
}
將
Session
的宿主設置為該Socket
private void SetOwner([NotNull] Own owner) { Debug.Assert(m_owner == null); m_owner = owner; }
為IO對象設置
Session
,當管道有數據交互時,Session
的回調方法就會觸發。protected void SendPlug([NotNull] Own destination, bool incSeqnum = true) { if (incSeqnum) destination.IncSeqnum(); SendCommand(new Command(destination, CommandType.Plug)); }
SessionBase
的ProcessPlug
會被觸發protected override void ProcessPlug() { m_ioObject.SetHandler(this); if (m_connect) StartConnecting(false); }
將當前
Session
加入到Socket
的Session
集合中,protected void SendOwn([NotNull] Own destination, [NotNull] Own obj) { destination.IncSeqnum(); SendCommand(new Command(destination, CommandType.Own, obj)); }
SocketBase
的父類方法SendOwn
(Own方法)方法會被觸發,將Session
加入到集合中protected override void ProcessOwn(Own obj) { ... // Store the reference to the owned object. m_owned.Add(obj); }
創建綁定
首先對地址進行解析,判斷當前是tcp還是其他協議。然後會根據協議類型創建對應的
Socket
,具體的協議類型分析請查看《消息隊列NetMQ 原理分析6-TCP和Inpoc實現》private static void DecodeAddress([NotNull] string addr, out string address, out string protocol) { const string protocolDelimeter = "://"; int protocolDelimeterIndex = addr.IndexOf(protocolDelimeter, StringComparison.Ordinal); protocol = addr.Substring(0, protocolDelimeterIndex); address = addr.Substring(protocolDelimeterIndex + protocolDelimeter.Length); }
負載均衡選擇一個IO線程。
處理
Socket
和Session
的關係
protected void LaunchChild([NotNull] Own obj)
{
// Specify the owner of the object.
obj.SetOwner(this);
// Plug the object into the I/O thread.
SendPlug(obj);
// Take ownership of the object.
SendOwn(this, obj);
}
將
Listener
的宿主設置為該Socket
private void SetOwner([NotNull] Own owner) { Debug.Assert(m_owner == null); m_owner = owner; }
為IO對象設置
Listener
,當管道有數據交互是,Listener
的回調方法就會觸發。protected void SendPlug([NotNull] Own destination, bool incSeqnum = true) { if (incSeqnum) destination.IncSeqnum(); SendCommand(new Command(destination, CommandType.Plug)); }
Listener
的ProcessPlug
會被觸發protected override void ProcessPlug() { m_ioObject.SetHandler(this); m_ioObject.AddSocket(m_handle); //接收非同步socket Accept(); }
將當前
Listener
加入到Socket
的Listener
集合中,protected void SendOwn([NotNull] Own destination, [NotNull] Own obj) { destination.IncSeqnum(); SendCommand(new Command(destination, CommandType.Own, obj)); }
SocketBase
的父類方法SendOwn
(Own方法)方法會被觸發,將Listener
加入到集合中protected override void ProcessOwn(Own obj) { ... // Store the reference to the owned object. m_owned.Add(obj); }
SocketBase
的創建處理就完成了
回收線程
(垃圾)回收線程是專門處理(清理)非同步關閉的Socket
的線程,它在NetMQ中起到至關重要的作用。
internal class Reaper : ZObject, IPollEvents
{
...
}
Reaper
是一個ZObject對象,同時實現了IPollEvents
介面,該介面的作用是當有信息接收或發送時進行處理。回收線程實現了InEvent
方法。
internal interface IPollEvents : ITimerEvent
{
void InEvent();
void OutEvent();
}
InEvent
方法實現和IO線程的Ready
方法很像,都是遍歷需要處理的命令進行處理。
public void InEvent()
{
while (true)
{
Command command;
if (!m_mailbox.TryRecv(0, out command))
break;
command.Destination.ProcessCommand(command);
}
}
釋放SocketBase
當有SocketBase
需要釋放時,會向完成埠發送Reap
信號。
public void Close()
{
// Mark the socket as disposed
m_disposed = true;
//工作線程向Socket郵箱發送Reap信號
//回收線程會做剩下的工作
SendReap(this);
}
發送回收命令
向回收線程的郵箱發送當前SocketBase
的回收命令
protected void SendReap([NotNull] SocketBase socket)
{
SendCommand(new Command(m_ctx.GetReaper(), CommandType.Reap, socket));
}
處理回收命令
Reap
接收到釋放信號進行處理
protected override void ProcessReap(SocketBase socket)
{
// Add the socket to the poller.
socket.StartReaping(m_poller);
++m_sockets;
}
SocketBase回收
- 將當前
Socket
的加入到回收線程的中,當Socket
接收到數據時,由回收線程回調該Socket的處理事件進行處理。 - 當前Socket終止處理
最後確認釋放
internal void StartReaping([NotNull] Poller poller) { m_poller = poller; m_handle = m_mailbox.Handle; m_poller.AddHandle(m_handle, this); m_poller.SetPollIn(m_handle); Terminate(); CheckDestroy(); }
終止處理
- 終止
Socket
時,直接終止即可
預設情況下
NetMQ
的Linger
值被設置為-1,就是說如果網路讀寫沒有進行完是不能退出的。如果Linger
被設置為0,那麼中斷時會丟棄一切未完成的網路操作。如果Linger
被設置的大於0,那麼將等待Linger
毫秒用來完成未完成的網路讀寫,在指定的時間里完成或者超時都會立即返回。
若終止的是
Session
,則需要發送請求清理關聯Socket的當前Session
對象protected void Terminate() { ... if (m_owner == null) { // 釋放的是Socket,Owner為空 ProcessTerm(m_options.Linger); } else { // 釋放的是Session則會關聯一個Socket SendTermReq(m_owner, this); } }
終止SocketBase
- 終止
SocketBase
時,需要先中斷當前SocketBase
關聯的SessionBase
- 然後增加需要終端請求響應的個數,當全部都響應了則處理第四步驟
- 清空當前關聯的
Session
集合 最後當
Session
全部終止後發送給當前Socket
宿主終端響應(TermAck)protected override void ProcessTerm(int linger) { ... // 斷開所有session的連接 foreach (Own it in m_owned) { SendTerm(it, linger); } RegisterTermAcks(m_owned.Count); m_owned.Clear(); CheckTermAcks(); }
終止當前Socket關聯的Session
- 如果終端管道命令在終止命令前處理了,則立即終止當前
Session
- 標記當前準備終止
- 若
Ligner
大於0 則等到N毫秒後再終止終止Socket
和Session
之間的管道 檢查管道是否還有數據要讀取
protected override void ProcessTerm(int linger) { if (m_pipe == null) { ProceedWithTerm(); return; } m_pending = true; if (linger > 0) { Debug.Assert(!m_hasLingerTimer); m_ioObject.AddTimer(linger, LingerTimerId); m_hasLingerTimer = true; } // 是否需要等待一定時間後消息處理完再終止管道. m_pipe.Terminate(linger != 0); // TODO: Should this go into pipe_t::terminate ? // In case there's no engine and there's only delimiter in the // pipe it wouldn't be ever read. Thus we check for it explicitly. m_pipe.CheckRead(); }
終止管道
管道狀態如下所示
private enum State { /// <summary> Active 表示在中斷命令開始前的狀態 </summary> Active, /// <summary> Delimited 表示在終端命令接收前從管道接收到分隔符</summary> Delimited, /// <summary> Pending 表示中斷命令已經從管道接收,但是仍有待定消息可讀</summary> Pending, /// <summary> Terminating 表示所有待定消息都已經讀取等待管道終止確認信號返回 </summary> Terminating, /// <summary> Terminated 表示終止命令是由用戶顯示調用 </summary> Terminated, /// <summary> Double_terminated 表示用戶調用了終止命令同時管道也調用了終止命令 </summary> DoubleTerminated }
終止當前管道
若當前狀態為Terminated
、DoubleTerminated
和Terminating
不再處理終止命令public void Terminate(bool delay) { //判斷當前狀態是否可處理終止命令 ... if (m_state == State.Active) { // 向另一個管道發送終止命令然後等待確認終止 SendPipeTerm(m_peer); m_state = State.Terminated; } else if (m_state == State.Pending && !m_delay) { // 若有待處理數據,但是不等待直接終止,則向另一個管道發送確認終止. m_outboundPipe = null; SendPipeTermAck(m_peer); m_state = State.Terminating; } else if (m_state == State.Pending) { //若有待處理數據但是需要等到則不處理. } else if (m_state == State.Delimited) { //若已經獲取到限定符但是還沒有收到終止命令則忽略定界符,然後發送終止命令給另一個管道 SendPipeTerm(m_peer); m_state = State.Terminated; } else { // 沒有其他狀態 Debug.Assert(false); } //停止向外發送的消息 m_outActive = false; if (m_outboundPipe != null) { //拋棄未發送出的消息. Rollback(); // 這裡不會再先查水位,所以即使管道滿了也可再寫入,向管道寫入定界符 . var msg = new Msg(); msg.InitDelimiter(); m_outboundPipe.Write(ref msg, false); Flush(); } }
終止另一個管道
protected override void ProcessPipeTerm() { // 這是一個簡單的例子有道管道終止 //若沒有更多待處理消息需要讀取,或者這個管道已經丟去待處理數據,我們直接將狀態設置為正在終止(terminating),否則我們擱置待處理狀態直到所有待處理消息被髮送 if (m_state == State.Active) { if (!m_delay) { //不需要等到消息處理 m_state = State.Terminating; m_outboundPipe = null; //發送終止確認 SendPipeTermAck(m_peer); } else m_state = State.Pending; return; } // 若定界符碰巧在終止命令之前到達,將狀態改為正在終止 if (m_state == State.Delimited) { m_state = State.Terminating; m_outboundPipe = null; SendPipeTermAck(m_peer); return; } // 當管道併發關閉,則狀態改為DoubleTerminated if (m_state == State.Terminated) { m_state = State.DoubleTerminated; m_outboundPipe = null; SendPipeTermAck(m_peer); return; } // pipe_term is invalid in other states. Debug.Assert(false); }
確認終止
protected override void ProcessPipeTermAck() { // 通知Socket或Session中斷當前管道 . Debug.Assert(m_sink != null); m_sink.Terminated(this); // 若正則處理或double_terminated這裡不做任何事 // 簡化釋放管道,在已終止狀態,我們必須在釋放這個管道之前確認 //其他狀態都是非法的 if (m_state == State.Terminated) { m_outboundPipe = null; SendPipeTermAck(m_peer); } else Debug.Assert(m_state == State.Terminating || m_state == State.DoubleTerminated); // 刪除所有管道中的未讀消息,然後釋放流入管道 var msg = new Msg(); while (m_inboundPipe.TryRead(out msg)) { msg.Close(); } m_inboundPipe = null; }
整體回收
Socket
流程圖如下:
public virtual void InEvent()
{
// 回收線程命令會調用此事件
try
{
ProcessCommands(0, false);
}
catch
{
// ignored
}
finally
{
CheckDestroy();
}
}
private void CheckDestroy()
{
// socket釋放完則做最後的清除和釋放工作.
if (m_destroyed)
{
// 從回收線程移除輪詢
m_poller.RemoveHandle(m_handle);
// 釋放socke.
DestroySocket(this);
// 通知已釋放.
SendReaped();
// Deallocate.
base.ProcessDestroy();
}
}
總結
該篇介紹命令處理方式和回收線程回收Socket
,順便介紹了下創建SocketBase
的細節性問題。以便對釋放Socket
有更清晰的認識。