一、背景 書接手寫MQ框架(二)-服務端實現 ,前面介紹了服務端的實現。但是具體使用框架過程中,用戶肯定是以客戶端的形式跟服務端打交道的。客戶端的好壞直接影響了框架使用的便利性。 雖然框架目前是通過web的形式提供功能的,但是某的目標其實是通過socket實現,所以不僅需要有客戶端,還要包裝一下,讓 ...
一、背景
書接手寫MQ框架(二)-服務端實現 ,前面介紹了服務端的實現。但是具體使用框架過程中,用戶肯定是以客戶端的形式跟服務端打交道的。客戶端的好壞直接影響了框架使用的便利性。
雖然框架目前是通過web的形式提供功能的,但是某的目標其實是通過socket實現,所以不僅需要有客戶端,還要包裝一下,讓用戶在使用過程中不需要關心服務端是如何實現的。
簡單來說,就是客戶端使用必須方便。
二、客戶端實現
1、HttpUtil
目前客戶端的核心功能是HttpUtil這個類,使用httpClient實現的,主要是為了請求服務端。
具體實現如下:
package com.shuimutong.gmq.client.util; import java.io.IOException; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; import java.util.Map; import org.apache.http.HttpEntity; import org.apache.http.NameValuePair; import org.apache.http.client.entity.UrlEncodedFormEntity; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.apache.http.client.utils.URIBuilder; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.message.BasicNameValuePair; import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.shuimutong.gmq.client.bean.HttpResponseBean; import com.shuimutong.gutil.common.GUtilCommonUtil; /** * http請求工具類 * @ClassName: HttpUtil * @Description:(這裡用一句話描述這個類的作用) * @author: 水木桶 * @date: 2019年10月29日 下午9:43:54 * @Copyright: 2019 [水木桶] All rights reserved. */ public class HttpUtil { private final static Logger log = LoggerFactory.getLogger(HttpUtil.class); private static CloseableHttpClient HTTP_CLIENT = HttpClients.createMinimal(); static { Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { try { HTTP_CLIENT.close(); } catch (IOException e) { log.error("HTTP_CLIENT-closeException", e); } } }); } /** * get請求 * * @param url * @return * @throws IOException */ public static HttpResponseBean get(String url) throws IOException { HttpResponseBean responseBean = null; HttpGet httpGet = new HttpGet(url); CloseableHttpResponse res = HTTP_CLIENT.execute(httpGet); try { HttpEntity httpEntity = res.getEntity(); String body = EntityUtils.toString(httpEntity); responseBean = new HttpResponseBean(res.getStatusLine(), body); EntityUtils.consume(httpEntity); } finally { res.close(); } return responseBean; } /** * 帶參數的get請求 * @param url * @param requsetParams * @return * @throws IOException * @throws URISyntaxException */ public static HttpResponseBean get(String url, Map<String, String> requsetParams) throws IOException { HttpResponseBean responseBean = null; HttpGet httpGet; try { URIBuilder uriBuilder = new URIBuilder(url); if(!GUtilCommonUtil.checkListEmpty(requsetParams)) { List<NameValuePair> nvps = new ArrayList<NameValuePair>(); requsetParams.forEach((k,v) -> { nvps.add(new BasicNameValuePair(k, v)); }); uriBuilder.setParameters(nvps); } httpGet = new HttpGet(uriBuilder.build()); } catch (Exception e) { throw new IOException(e); } CloseableHttpResponse res = HTTP_CLIENT.execute(httpGet); try { HttpEntity httpEntity = res.getEntity(); String body = EntityUtils.toString(httpEntity); responseBean = new HttpResponseBean(res.getStatusLine(), body); EntityUtils.consume(httpEntity); } finally { res.close(); } return responseBean; } /** * post請求 * @param url * @param requsetParams * @return * @throws IOException */ public static HttpResponseBean post(String url, Map<String, String> requsetParams) throws IOException { HttpResponseBean responseBean = null; HttpPost httpPost = new HttpPost(url); if(!GUtilCommonUtil.checkListEmpty(requsetParams)) { List<NameValuePair> nvps = new ArrayList<NameValuePair>(); requsetParams.forEach((k,v) -> { nvps.add(new BasicNameValuePair(k, v)); }); httpPost.setEntity(new UrlEncodedFormEntity(nvps)); } CloseableHttpResponse response = HTTP_CLIENT.execute(httpPost); try { HttpEntity httpEntity = response.getEntity(); String body = EntityUtils.toString(httpEntity); responseBean = new HttpResponseBean(response.getStatusLine(), body); EntityUtils.consume(httpEntity); } finally { response.close(); } return responseBean; } }
封裝了get請求和post請求,封裝了響應結果。
加了一個鉤子,在jvm關閉時能夠主動關閉創建的資源。
2、訂閱消息、生產消息
這兩部分主要就是調用上面的HttpUtil,然後將結果包裝一下。
具體代碼請參考前文的git。
3、實例管理
為了使得用戶不需要關心具體實現,所以建了實例管理類。
package com.shuimutong.gmq.client.util; import com.shuimutong.gmq.client.cache.CommonObjCache; import com.shuimutong.gmq.client.cache.impl.CommonObjCacheImpl; import com.shuimutong.gmq.client.consumer.GmqConsumer; import com.shuimutong.gmq.client.producer.GmqProducer; public class GmqInstanceManage { public static GmqProducer getGmqProducer(String gmqServerUrl) { return new GmqProducer(gmqServerUrl); } public static GmqConsumer getGmqConsumer(String gmqServerUrl) { return new GmqConsumer(gmqServerUrl); } public static CommonObjCache getCommonCache(String serverUrl) { return new CommonObjCacheImpl(serverUrl); } }
主要是為了封裝變化。因為之後再迭代的話,實例的具體實現肯定不是目前這麼簡單,所以要儘量讓使用者少關心具體實現。
使用時關心的越多,後續項目迭代肯定越困難。
三、使用示例
1、生產消息
@Test public void produceMsg() { GmqProducer producer = GmqInstanceManage.getGmqProducer(gmqServerUrl); for(int i=0; i<5; i++) { String message = "message:" + i; try { SendMqResult res = producer.sendMq(topic, message); System.out.println(res.getRes()); } catch (SendMqException e) { e.printStackTrace(); } } }
2、消費消息
主要思路是:消費消息之前,先查詢當前已經消費到了哪條消息。消息消費之後,將消費的編號存入緩存。
典型的主動拉消息,消息是否消費由自己負責的模式。
實現如下:
@Test public void comsumerMsgByCache() { GmqConsumer comsumer = GmqInstanceManage.getGmqConsumer(gmqServerUrl); CommonObjCache commonCache = GmqInstanceManage.getCommonCache(gmqServerUrl); String gmqSign = "gmq_consumer_id"; long consumerId = 0; int size = 2; for(int i=0; i<5; i++) { try { CacheObj cacheId = commonCache.getById(gmqSign); if(cacheId != null) { consumerId = Long.parseLong(cacheId.getContent()); } List<MqContent> res = comsumer.getMq(topic, consumerId, size); for(MqContent mq : res) { System.out.println(JSONObject.toJSONString(mq)); if(mq.getId() > consumerId) { consumerId = mq.getId(); } } commonCache.save(gmqSign, String.valueOf(consumerId)); System.out.println("保存consumerId:" + consumerId); } catch (Exception e) { e.printStackTrace(); } } }
四、總結
gmq的初版至今已經完成,當然這隻是開始。
後續計劃先將gmvc框架替換掉,直接使用netty進行通信。
然後把消息存到資料庫改為存到磁碟上。
然後就是服務的高可用改造。
屆時歡迎指導。
第2版設計、開發中……