在RocketMQ中,使用BrokerStartup作為啟動類,相較於NameServer的啟動,Broker作為RocketMQ的核心可複雜得多 【RocketMQ中NameServer的啟動源碼分析】 主函數作為其啟動的入口: 首先通過createBrokerController方法生成Brok ...
在RocketMQ中,使用BrokerStartup作為啟動類,相較於NameServer的啟動,Broker作為RocketMQ的核心可複雜得多
主函數作為其啟動的入口:
1 public static void main(String[] args) { 2 start(createBrokerController(args)); 3 }
首先通過createBrokerController方法生成Broker的控制器BrokerController
createBrokerController方法:
1 public static BrokerController createBrokerController(String[] args) { 2 System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); 3 4 if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) { 5 NettySystemConfig.socketSndbufSize = 131072; 6 } 7 8 if (null == System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) { 9 NettySystemConfig.socketRcvbufSize = 131072; 10 } 11 12 try { 13 //PackageConflictDetect.detectFastjson(); 14 Options options = ServerUtil.buildCommandlineOptions(new Options()); 15 commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options), 16 new PosixParser()); 17 if (null == commandLine) { 18 System.exit(-1); 19 } 20 21 final BrokerConfig brokerConfig = new BrokerConfig(); 22 final NettyServerConfig nettyServerConfig = new NettyServerConfig(); 23 final NettyClientConfig nettyClientConfig = new NettyClientConfig(); 24 25 nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE, 26 String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING)))); 27 nettyServerConfig.setListenPort(10911); 28 final MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); 29 30 if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) { 31 int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10; 32 messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio); 33 } 34 35 if (commandLine.hasOption('c')) { 36 String file = commandLine.getOptionValue('c'); 37 if (file != null) { 38 configFile = file; 39 InputStream in = new BufferedInputStream(new FileInputStream(file)); 40 properties = new Properties(); 41 properties.load(in); 42 43 properties2SystemEnv(properties); 44 MixAll.properties2Object(properties, brokerConfig); 45 MixAll.properties2Object(properties, nettyServerConfig); 46 MixAll.properties2Object(properties, nettyClientConfig); 47 MixAll.properties2Object(properties, messageStoreConfig); 48 49 BrokerPathConfigHelper.setBrokerConfigPath(file); 50 in.close(); 51 } 52 } 53 54 MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig); 55 56 if (null == brokerConfig.getRocketmqHome()) { 57 System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation", MixAll.ROCKETMQ_HOME_ENV); 58 System.exit(-2); 59 } 60 61 String namesrvAddr = brokerConfig.getNamesrvAddr(); 62 if (null != namesrvAddr) { 63 try { 64 String[] addrArray = namesrvAddr.split(";"); 65 for (String addr : addrArray) { 66 RemotingUtil.string2SocketAddress(addr); 67 } 68 } catch (Exception e) { 69 System.out.printf( 70 "The Name Server Address[%s] illegal, please set it as follows, \"127.0.0.1:9876;192.168.0.1:9876\"%n", 71 namesrvAddr); 72 System.exit(-3); 73 } 74 } 75 76 switch (messageStoreConfig.getBrokerRole()) { 77 case ASYNC_MASTER: 78 case SYNC_MASTER: 79 brokerConfig.setBrokerId(MixAll.MASTER_ID); 80 break; 81 case SLAVE: 82 if (brokerConfig.getBrokerId() <= 0) { 83 System.out.printf("Slave's brokerId must be > 0"); 84 System.exit(-3); 85 } 86 87 break; 88 default: 89 break; 90 } 91 92 messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1); 93 LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); 94 JoranConfigurator configurator = new JoranConfigurator(); 95 configurator.setContext(lc); 96 lc.reset(); 97 configurator.doConfigure(brokerConfig.getRocketmqHome() + "/conf/logback_broker.xml"); 98 99 if (commandLine.hasOption('p')) { 100 InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME); 101 MixAll.printObjectProperties(console, brokerConfig); 102 MixAll.printObjectProperties(console, nettyServerConfig); 103 MixAll.printObjectProperties(console, nettyClientConfig); 104 MixAll.printObjectProperties(console, messageStoreConfig); 105 System.exit(0); 106 } else if (commandLine.hasOption('m')) { 107 InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME); 108 MixAll.printObjectProperties(console, brokerConfig, true); 109 MixAll.printObjectProperties(console, nettyServerConfig, true); 110 MixAll.printObjectProperties(console, nettyClientConfig, true); 111 MixAll.printObjectProperties(console, messageStoreConfig, true); 112 System.exit(0); 113 } 114 115 log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); 116 MixAll.printObjectProperties(log, brokerConfig); 117 MixAll.printObjectProperties(log, nettyServerConfig); 118 MixAll.printObjectProperties(log, nettyClientConfig); 119 MixAll.printObjectProperties(log, messageStoreConfig); 120 121 final BrokerController controller = new BrokerController( 122 brokerConfig, 123 nettyServerConfig, 124 nettyClientConfig, 125 messageStoreConfig); 126 // remember all configs to prevent discard 127 controller.getConfiguration().registerConfig(properties); 128 129 boolean initResult = controller.initialize(); 130 if (!initResult) { 131 controller.shutdown(); 132 System.exit(-3); 133 } 134 135 Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { 136 private volatile boolean hasShutdown = false; 137 private AtomicInteger shutdownTimes = new AtomicInteger(0); 138 139 @Override 140 public void run() { 141 synchronized (this) { 142 log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet()); 143 if (!this.hasShutdown) { 144 this.hasShutdown = true; 145 long beginTime = System.currentTimeMillis(); 146 controller.shutdown(); 147 long consumingTimeTotal = System.currentTimeMillis() - beginTime; 148 log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal); 149 } 150 } 151 } 152 }, "ShutdownHook")); 153 154 return controller; 155 } catch (Throwable e) { 156 e.printStackTrace(); 157 System.exit(-1); 158 } 159 160 return null; 161 }
這裡和NameServer中的createNamesrvController方法作用類似,對Broker所需做了一系列的配置
先設置了Netty通信時的緩衝區大小,這裡預設是128K
接著會創建了幾個實體類
BrokerConfig,用來封裝其絕大多數基本配置信息
NettyServerConfig,封裝了其作為對外暴露的消息隊列伺服器的信息
NettyClientConfig,則封裝了其作為NameServer客戶端的信息
這裡面封裝的信息和NameServer一個道理,都是映射了配置文件相應的配置
然後對NettyClientConfig的TLS進行設置
讓NettyServerConfig預設監聽10911埠
緊接著創建了一個MessageStoreConfig,這個就是用來封裝Store的信息,
MessageStoreConfig會預設配置BrokerRole為ASYNC_MASTER
Broker有三種身份,用BrokerRole枚舉來表示:
1 public enum BrokerRole { 2 ASYNC_MASTER, 3 SYNC_MASTER, 4 SLAVE; 5 }
也就是非同步Master,同步Master,以及Slave
這裡會對其身份進行檢查,若是Slave,則需要調整其允許的消息最大記憶體占比,預設值是40,也就是說Master允許消息最大記憶體占用40%,而Slave則只允許30%
接著會對”-c“指令進行相應配置的載入
往後看到對namesrvAddr進行了檢查,只是簡單地檢查NameServer集群地址信息是否合法
往下看到有個switch塊,其根據Broker的身份,進行設置
只要是Master,將其BrokerId設為0,而Slave的BrokerId需要大於0
(一個Master可以對應多個Slave,但是一個Slave只能對應一個Master,Master與Slave的對應關係通過指定相同的BrokerName,不同的BrokerId來定義,BrokerId為0表示Master,大於0表示Slave,Master也可以部署多個)
繼續往下,這裡會對Store設置HA的監聽埠,是NettyServer偵聽埠加1
往下是對“-p”,”-m“指令進行相應配置的載入,以及日誌的相關配置
之後就會創建了一個BrokerController:
1 public BrokerController( 2 final BrokerConfig brokerConfig, 3 final NettyServerConfig nettyServerConfig, 4 final NettyClientConfig nettyClientConfig, 5 final MessageStoreConfig messageStoreConfig 6 ) { 7 this.brokerConfig = brokerConfig; 8 this.nettyServerConfig = nettyServerConfig; 9 this.nettyClientConfig = nettyClientConfig; 10 this.messageStoreConfig = messageStoreConfig; 11 this.consumerOffsetManager = new ConsumerOffsetManager(this); 12 this.topicConfigManager = new TopicConfigManager(this); 13 this.pullMessageProcessor = new PullMessageProcessor(this); 14 this.pullRequestHoldService = new PullRequestHoldService(this); 15 this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService); 16 this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this); 17 this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener); 18 this.consumerFilterManager = new ConsumerFilterManager(this); 19 this.producerManager = new ProducerManager(); 20 this.clientHousekeepingService = new ClientHousekeepingService(this); 21 this.broker2Client = new Broker2Client(this); 22 this.subscriptionGroupManager = new SubscriptionGroupManager(this); 23 this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig); 24 this.filterServerManager = new FilterServerManager(this); 25 26 this.slaveSynchronize = new SlaveSynchronize(this); 27 28 this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity()); 29 this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity()); 30 this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity()); 31 this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity()); 32 this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity()); 33 this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity()); 34 this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity()); 35 36 this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName()); 37 this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort())); 38 39 this.brokerFastFailure = new BrokerFastFailure(this); 40 this.configuration = new Configuration( 41 log, 42 BrokerPathConfigHelper.getBrokerConfigPath(), 43 this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig 44 ); 45 }
可以看到,這裡實例化了許多成員,我就不一一分析,挑幾個重要的介紹
ConsumerOffsetManager:用來管理消費者的消費消息的進度,主要通過一張map來緩存
1 private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable = 2 new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);
由topic@group的形式構成鍵,而值中的map的Integer代表具體的哪條消息隊列,Long表示該消息隊列的偏移量offset
TopicConfigManager:管理Topic和消息隊列的信息,主要通過一張map來緩存
1 private final ConcurrentMap<String, TopicConfig> topicConfigTable = 2 new ConcurrentHashMap<String, TopicConfig>(1024); 3 private final DataVersion dataVersion = new DataVersion();
鍵就是Topic,值TopicConfig用來記錄對應的消息隊列的個數
PullMessageProcessor、PullRequestHoldService、NotifyMessageArrivingListener這三個來管理Pull消息請求,關於Pull消息在後續博客再細說
ConsumerManager:管理Consumer,主要通過一張map來緩存
1 private final ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable = 2 new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);
鍵值就是Consumer的GroupName,
而ConsumerGroupInfo由如下構成:
1 private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable = 2 new ConcurrentHashMap<String, SubscriptionData>(); 3 private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable = 4 new ConcurrentHashMap<Channel, ClientChannelInfo>(16); 5 private volatile ConsumeType consumeType; 6 private volatile MessageModel messageModel;
可以看到封裝了一個subscriptionTable ,這個map記錄Topic和訂閱內容
以及一個channelInfoTable,記錄Consumer的物理連接
ConsumeType是一個枚舉,表明兩種消費方式:
1 public enum ConsumeType { 2 CONSUME_ACTIVELY("PULL"), 3 CONSUME_PASSIVELY("PUSH"); 4 }
MessageModel 也是一個枚舉,表明兩種消費模式:
1 public enum MessageModel { 2 /** 3 * broadcast 4 */ 5 BROADCASTING("BROADCASTING"), 6 /** 7 * clustering 8 */ 9 CLUSTERING("CLUSTERING"); 10 }
Broadcasting:同一個ConsumerGroup里的每個Consumer都能消費到所訂閱Topic的全部消息,也就是一個消息會被多次分發,被多個Consumer消費
Clustering:同一個ConsumerGroup里的每個Consumer只消費所訂閱消息的一部分內容,同一個ConsumerGroup里所有的Consumer消費的內容合起來才是所訂閱Topic內容的整體,從而達到負載均衡的目的
結合著來看,也就是說使用相同GroupName的一組Consumer,其ConsumeType和MessageModel必定相同,其訂閱的Topic會根據ConsumeType和MessageModel來完成相應的方式的消息處理
回到BrokerController的構造
ProducerManager:管理Producer,主要通過一張map來緩存
1 private final HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> groupChannelTable = 2 new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
相比ConsumerManager,對Producer的管理簡單的多,只需要記錄group name 和物理連接的映射
再回到createBrokerController方法,在完成BrokerController的創建後,會調用BrokerController的initialize方法:
BrokerController的initialize方法:
1 public boolean initialize() throws CloneNotSupportedException { 2 boolean result = this.topicConfigManager.load(); 3 4 result = result && this.consumerOffsetManager.load(); 5 result = result && this.subscriptionGroupManager.load(); 6 result = result && this.consumerFilterManager.load(); 7 8 if (result) { 9 try { 10 this.messageStore = 11 new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener, 12 this.brokerConfig); 13 if (messageStoreConfig.isEnableDLegerCommitLog()) { 14 DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore); 15 ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler); 16 } 17 this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore); 18 //load plugin 19 MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig); 20 this.messageStore = MessageStoreFactory.build(context, this.messageStore); 21 this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager)); 22 } catch (IOException e) { 23 result = false; 24 log.error("Failed to initialize", e); 25 } 26 } 27 28 result = result && this.messageStore.load(); 29 30 if (result) { 31 this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService); 32 NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone(); 33 fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2); 34 this.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService); 35 this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor( 36 this.brokerConfig.getSendMessageThreadPoolNums(), 37 this.brokerConfig.getSendMessageThreadPoolNums(), 38 1000 * 60, 39 TimeUnit.MILLISECONDS, 40 this.sendThreadPoolQueue, 41 new ThreadFactoryImpl("SendMessageThread_")); 42 43 this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor( 44 this.brokerConfig.getPullMessageThreadPoolNums(), 45 this.brokerConfig.getPullMessageThreadPoolNums(), 46 1000 * 60, 47 TimeUnit.MILLISECONDS, 48 this.pullThreadPoolQueue, 49 new ThreadFactoryImpl("PullMessageThread_")); 50 51 this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor( 52 this.brokerConfig.getQueryMessageThreadPoolNums(), 53 this.brokerConfig.getQueryMessageThreadPoolNums(), 54 1000 * 60, 55 TimeUnit.MILLISECONDS, 56 this.queryThreadPoolQueue, 57 new ThreadFactoryImpl("QueryMessageThread_")); 58 59 this.adminBrokerExecutor = 60 Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl( 61 "AdminBrokerThread_")); 62 63 this.clientManageExecutor = new ThreadPoolExecutor( 64 this.brokerConfig.getClientManageThreadPoolNums(), 65 this.brokerConfig.getClientManageThreadPoolNums(), 66 1000 * 60, 67 TimeUnit.MILLISECONDS, 68 this.clientManagerThreadPoolQueue, 69 new ThreadFactoryImpl("ClientManageThread_")); 70 71 this.heartbeatExecutor = new BrokerFixedThreadPoolExecutor( 72 this.brokerConfig.getHeartbeatThreadPoolNums(), 73 this.brokerConfig.getHeartbeatThreadPoolNums(), 74 1000 * 60, 75 TimeUnit.MILLISECONDS, 76 this.heartbeatThreadPoolQueue, 77 new ThreadFactoryImpl("HeartbeatThread_", true)); 78 79 this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor( 80 this.brokerConfig.getEndTransactionThreadPoolNums(), 81 this.brokerConfig.getEndTransactionThreadPoolNums(), 82 1000 * 60, 83 TimeUnit.MILLISECONDS, 84 this.endTransactionThreadPoolQueue, 85 new ThreadFactoryImpl("EndTransactionThread_")); 86 87 this.consumerManageExecutor = 88 Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl( 89 "ConsumerManageThread_")); 90 91 this.registerProcessor(); 92 93 final long initialDelay = UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis(); 94 final long period = 1000 * 60 * 60 * 24; 95 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { 96 @Override 97 public void run() { 98 try { 99 BrokerController.this.getBrokerStats().record(); 100 } catch (Throwable e) { 101 log.error("schedule record error.", e); 102 } 103 } 104 }, initialDelay, period, TimeUnit.MILLISECONDS); 105 106 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { 107 @Override 108 public void run() { 109 try { 110 BrokerController.this.consumerOffsetManager.persist(); 111 } catch (Throwable e) { 112 log.error("schedule persist consumerOffset error.", e); 113 } 114 } 115 }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS); 116 117 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { 118 @Override 119 public void run() { 120 try { 121 BrokerController.this.consumerFilterManager.persist(); 122 } catch (Throwable e) { 123 log.error("schedule persist consumer filter error.", e); 124 } 125 } 126 }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS); 127 128 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { 129 @Override 130 public void run() { 131 try { 132 BrokerController.this.protectBroker(); 133 } catch (Throwable e) { 134 log.error("protectBroker error.", e); 135 } 136 } 137 }, 3, 3, TimeUnit.MINUTES); 138 139 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { 140 @Override 141 public void run() { 142 try { 143 BrokerController.this.printWaterMark(); 144 } catch (Throwable e) { 145 log.error("printWaterMark error.", e); 146 } 147 } 148 }, 10, 1, TimeUnit.SECONDS); 149 150 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { 151 152 @Override 153 public void run() { 154 try { 155 log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes()); 156 }