本文中我們將講解一下App的長連接實現。一般而言長連接已經是App的標配了,推送功能的實現基礎就是長連接,當然了我們也可以通過輪訓操作實現推送功能,但是輪訓一般及時性比較差,而且網路消耗與電量銷毀比較多,因此一般推送功能都是通過長連接實現的。 那麼如何實現長連接呢?現在一般有這麼幾種實現方式: 使用 ...
本文中我們將講解一下App的長連接實現。一般而言長連接已經是App的標配了,推送功能的實現基礎就是長連接,當然了我們也可以通過輪訓操作實現推送功能,但是輪訓一般及時性比較差,而且網路消耗與電量銷毀比較多,因此一般推送功能都是通過長連接實現的。
那麼如何實現長連接呢?現在一般有這麼幾種實現方式:
-
使用第三方的長連接服務;
-
通過NIO等方案實現長連接服務;
-
通過MINA等第三方框架實現長連接;
1. 使用第三方的長連接服務
介紹:這是最簡單的方式,我們可以通過接入極光推送,百度推送,友盟等第三方服務實現長連接,通過接入第三方的API我們可以很方便的接入第三方的長連接,推送服務,但是這種方式定製化程度不太好,如果對長連接服務不是要求特別高,對定製化要求不是很高的話基本可以考慮這種方式(目前主流的App都是使用第三方的長連接服務)
優勢:簡單,方便
劣勢:定製化程度不高
2. 使用NIO等方案實現長連接服務
介紹:通過NIO的方式實現長連接,這種方式對技術要求程度比較高,基本都是通過java API實現長連接,實現心跳包,實現異常情況的容錯等操作,可以說通過NIO實現長連接對技術要求很高,一般如果沒有成行的技術方案比建議這麼做,就算實現了長連接,後期連接的維護,對電量,流量的損耗等都需要持續的優化。
優勢:定製化比較高
劣勢:技術要求高,需要持續的維護
3. 使用MINA等第三方框架實現長連接
介紹:MINA是一個第三方的NIO框架,該框架實現了一整套的長連接機制,包括長連接的建立,心跳包的實現,異常機制的容錯等。使用MINA實現長連接可以定製化的實現一些特有的功能,並且比NIO方案較為簡單,因為其已經封裝了一些長連接的特有機制,比如心跳包,容錯等。
優勢:可定製,較NIO方法簡單
劣勢:也需要一定的技術儲備
長連接具體實現
在我們的Android客戶端中長連接的實現機制採用–MINA方式。這裡多說一句,一開始的長連接採用的是NIO方案,但是採用這種方案之後踩了很多坑,包括心跳,容錯等機制都是自己寫的,所以耗費了大量的時間,而且對手機電量的消耗很大,最後決定使用MINA NIO框架重新實現一遍長連接,後來經過實測,長連接的穩定性還有耗電量,流量的消耗等指標方面有了很大的提高。
下麵我將簡單的介紹一下通過NIO實現長連接的具體流程:
-
引入MINA jar包,在App啟動頁面,登錄頁面啟動長連接;
-
創建後臺服務,在服務中創建MINA長連接;
-
實現心跳包,重寫一些容錯機制;
-
實現長連接斷了之後的重連機制,並且重連次數有限制不能一直重連;
-
長連接斷了之後實現輪訓操作,這裡的輪訓服務只有在長連接斷了之後才啟動,在長連接恢復之後關閉;
以下就是在長連接中實現的具體代碼:
在Application的onCreate方法中檢測App是否登錄,若登錄的話啟動長連接
/** * 在Application的onCreate方法中執行啟動長連接的操作 **/ @Override public void onCreate() { ... // 登錄後開啟長連接 if (UserConfig.isPassLogined()) { L.i("用戶已登錄,開啟長連接..."); startLongConn(); } ... }
通過鬧鐘服務實現具體的啟動長連接service的操作,即每隔60秒鐘判斷長連接是否啟動,若未啟動則實現啟動操作
/** * 開始執行啟動長連接服務 */ public void startLongConn() { quitLongConn(); L.i("長連接服務已開啟"); AlarmManager manager = (AlarmManager) getSystemService(Context.ALARM_SERVICE); Intent intent = new Intent(this, LongConnService.class); intent.setAction(LongConnService.ACTION); PendingIntent pendingIntent = PendingIntent.getService(this, 0, intent, PendingIntent.FLAG_UPDATE_CURRENT); long triggerAtTime = SystemClock.elapsedRealtime(); manager.setRepeating(AlarmManager.RTC_WAKEUP, triggerAtTime, 60 * 1000, pendingIntent); }
下麵的代碼就是長連接服務的具體實現
1 /** 2 * 後臺長連接服務 3 **/ 4 public class LongConnService extends Service { 5 public static String ACTION = "com.youyou.uuelectric.renter.Service.LongConnService"; 6 private static MinaLongConnectManager minaLongConnectManager; 7 public String tag = "LongConnService"; 8 private Context context; 9 10 @Override 11 public int onStartCommand(Intent intent, int flags, int startId) { 12 context = getApplicationContext(); 13 // 執行啟動長連接的操作 14 startLongConnect(); 15 ObserverManager.addObserver("LongConnService", stopListener); 16 return START_STICKY; 17 } 18 19 public ObserverListener stopListener = new ObserverListener() { 20 @Override 21 public void observer(String from, Object obj) { 22 closeConnect(); 23 } 24 }; 25 26 @Override 27 public void onDestroy() { 28 super.onDestroy(); 29 closeConnect(); 30 } 31 32 /** 33 * 開始執行啟動長連接的操作 34 */ 35 private void startLongConnect() { 36 if (Config.isNetworkConnected(context)) { 37 if (minaLongConnectManager != null && minaLongConnectManager.checkConnectStatus()) { 38 L.i("長連接狀態正常..."); 39 return; 40 } 41 if (minaLongConnectManager == null) { 42 startThreadCreateConnect(); 43 } else { 44 if (minaLongConnectManager.connectIsNull() && minaLongConnectManager.isNeedRestart()) { 45 L.i("session已關閉,需要重新創建一個session"); 46 minaLongConnectManager.startConnect(); 47 } else { 48 L.i("長連接已關閉,需要重開一個線程來重新創建長連接"); 49 startThreadCreateConnect(); 50 } 51 } 52 } 53 54 } 55 56 private final AtomicInteger mCount = new AtomicInteger(1); 57 58 private void startThreadCreateConnect() { 59 if (UserConfig.getUserInfo().getB3Key() != null && UserConfig.getUserInfo().getSessionKey() != null) { 60 System.gc(); 61 62 new Thread(new Runnable() { 63 @Override 64 public void run() { 65 // 執行具體啟動長連接操作 66 minaLongConnectManager = MinaLongConnectManager.getInstance(context); 67 minaLongConnectManager.crateLongConnect(); 68 } 69 }, "longConnectThread" + mCount.getAndIncrement()).start(); 70 } 71 } 72 73 74 private void closeConnect() { 75 76 if (minaLongConnectManager != null) { 77 minaLongConnectManager.closeConnect(); 78 } 79 minaLongConnectManager = null; 80 81 // 停止長連接服務LongConnService 82 stopSelf(); 83 } 84 85 @Override 86 public IBinder onBind(Intent intent) { 87 throw new UnsupportedOperationException("Not yet implemented"); 88 } 89 }
而下麵的代碼就是長連接的具體實現操作,具體的代碼有相關註釋說明
1 /** 2 * 具體實現長連接的管理對象 3 **/ 4 public class MinaLongConnectManager { 5 6 private static final String TAG = MinaLongConnectManager.class.getSimpleName(); 7 /** 8 * 伺服器埠號 9 */ 10 public static final int DEFAULT_PORT = 18156; 11 /** 12 * 連接超時時間,30 seconds 13 */ 14 public static final long SOCKET_CONNECT_TIMEOUT = 30 * 1000L; 15 16 /** 17 * 長連接心跳包發送頻率,60s 18 */ 19 public static final int KEEP_ALIVE_TIME_INTERVAL = 60; 20 private static Context context; 21 private static MinaLongConnectManager minaLongConnectManager; 22 23 private static NioSocketConnector connector; 24 private static ConnectFuture connectFuture; 25 public static IoSession session; 26 private static ExecutorService executorService = Executors.newSingleThreadExecutor(); 27 28 /** 29 * 長連接是否正在連接中... 30 */ 31 private static boolean isConnecting = false; 32 33 private MinaLongConnectManager() { 34 EventBus.getDefault().register(this); 35 } 36 37 public static synchronized MinaLongConnectManager getInstance(Context ctx) { 38 39 if (minaLongConnectManager == null) { 40 context = ctx; 41 minaLongConnectManager = new MinaLongConnectManager(); 42 } 43 return minaLongConnectManager; 44 } 45 46 /** 47 * 檢查長連接的各種對象狀態是否正常,正常情況下無需再創建 48 * 49 * @return 50 */ 51 public boolean checkConnectStatus() { 52 if (connector != null && connector.isActive() && connectFuture != null && connectFuture.isConnected() && session != null && session.isConnected()) { 53 return true; 54 } else { 55 return false; 56 } 57 } 58 59 public boolean connectIsNull() { 60 return connector != null; 61 } 62 63 /** 64 * 創建長連接,配置過濾器鏈和心跳工廠 65 */ 66 public synchronized void crateLongConnect() { 67 // 如果是長連接正在創建中 68 if (isConnecting) { 69 L.i("長連接正在創建中..."); 70 return; 71 } 72 if (!Config.isNetworkConnected(context)) { 73 L.i("檢測到網路未打開,無法正常啟動長連接,直接return..."); 74 return; 75 } 76 // 檢查長連接的各種對象狀態是否正常,正常情況下無需再創建 77 if (checkConnectStatus()) { 78 return; 79 } 80 isConnecting = true; 81 try { 82 connector = new NioSocketConnector(); 83 connector.setConnectTimeoutMillis(SOCKET_CONNECT_TIMEOUT); 84 85 if (L.isDebug) { 86 if (!connector.getFilterChain().contains("logger")) { 87 // 設置日誌輸出工廠 88 connector.getFilterChain().addLast("logger", new LoggingFilter()); 89 } 90 } 91 if (!connector.getFilterChain().contains("codec")) { 92 // 設置請求和響應對象的編解碼操作 93 connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new LongConnectProtocolFactory())); 94 } 95 // 創建心跳工廠 96 ClientKeepAliveMessageFactory heartBeatFactory = new ClientKeepAliveMessageFactory(); 97 // 當讀操作空閑時發送心跳 98 KeepAliveFilter heartBeat = new KeepAliveFilter(heartBeatFactory, IdleStatus.READER_IDLE); 99 // 設置是否將事件繼續往下傳遞 100 heartBeat.setForwardEvent(true); 101 // 設置心跳包請求後超時無反饋情況下的處理機制,預設為關閉連接,在此處設置為輸出日誌提醒 102 heartBeat.setRequestTimeoutHandler(KeepAliveRequestTimeoutHandler.LOG); 103 //設置心跳頻率 104 heartBeat.setRequestInterval(KEEP_ALIVE_TIME_INTERVAL); 105 if (!connector.getFilterChain().contains("keepAlive")) { 106 connector.getFilterChain().addLast("keepAlive", heartBeat); 107 } 108 if (!connector.getFilterChain().contains("reconnect")) { 109 // 設置長連接重連過濾器,當檢測到Session(會話)斷開後,重連長連接 110 connector.getFilterChain().addLast("reconnect", new LongConnectReconnectionFilter()); 111 } 112 // 設置接收和發送緩衝區大小 113 connector.getSessionConfig().setReceiveBufferSize(1024); 114 connector.getSessionConfig().setSendBufferSize(1024); 115 // 設置讀取空閑時間:單位為s 116 connector.getSessionConfig().setReaderIdleTime(60); 117 118 // 設置長連接業務邏輯處理類Handler 119 LongConnectHandler longConnectHandler = new LongConnectHandler(this, context); 120 connector.setHandler(longConnectHandler); 121 122 } catch (Exception e) { 123 e.printStackTrace(); 124 closeConnect(); 125 } 126 127 startConnect(); 128 } 129 130 /** 131 * 開始或重連長連接 132 */ 133 public synchronized void startConnect() { 134 if (connector != null) { 135 L.i("開始創建長連接..."); 136 boolean isSuccess = beginConnect(); 137 // 創建成功後,修改創建中狀態 138 if (isSuccess) { 139 isNeedRestart = false; 140 if (context != null) { 141 // 長連接啟動成功後,主動拉取一次消息 142 LoopRequest.getInstance(context).sendLoopRequest(); 143 } 144 } else { 145 // 啟動輪詢服務 146 startLoopService(); 147 } 148 isConnecting = false; 149 // printProcessorExecutor(); 150 } else { 151 L.i("connector已為null,不能執行創建連接動作..."); 152 } 153 } 154 155 /** 156 * 檢測MINA中線程池的活動狀態 157 */ 158 private void printProcessorExecutor() { 159 Class connectorClass = connector.getClass().getSuperclass(); 160 try { 161 L.i("connectorClass:" + connectorClass.getCanonicalName()); 162 Field field = connectorClass.getDeclaredField("processor"); 163 field.setAccessible(true); 164 Object connectorObject = field.get(connector); 165 if (connectorObject != null) { 166 SimpleIoProcessorPool processorPool = (SimpleIoProcessorPool) connectorObject; 167 Class processPoolClass = processorPool.getClass(); 168 Field executorField = processPoolClass.getDeclaredField("executor"); 169 executorField.setAccessible(true); 170 Object executorObject = executorField.get(processorPool); 171 if (executorObject != null) { 172 ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorObject; 173 L.i("線程池中當前線程數:" + threadPoolExecutor.getPoolSize() + "\t 核心線程數:" + threadPoolExecutor.getCorePoolSize() + "\t 最大線程數:" + threadPoolExecutor.getMaximumPoolSize()); 174 } 175 176 } else { 177 L.i("connectorObject = null"); 178 } 179 } catch (Exception e) { 180 e.printStackTrace(); 181 } 182 } 183 184 185 /** 186 * 開始創建Session 187 * 188 * @return 189 */ 190 public boolean beginConnect() { 191 192 if (session != null) { 193 session.close(false); 194 session = null; 195 } 196 if (connectFuture != null && connectFuture.isConnected()) { 197 connectFuture.cancel(); 198 connectFuture = null; 199 } 200 FutureTask<Boolean> futureTask = new FutureTask<>(new Callable<Boolean>() { 201 @Override 202 public Boolean call() { 203 try { 204 InetSocketAddress address = new InetSocketAddress(NetworkTask.getBASEURL(), DEFAULT_PORT); 205 connectFuture = connector.connect(address); 206 connectFuture.awaitUninterruptibly(3000L); 207 session = connectFuture.getSession(); 208 if (session == null) { 209 L.i(TAG + "連接創建失敗...當前環境:" + NetworkTask.getBASEURL()); 210 return false; 211 } else { 212 L.i(TAG + "長連接已啟動,連接已成功...當前環境:" + NetworkTask.getBASEURL()); 213 return true; 214 } 215 } catch (Exception e) { 216 return false; 217 } 218 } 219 }); 220 221 executorService.submit(futureTask); 222 try { 223 return futureTask.get(); 224 } catch (Exception e) { 225 return false; 226 } 227 228 } 229 230 /** 231 * 關閉連接,根據傳入的參數設置session是否需要重新連接 232 */ 233 public synchronized void closeConnect() { 234 if (session != null) { 235 session.close(false); 236 session = null; 237 } 238 if (connectFuture != null && connectFuture.isConnected()) { 239 connectFuture.cancel(); 240 connectFuture = null; 241 } 242 if (connector != null && !connector.isDisposed()) { 243 // 清空裡面註冊的所以過濾器 244 connector.getFilterChain().clear(); 245 connector.dispose(); 246 connector = null; 247 } 248 isConnecting = false; 249 L.i("長連接已關閉..."); 250 } 251 252 private volatile boolean isNeedRestart = false; 253 254 public boolean isNeedRestart() { 255 return isNeedRestart; 256 } 257 258 public void onEventMainThread(BaseEvent event) { 259 if (event == null || TextUtils.isEmpty(event.getType())) 260 return; 261 if (EventBusConstant.EVENT_TYPE_NETWORK_STATUS.equals(event.getType())) { 262 String status = (String) event.getExtraData(); 263 // 當網路狀態變化的時候請求startQuery介面 264 if (status != null && status.equals("open")) { 265 if (isNeedRestart && UserConfig.getUserInfo().getB3Key() != null && UserConfig.getUserInfo().getSessionKey() != null) { 266 L.i("檢測到網路已打開且長連接處於關閉狀態,需要啟動長連接..."); 267 Intent intent = new Intent(context, LongConnService.class); 268 intent.setAction(LongConnService.ACTION); 269 context.startService(intent); 270 } 271 } 272 } 273 } 274 275 /** 276 * 出現異常、session關閉後,接收事件進行長連接重連操作 277 */ 278 public void onEventMainThread(LongConnectMessageEvent event) { 279 280 if (event.getType() == LongConnectMessageEvent.TYPE_RESTART) { 281 282 long currentTime = System.currentTimeMillis(); 283 284 // 票據有效的情況下進行重連長連接操作 285 if (UserConfig.getUserInfo().getB3Key() != null && UserConfig.getUserInfo().getSessionKey() != null 286 && ((currentTime / 1000) < UserConfig.getUserInfo().getUnvalidSecs())) { 287 // 等待2s後重新創建長連接 288 SystemClock.sleep(1000); 289 if (Config.isNetworkConnected(context)) { 290 L.i("出現異常情況,需要自動重連長連接..."); 291 startConnect(); 292 } else { 293 isNeedRestart = true; 294 L.i("長連接出現異常,需要重新創建session會話..."); 295 } 296 } 297 } else if (event.getType() == LongConnectMessageEvent.TYPE_CLOSE) { 298 L.i("收到session多次close的消息,此時需要關閉長連接,等待下次鬧鐘服務來啟動..."); 299 closeConnect(); 300 } 301 } 302 303 304 /** 305 * 啟動輪詢服務 306 */ 307 public void startLoopService() { 308 // 啟動輪詢服務 309 // 暫時不考慮加