概述 SQL Server Service Broker 用來創建用於交換消息的會話。消息在目標和發起方這兩個端點之間進行交換。消息用於傳輸數據和觸發消息收到時的處理過程。目標和發起方既可以在同一資料庫引擎實例的同一資料庫或不同資料庫中,也可以在不同資料庫引擎實例的同一資料庫或不同資料庫中。 每個 ...
概述
SQL Server Service Broker 用來創建用於交換消息的會話。消息在目標和發起方這兩個端點之間進行交換。消息用於傳輸數據和觸發消息收到時的處理過程。目標和發起方既可以在同一資料庫引擎實例的同一資料庫或不同資料庫中,也可以在不同資料庫引擎實例的同一資料庫或不同資料庫中。
每個 Service Broker 會話都有兩個端點:會話發起方和目標。您將執行下列任務:
- 為目標創建一個服務和隊列,併為發起方創建一個服務和隊列。
- 創建請求消息類型和答覆消息類型。
- 創建約定,指定請求消息從發起方傳遞到目標並且答覆消息從目標傳遞到發起方。
然後執行一個簡單會話:
- 啟動會話。
- 從發起方向目標發送一個請求。
- 在目標處接收請求並將答覆發送到發起方。
- 在發起方處接收答覆。
- 結束會話。
對於其兩端在同一 資料庫引擎 實例中的會話,其消息不通過網路傳輸。資料庫引擎 安全性和許可權將限制對授權主體的訪問。此方案不需要網路加密。
一、創建會話對象
1.啟用Service Broker
----創建資料庫 IF NOT EXISTS(SELECT * FROM SYS.DATABASES WHERE name='Dsend') BEGIN CREATE DATABASE Dsend; END USE master; GO ---開啟資料庫BROKER ALTER DATABASE Dsend SET ENABLE_BROKER; GO SELECT is_broker_enabled FROM SYS.DATABASES WHERE NAME='Dsend' USE Dsend; GO
2.創建消息類型
由於經常在多個資料庫引擎實例間引用 Service Broker 對象,因而大多數 Service Broker 對象的名稱都是 URI 格式的。這有助於確保它們在多台電腦上是唯一的。這兩種消息類型都指定 Service Broker 將只驗證消息是否是格式正確的 XML 文檔,並且指定 Service Broker 將不按照特定架構驗證 XML。
CREATE MESSAGE TYPE [//Dsend/test/RequestMessage] VALIDATION = WELL_FORMED_XML; CREATE MESSAGE TYPE [//Dsend/test/ReplyMessage] VALIDATION = WELL_FORMED_XML;
創建請求消息和答覆消息,並且消息的格式為XML格式。
註意:Service Broker 驗證傳入消息。如果消息包含的消息正文與指定的驗證類型不符,則 Service Broker 將放棄此無效消息,並向發送此消息的服務返回一條錯誤消息。會話雙方必須定義相同的消息類型名稱。為便於排除故障,儘管 Service Broker 不要求會話雙方使用相同的驗證,但通常會話雙方還是會為消息類型指定相同的驗證。消息類型不能是臨時對象。允許使用以 # 開頭的消息類型名稱,但它們是永久對象。
3.創建約定
約定用於定義在 Service Broker 會話中所使用的消息類型,還用於確定會話的哪一端可以發送該類型的消息。每個會話都要遵循一個約定。當會話開始時,啟動服務為會話指定約定。目標服務指定該目標服務將接受其會話的約定。
/* SENT BY INITIATOR ----指示只有會話的發起方纔能發送指定消息類型的消息。啟動會話的服務稱為會話的“發起方” SENT BY TARGET ----指示只有會話的目標才能發送指定消息類型的消息。接受由另一個服務啟動的會話的服務稱為會話的目標。 SENT BY ANY ----指示發起方和目標都可以發送此類型的消息。 */ CREATE CONTRACT [//Dsend/test/RequestContract] ([//Dsend/test/RequestMessage] SENT BY INITIATOR, ---約定只有發起方纔能使用//Dsend/test/RequestMessage消息類型 [//Dsend/test/ReplyMessage] SENT BY TARGET ---約定只有答覆方纔能使用//Dsend/test/ReplyMessage消息類型 );
4.創建隊列
隊列可以存儲消息。當一條針對某項服務的消息到達時,Service Broker 會將該消息放入與該服務關聯的隊列中。
創建發起方和答覆方的隊列。
CREATE QUEUE RequestQueue WITH STATUS=ON; CREATE QUEUE ReplyQueue WITH STATUS=ON;
註意:
1.隊列可以通過SELECT 語句查詢,但是不能使用INSERT、UPDATE、DELETE 或 TRUNCATE 語句來操作。只能使用在 Service Broker 會話中運行的語句(如 SEND、RECEIVE 和 END CONVERSATION)來修改隊列的內容。
2.隊列可能不是臨時對象。因此,以 # 開頭的隊列名稱無效。
3.通過以不可用狀態創建隊列,可以先準備好服務的基礎結構,然後再允許在隊列中接收消息。
4.如果隊列中沒有消息,則 Service Broker 不會停止激活存儲過程。如果隊列中在短時間內沒有可用消息,應退出激活存儲過程。
5.在 Service Broker 啟動存儲過程時將檢查激活存儲過程的許可權,而不是在創建隊列時檢查。CREATE QUEUE 語句不驗證 EXECUTE AS 子句中指定的用戶是否有許可權執行 PROCEDURE NAME 子句中指定的存儲過程。
6.隊列不可用時,Service Broker 將在資料庫的傳輸隊列中保存使用該隊列的服務的消息。sys.transmission_queue 目錄視圖提供傳輸隊列的視圖。
例:創建具有多個參數的隊列
以下示例在 DEFAULT
文件組中創建一個隊列。該隊列不可用。消息被保留在隊列中,直到消息所屬的會話結束。通過 ALTER QUEUE 啟用隊列後,該隊列將啟動存儲過程 2008R2.dbo.expense_procedure
來處理消息。此存儲過程以運行 CREATE QUEUE
語句的用戶的身份執行。該隊列最多啟動存儲過程的 10
個實例。
CREATE QUEUE ExpenseQueue WITH STATUS = OFF, RETENTION = ON, ACTIVATION ( PROCEDURE_NAME = AdventureWorks2008R2.dbo.expense_procedure, MAX_QUEUE_READERS = 10, EXECUTE AS SELF ) ON [DEFAULT] ;
5.創建服務
Service Broker 使用服務的名稱路由消息、將消息傳遞到資料庫中的正確隊列,以及強制執行會話的約定。一個服務可以同時綁定多個約束。
--1.創建要用於發起方的隊列和服務。由於未指定約定名稱,因而其他服務不可將此服務用作目標服務,此服務只能啟動會話。 CREATE SERVICE [//Dsend/test/RequestService] ON QUEUE RequestQueue GO ---2.創建接答覆服務 CREATE SERVICE [//Dsend/test/ReplyService] ON QUEUE ReplyQueue ([//Dsend/test/RequestContract] ) ; GO
註意:服務公開與其關聯的約定提供的功能,以便其他服務可使用該功能。CREATE SERVICE 語句指定針對此服務的約定。一個服務只能是使用該服務指定的約定會話的目標。未指定約定的服務不會向其他服務公開任何功能。
從此服務啟動的會話可使用任何約定。如果服務僅啟動會話,則創建服務時可不指定約定。Service Broker 從遠程服務接受新會話時,目標服務的名稱決定了 Broker 在會話中放入消息的隊列。
例:創建具有多個約定的服務
CREATE SERVICE [//Adventure-Works.com/Expenses] ON QUEUE ExpenseQueue ([//Adventure-Works.com/Expenses/ExpenseSubmission], [//Adventure-Works.com/Expenses/ExpenseProcessing]) ;
5.開啟回話
DECLARE @InitDlgHandle UNIQUEIDENTIFIER; DECLARE @RequestMsg NVARCHAR(100); BEGIN TRANSACTION; BEGIN DIALOG @InitDlgHandle FROM SERVICE [//Dsend/test/RequestService] ---指定的服務是用於答覆消息的返回地址 TO SERVICE N'//Dsend/test/ReplyService' ---指定的服務是消息發送到的地址。 ON CONTRACT [//Dsend/test/RequestContract] WITH ENCRYPTION = OFF; SELECT @InitDlgHandle; SELECT @RequestMsg = N'<RequestMsg>3</RequestMsg>'; SEND ON CONVERSATION @InitDlgHandle MESSAGE TYPE [//Dsend/test/RequestMessage] (@RequestMsg); SELECT @RequestMsg AS SentRequestMsg; COMMIT TRANSACTION; GO
6.接收方接收消息並返回消息給發送方
DECLARE @RecvReqDlgHandle UNIQUEIDENTIFIER; DECLARE @RecvReqMsg NVARCHAR(100); DECLARE @RecvReqMsgName sysname; DECLARE @Message NVARCHAR(100) BEGIN TRANSACTION; WAITFOR ( RECEIVE TOP(1) @RecvReqDlgHandle = conversation_handle, @RecvReqMsg = message_body, @RecvReqMsgName = message_type_name FROM ReplyQueue ), TIMEOUT 1000; SELECT @RecvReqMsg AS ReceivedRequestMsg; SELECT @RecvReqMsgName; BEGIN -----返回接收消息確認結果到發起方 DECLARE @ReplyMsg NVARCHAR(100); SELECT @ReplyMsg = N'<ReplyMsg>Reply Message</ReplyMsg>'; SEND ON CONVERSATION @RecvReqDlgHandle MESSAGE TYPE [//Dsend/test/ReplyMessage] (@ReplyMsg); SELECT @RecvReqDlgHandle; ---正常的流程不是在這裡結束會話 END CONVERSATION @RecvReqDlgHandle; END IF @RecvReqMsgName =N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' BEGIN END CONVERSATION @RecvReqDlgHandle; END COMMIT TRANSACTION; GO
例:接收數據同時插入表變數
DECLARE @conversation_group_id UNIQUEIDENTIFIER ; DECLARE @procTable TABLE( service_instance_id UNIQUEIDENTIFIER, handle UNIQUEIDENTIFIER, message_sequence_number BIGINT, service_name NVARCHAR(512), service_contract_name NVARCHAR(256), message_type_name NVARCHAR(256), validation NCHAR, message_body VARBINARY(MAX)) ; SET @conversation_group_id = <retrieve conversation group ID from database> ; RECEIVE TOP (1) conversation_group_id, conversation_handle, message_sequence_number, service_name, service_contract_name, message_type_name, validation, message_body FROM ExpenseQueue INTO @procTable WHERE conversation_group_id = @conversation_group_id ;View Code
7.發送方收到消息終止會話
DECLARE @RecvReplyMsg NVARCHAR(100); DECLARE @RecvReplyDlgHandle UNIQUEIDENTIFIER; DECLARE @RecvReqMsgName sysname; BEGIN TRANSACTION; WAITFOR ( RECEIVE TOP(1) @RecvReplyDlgHandle = conversation_handle, @RecvReplyMsg = message_body, @RecvReqMsgName=message_type_name FROM RequestQueue ), TIMEOUT 1000; IF @RecvReqMsgName =N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' BEGIN END CONVERSATION @RecvReqDlgHandle; END SELECT @RecvReplyMsg AS ReceivedReplyMsg; COMMIT TRANSACTION; GO
三、查詢
SELECT state_desc,* FROM sys.conversation_endpoints SELECT message_type_name, CAST(message_body as xml) message,* FROM dbo.RequestQueue SELECT message_type_name, CAST(message_body as xml) message,* FROM dbo.ReplyQueue SELECT CAST(message_body as xml) message,* FROM sys.transmission_queue --END CONVERSATION '236AF2C5-57F4-E711-A9E6-005056C00008';
四、刪除會話對象
IF EXISTS (SELECT * FROM sys.services WHERE name = N'//AWDB/1DBSample/TargetService') DROP SERVICE [//AWDB/1DBSample/TargetService]; IF EXISTS (SELECT * FROM sys.service_queues WHERE name = N'TargetQueue1DB') DROP QUEUE TargetQueue1DB; -- Drop the intitator queue and service if they already exist. IF EXISTS (SELECT * FROM sys.services WHERE name = N'//AWDB/1DBSample/InitiatorService') DROP SERVICE [//AWDB/1DBSample/InitiatorService]; IF EXISTS (SELECT * FROM sys.service_queues WHERE name = N'InitiatorQueue1DB') DROP QUEUE InitiatorQueue1DB; IF EXISTS (SELECT * FROM sys.service_contracts WHERE name = N'//AWDB/1DBSample/SampleContract') DROP CONTRACT [//AWDB/1DBSample/SampleContract]; IF EXISTS (SELECT * FROM sys.service_message_types WHERE name = N'//AWDB/1DBSample/RequestMessage') DROP MESSAGE TYPE [//AWDB/1DBSample/RequestMessage]; IF EXISTS (SELECT * FROM sys.service_message_types WHERE name = N'//AWDB/1DBSample/ReplyMessage') DROP MESSAGE TYPE [//AWDB/1DBSample/ReplyMessage]; GO
參考:http://www.cnblogs.com/downmoon/archive/2011/04/05/2005900.html
參考:https://docs.microsoft.com/zh-cn/sql/t-sql/statements/create-message-type-transact-sql
總結
使用單資料庫會話可以處理一般的隊列發送和讀寫消息的場景,其中sys.conversation_endpoints系統視圖需要重點關註。
備註: 作者:pursuer.chen 博客:http://www.cnblogs.com/chenmh 本站點所有隨筆都是原創,歡迎大家轉載;但轉載時必須註明文章來源,且在文章開頭明顯處給明鏈接,否則保留追究責任的權利。 《歡迎交流討論》 |