需求場景 項目中有這麼個需求:統計集群中各個節點的數據量存儲大小,不是記錄數。 一開始有點無頭緒,後面查看cassandra官方文檔看到Monitoring章節,裡面說到:Cassandra中的指標使用Dropwizard Metrics庫進行管理。 這些指標可以通過JMX查詢,也可以使用多個內置和 ...
需求場景
項目中有這麼個需求:統計集群中各個節點的數據量存儲大小,不是記錄數。
一開始有點無頭緒,後面查看cassandra官方文檔看到Monitoring章節,裡面說到:Cassandra中的指標使用Dropwizard Metrics庫進行管理。 這些指標可以通過JMX查詢,也可以使用多個內置和第三方報告插件推送到外部監控系統(Jconsole)。那麼數據量存儲大小是不是也是cassandra的某項指標了? 帶著疑問,我通過Jconsole看到了cassandra的一些指標(先啟動cassandra, 運行 -> Jconsole),如下圖
數據量存儲大小就在叫org.apache.cassandra.db的MBean中,具體會在之後介紹。
JMX定義
引用JMX超詳細解讀中一段話:
JMX(Java Management Extensions)是一個為應用程式植入管理功能的框架。JMX是一套標準的代理和服務,實際上,用戶可以在任何Java應用程式中使用這些代理和服務實現管理。這是官方文檔上的定義,我看過很多次也無法很好的理解。
我個人的理解是JMX讓程式有被管理的功能,例如你開發一個WEB網站,它是在24小時不間斷運行,那麼你肯定會對網站進行監控,如每天的UV、PV是多少;又或者在業務高峰的期間,你想對介面進行限流,就必須去修改介面併發的配置值。
應用場景:中間件軟體WebLogic的管理頁面就是基於JMX開發的,而JBoss則整個系統都基於JMX構架,另外包括cassandra中各項指標的管理。
對於一些參數的修改,網上有一段描述還是比較形象的:
1、程式初哥一般是寫死在程式中,到要改變的時候就去修改代碼,然後重新編譯發佈。
2、程式熟手則配置在文件中(JAVA一般都是properties文件),到要改變的時候只要修改配置文件,但還是必須重啟系統,以便讀取配置文件里最新的值。
3、程式好手則會寫一段代碼,把配置值緩存起來,系統在獲取的時候,先看看配置文件有沒有改動,如有改動則重新從配置里讀取,否則從緩存里讀取。
4、程式高手則懂得物為我所用,用JMX把需要配置的屬性集中在一個類中,然後寫一個MBean,再進行相關配置。另外JMX還提供了一個工具頁,以方便我們對參數值進行修改。
給我的感覺,jmx server進行監聽,jmx client進行請求的發送,以此達到通信的目的;cassandra的jmx server已經被cassandra實現,我們只需要實現jmx client,就能從cassandra進程中拿到我們需要的指標數據。
JMX Server
1、 首先定義一個MBean介面
介面的命名規範為以具體的實現類為首碼(這個規範很重要),動態代理的過程中需要用到這點。
public interface HelloMBean { String getName(); void setName(String name); void print(); }
2、定義一個實現類
實現上面的介面:
public class Hello implements HelloMBean { private String name; @Override public String getName() { return this.name; } @Override public void setName(String name) { this.name = name; } @Override public void print() { System.out.println("hello, print"); } }
3、定義一個jmx server,並啟動它:
public class HelloService { private static final int RMI_PORT = 8099; private static final String JMX_SERVER_NAME = "TestJMXServer"; private static final String USER_NAME = "hello"; private static final String PASS_WORD = "world"; public static void main(String[] args) throws Exception { HelloService service = new HelloService(); service.startJmxServer(); } private void startJmxServer() throws Exception { //MBeanServer mbs = MBeanServerFactory.createMBeanServer(jmxServerName); MBeanServer mbs = this.getMBeanServer(); // 在本地主機上創建並輸出一個註冊實例,來接收特定埠的請求 LocateRegistry.createRegistry(RMI_PORT, null, RMISocketFactory.getDefaultSocketFactory()); JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:" + RMI_PORT + "/" + JMX_SERVER_NAME); System.out.println("JMXServiceURL: " + url.toString()); Map<String, Object> env = this.putAuthenticator(); //JMXConnectorServer jmxConnServer = JMXConnectorServerFactory.newJMXConnectorServer(url, null, mbs); // 不加認證 JMXConnectorServer jmxConnServer = JMXConnectorServerFactory.newJMXConnectorServer(url, env, mbs); // 加認證 jmxConnServer.start(); } private MBeanServer getMBeanServer() throws Exception { MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); ObjectName objName = new ObjectName(JMX_SERVER_NAME + ":name=" + "hello"); mbs.registerMBean(new Hello(), objName); return mbs; } private Map<String, Object> putAuthenticator() { Map<String,Object> env = new HashMap<String,Object>(); JMXAuthenticator auth = createJMXAuthenticator(); env.put(JMXConnectorServer.AUTHENTICATOR, auth); env.put("com.sun.jndi.rmi.factory.socket", RMISocketFactory.getDefaultSocketFactory()); return env; } private JMXAuthenticator createJMXAuthenticator() { return new JMXAuthenticator() { public Subject authenticate(Object credentials) { String[] sCredentials = (String[]) credentials; if (null == sCredentials || sCredentials.length != 2) { throw new SecurityException("Authentication failed!"); } String userName = sCredentials[0]; String password = sCredentials[1]; if (USER_NAME.equals(userName) && PASS_WORD.equals(password)) { Set<JMXPrincipal> principals = new HashSet<JMXPrincipal>(); principals.add(new JMXPrincipal(userName)); return new Subject(true, principals, Collections.EMPTY_SET, Collections.EMPTY_SET); } throw new SecurityException("Authentication failed!"); } }; } }View Code
點下print按鈕,你會發現控制台會列印:hello, print。
cassandra的jmx server已經自己實現了,我們不需要實現它,我們需要實現的是調用它。
JMX client
這個是我們需要關註和實現的
1、client端介面定義
介面中定義的方法是我們需要調用的,方法名必須與server端暴露的方法一樣,通過server端動態生成client端的實例,實例中的方法只包括client端介面中定義的方法(若server端暴露的是屬性,那麼直接在屬性前加get,後面cassandra部分會講到)
public interface HelloClientMBean { void print(); // 方法定義與server端暴露的方法一致 }
2、連接jmx server
public class HelloClient implements AutoCloseable { private static final String fmtUrl = "service:jmx:rmi:///jndi/rmi://[%s]:%d/TestJMXServer"; private static final String ssObjName = "TestJMXServer:name=hello"; private static final int defaultPort = 1099; // cassandra預設埠是7199 final String host; final int port; private String username; private String password; private JMXConnector jmxc; private MBeanServerConnection mbeanServerConn; private HelloMBean hmProxy; /** * Creates a connection using the specified JMX host, port, username, and password. * * @param host hostname or IP address of the JMX agent * @param port TCP port of the remote JMX agent * @throws IOException on connection failures */ public HelloClient(String host, int port, String username, String password) throws IOException { assert username != null && !username.isEmpty() && password != null && !password.isEmpty() : "neither username nor password can be blank"; this.host = host; this.port = port; this.username = username; this.password = password; connect(); } /** * Creates a connection using the specified JMX host and port. * * @param host hostname or IP address of the JMX agent * @param port TCP port of the remote JMX agent * @throws IOException on connection failures */ public HelloClient(String host, int port) throws IOException { this.host = host; this.port = port; connect(); } /** * Creates a connection using the specified JMX host and default port. * * @param host hostname or IP address of the JMX agent * @throws IOException on connection failures */ public HelloClient(String host) throws IOException { this.host = host; this.port = defaultPort; connect(); } /** * Create a connection to the JMX agent and setup the M[X]Bean proxies. * * @throws IOException on connection failures */ private void connect() throws IOException { JMXServiceURL jmxUrl = new JMXServiceURL(String.format(fmtUrl, host, port)); Map<String,Object> env = new HashMap<String,Object>(); if (username != null) { String[] creds = { username, password }; env.put(JMXConnector.CREDENTIALS, creds); } env.put("com.sun.jndi.rmi.factory.socket", getRMIClientSocketFactory()); jmxc = JMXConnectorFactory.connect(jmxUrl, env); mbeanServerConn = jmxc.getMBeanServerConnection(); try { ObjectName name = new ObjectName(ssObjName); hmProxy = JMX.newMBeanProxy(mbeanServerConn, name, HelloMBean.class); } catch (MalformedObjectNameException e) { throw new RuntimeException( "Invalid ObjectName? Please report this as a bug.", e); } } private RMIClientSocketFactory getRMIClientSocketFactory() throws IOException { if (Boolean.parseBoolean(System.getProperty("ssl.enable"))) return new SslRMIClientSocketFactory(); else return RMISocketFactory.getDefaultSocketFactory(); } public void print() { hmProxy.print(); } @Override public void close() throws Exception { jmxc.close(); } }View Code
3、介面調用
public class JmxClient { public static void main(String[] args) throws Exception { HelloClient client = new HelloClient("localhost", 8099, "hello", "world"); client.print(); client.close(); } }
會在控制台列印:hello, print。
統計cassandra集群中各個節點的數據量存儲大小
也分3步:
1、client端介面定義
因為我們只關心數據量存儲大小,所以我們只需要在介面定義一個方法
public interface StorageServiceMBean { /** Human-readable load value. Keys are IP addresses. */ public Map<String, String> getLoadMap(); // cassandra端暴露的是屬性LoadMap,那麼此方法名由get加LoadMap組成, 那麼getLoad方法就可以獲取LoadMap的值
}
2、連接jmx server
cassandra-env.sh配置文件中有cassandra的JMX預設埠:JMX_PORT="7199"
public class CassNodeProbe implements AutoCloseable { private static final String fmtUrl = "service:jmx:rmi:///jndi/rmi://[%s]:%d/jmxrmi"; private static final String ssObjName = "org.apache.cassandra.db:type=StorageService"; private static final int defaultPort = 7199; final String host; final int port; private String username; private String password; private JMXConnector jmxc; private MBeanServerConnection mbeanServerConn; private StorageServiceMBean ssProxy; /** * Creates a NodeProbe using the specified JMX host, port, username, and password. * * @param host hostname or IP address of the JMX agent * @param port TCP port of the remote JMX agent * @throws IOException on connection failures */ public CassNodeProbe(String host, int port, String username, String password) throws IOException { assert username != null && !username.isEmpty() && password != null && !password.isEmpty() : "neither username nor password can be blank"; this.host = host; this.port = port; this.username = username; this.password = password; connect(); } /** * Creates a NodeProbe using the specified JMX host and port. * * @param host hostname or IP address of the JMX agent * @param port TCP port of the remote JMX agent * @throws IOException on connection failures */ public CassNodeProbe(String host, int port) throws IOException { this.host = host; this.port = port; connect(); } /** * Creates a NodeProbe using the specified JMX host and default port. * * @param host hostname or IP address of the JMX agent * @throws IOException on connection failures */ public CassNodeProbe(String host) throws IOException { this.host = host; this.port = defaultPort; connect(); } /** * Create a connection to the JMX agent and setup the M[X]Bean proxies. * * @throws IOException on connection failures */ private void connect() throws IOException { JMXServiceURL jmxUrl = new JMXServiceURL(String.format(fmtUrl, host, port)); Map<String,Object> env = new HashMap<String,Object>(); if (username != null) { String[] creds = { username, password }; env.put(JMXConnector.CREDENTIALS, creds); } env.put("com.sun.jndi.rmi.factory.socket", getRMIClientSocketFactory()); jmxc = JMXConnectorFactory.connect(jmxUrl, env); mbeanServerConn = jmxc.getMBeanServerConnection(); try { ObjectName name = new ObjectName(ssObjName); ssProxy = JMX.newMBeanProxy(mbeanServerConn, name, StorageServiceMBean.class); } catch (MalformedObjectNameException e) { throw new RuntimeException( "Invalid ObjectName? Please report this as a bug.", e); } } private RMIClientSocketFactory getRMIClientSocketFactory() throws IOException { if (Boolean.parseBoolean(System.getProperty("ssl.enable"))) return new SslRMIClientSocketFactory(); else return RMISocketFactory.getDefaultSocketFactory(); } public Map<String, String> getCassClusterStorage() { return ssProxy.getLoadMap(); } @Override public void close() throws Exception { jmxc.close(); } }View Code
3、介面調用
public class JMXTest { public static void main(String[] args) throws Exception { CassNodeProbe prode = new CassNodeProbe("127.0.0.1"); Map<String, String> nodeStorages = prode.getCassClusterStorage(); System.out.println(nodeStorages); prode.close(); } }
最後得到結果:{127.0.0.1=266.36 KB}
cassandra的jmx 認證訪問我就不做演示了,大家自己去實現。
cassandra jmx client實現:cassandra-all
cassandra給我們提供了工具jar,jmx server暴露的在這個工具jar中都有對應的請求方式;
如若大家用到的很少則可以自己實現,而不需要用cassandra-all,當然我們可以拷貝cassandra-all中我們需要的代碼到我們的工程中,那麼我們就可以不用引用此jar,但是又滿足了我們的需求
<dependency> <groupId>org.apache.cassandra</groupId> <artifactId>cassandra-all</artifactId> <version>2.1.14</version> </dependency>
參考:
http://www.cnblogs.com/FlyAway2013/p/jmx.html
http://www.cnblogs.com/dongguacai/p/5900507.html