本文為原創文章,轉載請註明出處,謝謝 數據的發佈與訂閱 1、應用 服務端監聽數據改變,客戶端創建/更新節點數據,客戶端提供數據,服務端處理 2、原理 客戶端監控節點數據改變事件(例如配置信息,下圖的config節點),啟動時在伺服器節點下創建臨時節點(圖中servers下節點) 服務端監聽工作伺服器 ...
本文為原創文章,轉載請註明出處,謝謝
數據的發佈與訂閱
1、應用
服務端監聽數據改變,客戶端創建/更新節點數據,客戶端提供數據,服務端處理
2、原理
- 客戶端監控節點數據改變事件(例如配置信息,下圖的config節點),啟動時在伺服器節點下創建臨時節點(圖中servers下節點)
- 服務端監聽工作伺服器的子節點更新,觸發自身存儲的工作伺服器列表,同時監聽訂閱節點的數據改變事件(下圖中command節點)
3、架構圖
- config:配置信息節點
- servers:伺服器列表父節點
- command:數據訂閱節點
4、客戶端流程圖
5、服務端流程圖
6、核心類關係圖
- SubscribeClient:模擬服務端、客戶端啟動
- WorkServer:客戶端
- ManageServer:服務端
- ServerConfig:配置信息
- ServerData:server數據
7、核心代碼
- WorkServer 監聽
public WorkServer(String serverPath, String configPath, final ServerData serverData, ServerConfig serverConfig, ZkClient zkClient) { this.serverPath = serverPath; this.configPath = configPath; this.serverData = serverData; this.serverConfig = serverConfig; this.zkClient = zkClient; dataListener = new IZkDataListener() { @Override public void handleDataChange(String s, Object o) { try{ String data = new String((byte[])o); System.out.println(data); ServerConfig config = (ServerConfig)JSON.parseObject(data,ServerConfig.class); updateConfig(config); System.out.println("server name:"+serverData.getName()+" update config:"+config.toString()); }catch (Exception e) { e.printStackTrace(); } } @Override public void handleDataDeleted(String s) throws Exception { } }; }
- WorkServer 註冊
private void registerMe() { String myPath = serverPath.concat("/").concat(serverData.getAddress()); try{ if(!zkClient.exists(myPath)) zkClient.createEphemeral(myPath,JSON.toJSONString(serverData).getBytes()); }catch (ZkNoNodeException e ) { zkClient.createPersistent(serverPath, true); registerMe(); } }
ps:此操作是在servers節點下創建節點,需要servers節點已存在
- ManageServer 監聽
public ManageServer(String serverPath, String configPath, String cmdPath, ServerConfig serverConfig, ZkClient zkClient) { this.serverPath = serverPath; this.configPath = configPath; this.cmdPath = cmdPath; this.serverConfig = serverConfig; this.zkClient = zkClient; childListener = new IZkChildListener() { @Override public void handleChildChange(String s, List<String> strings) throws Exception { workServerList = strings; System.out.println("----"+workServerList.toString()); } }; dataListener = new IZkDataListener() { @Override public void handleDataChange(String s, Object o) throws Exception { String cmd = new String((byte[])o); System.out.println("cmd="+cmd); exeCmd(cmd); } @Override public void handleDataDeleted(String s) throws Exception { } }; }
-
childListener :監聽Servers下的節點變化
-
dataListener :監聽command節點的數據變化
-
- ManageServer 執行操作
/** *模擬命令:1、list 2、create 3、modify */ private void exeCmd(String cmd) { if("list".equals(cmd)) { System.out.println(workServerList.toString()); }else if("create".equals(cmd)) { exeCreate(); }else if("modify".equals(cmd)) { exeModify(); }else { System.out.println("this cmd can not exe"); } }
- SubscribeClient
public class SubscribeClient { private static final int CLIENT_QTY =3; private static final String ZOOKEEPER_URL = "192.168.117.128:2181"; private static final String SERVERPATH = "/servers"; private static final String CONFIGPATH = "/config"; private static final String CMDPATH = "/command"; @Test public void testSubScribe() throws IOException { ServerConfig config = new ServerConfig("DBURL...","DBUSER...","DBPAW..."); ZkClient zk = new ZkClient(ZOOKEEPER_URL,5000,5000,new BytesPushThroughSerializer()); ManageServer manageServer = new ManageServer(SERVERPATH,CONFIGPATH,CMDPATH,config,zk); manageServer.start(); for (int i = 0; i < CLIENT_QTY; i++) { ZkClient zkq = new ZkClient(ZOOKEEPER_URL,5000,5000,new BytesPushThroughSerializer()); ServerData data = new ServerData("address"+i,i+"","name_"+i); WorkServer server = new WorkServer(SERVERPATH,CONFIGPATH,data,config,zkq); server.start(); } new BufferedReader(new InputStreamReader(System.in)).readLine(); } }