Requirements

The project has the following requirement: report how much data each node in the cluster stores, measured as storage size rather than record count.
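At first I had no idea where to start. Then I read the Monitoring chapter of the official Cassandra documentation, which says: metrics in Cassandra are managed using the Dropwizard Metrics library. These metrics can be queried via JMX, or pushed to external monitoring systems using a number of built-in and third-party reporter plugins. So is data storage size one of Cassandra's metrics too? With that question in mind, I used JConsole, a JMX client, to browse some of Cassandra's metrics (start Cassandra first, then Run -> JConsole), as shown in the figure below.

JMX (Java Management Extensions) is a framework for adding management capabilities to applications. JMX is a standard set of agents and services that users can employ for management in any Java application. That is the definition from the official documentation; I have read it many times and still do not fully understand it.

My working understanding is this: a JMX server listens, a JMX client sends requests, and that is how the two communicate. Cassandra already implements its JMX server, so we only need to implement a JMX client to fetch the metrics we need from the Cassandra process.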
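To make the server/client idea concrete before any remote connection is involved, here is a minimal sketch that reads attributes from the JVM's own platform MBeanServer. The class name PlatformMBeanPeek and the helper readAttribute are made up for illustration; java.lang:type=Runtime is a standard platform MXBean, not a Cassandra one.

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Hypothetical helper class, for illustration only.
class PlatformMBeanPeek {

    /** Reads one attribute of a registered MBean from the in-process platform MBeanServer. */
    static Object readAttribute(String objectName, String attribute) throws Exception {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        return mbs.getAttribute(new ObjectName(objectName), attribute);
    }

    public static void main(String[] args) throws Exception {
        // The Runtime MXBean is registered in every JVM, so no setup is needed.
        System.out.println("VM name: " + readAttribute("java.lang:type=Runtime", "VmName"));
        System.out.println("Uptime (ms): " + readAttribute("java.lang:type=Runtime", "Uptime"));
    }
}
```

A remote JMX client does essentially the same thing, except that the getAttribute call travels over a connector to another process.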
JMX Server
1. First, define an MBean interface:

public interface HelloMBean {
    String getName();

    void setName(String name);

    void print();
}

2. Implement the interface:

public class Hello implements HelloMBean {
    private String name;

    @Override
    public String getName() {
        return this.name;
    }

    @Override
    public void setName(String name) {
        this.name = name;
    }

    @Override
    public void print() {
        System.out.println("hello, print");
    }
}

3. Define a JMX server and start it:
import java.lang.management.ManagementFactory;
import java.rmi.registry.LocateRegistry;
import java.rmi.server.RMISocketFactory;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.remote.JMXAuthenticator;
import javax.management.remote.JMXConnectorServer;
import javax.management.remote.JMXConnectorServerFactory;
import javax.management.remote.JMXPrincipal;
import javax.management.remote.JMXServiceURL;
import javax.security.auth.Subject;

public class HelloService {
    private static final int RMI_PORT = 8099;
    private static final String JMX_SERVER_NAME = "TestJMXServer";
    private static final String USER_NAME = "hello";
    private static final String PASS_WORD = "world";

    public static void main(String[] args) throws Exception {
        HelloService service = new HelloService();
        service.startJmxServer();
    }

    private void startJmxServer() throws Exception {
        //MBeanServer mbs = MBeanServerFactory.createMBeanServer(jmxServerName);
        MBeanServer mbs = this.getMBeanServer();

        // Create and export an RMI registry on the local host that accepts requests on the given port
        LocateRegistry.createRegistry(RMI_PORT, null, RMISocketFactory.getDefaultSocketFactory());

        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://localhost:" + RMI_PORT + "/" + JMX_SERVER_NAME);
        System.out.println("JMXServiceURL: " + url.toString());

        Map<String, Object> env = this.putAuthenticator();

        //JMXConnectorServer jmxConnServer = JMXConnectorServerFactory.newJMXConnectorServer(url, null, mbs); // without authentication
        JMXConnectorServer jmxConnServer =
                JMXConnectorServerFactory.newJMXConnectorServer(url, env, mbs); // with authentication
        jmxConnServer.start();
    }

    // Registers the Hello MBean on the platform MBeanServer and returns that server
    private MBeanServer getMBeanServer() throws Exception {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        ObjectName objName = new ObjectName(JMX_SERVER_NAME + ":name=" + "hello");
        mbs.registerMBean(new Hello(), objName);
        return mbs;
    }

    // Builds the connector environment, including the authenticator
    private Map<String, Object> putAuthenticator() {
        Map<String, Object> env = new HashMap<String, Object>();
        JMXAuthenticator auth = createJMXAuthenticator();
        env.put(JMXConnectorServer.AUTHENTICATOR, auth);
        env.put("com.sun.jndi.rmi.factory.socket", RMISocketFactory.getDefaultSocketFactory());
        return env;
    }

    // Accepts exactly one user name / password pair and rejects everything else
    private JMXAuthenticator createJMXAuthenticator() {
        return new JMXAuthenticator() {
            public Subject authenticate(Object credentials) {
                String[] sCredentials = (String[]) credentials;
                if (null == sCredentials || sCredentials.length != 2) {
                    throw new SecurityException("Authentication failed!");
                }
                String userName = sCredentials[0];
                String password = sCredentials[1];
                if (USER_NAME.equals(userName) && PASS_WORD.equals(password)) {
                    Set<JMXPrincipal> principals = new HashSet<JMXPrincipal>();
                    principals.add(new JMXPrincipal(userName));
                    return new Subject(true, principals, Collections.emptySet(), Collections.emptySet());
                }
                throw new SecurityException("Authentication failed!");
            }
        };
    }
}
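The authentication check used by the server can be tried on its own, without starting a connector or an RMI registry. The sketch below is an assumption-laden restatement, not the server's exact code: AuthenticatorSketch and fixedCredentials are hypothetical names, and the instanceof guard is an addition that turns a wrong credential type into a SecurityException instead of a ClassCastException.

```java
import java.util.Collections;
import javax.management.remote.JMXAuthenticator;
import javax.management.remote.JMXPrincipal;
import javax.security.auth.Subject;

// Hypothetical class, for illustration only.
class AuthenticatorSketch {

    /** Returns an authenticator that accepts exactly one user name / password pair. */
    static JMXAuthenticator fixedCredentials(String user, String pass) {
        return credentials -> {
            // JMX clients conventionally send credentials as a two-element String array.
            if (!(credentials instanceof String[])) {
                throw new SecurityException("Authentication failed!");
            }
            String[] pair = (String[]) credentials;
            if (pair.length != 2 || !user.equals(pair[0]) || !pass.equals(pair[1])) {
                throw new SecurityException("Authentication failed!");
            }
            // A read-only Subject carrying the authenticated user as its only principal.
            return new Subject(true,
                    Collections.singleton(new JMXPrincipal(pair[0])),
                    Collections.emptySet(),
                    Collections.emptySet());
        };
    }

    public static void main(String[] args) {
        JMXAuthenticator auth = fixedCredentials("hello", "world");
        System.out.println(auth.authenticate(new String[] {"hello", "world"}).getPrincipals());
        try {
            auth.authenticate(new String[] {"hello", "wrong"});
        } catch (SecurityException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```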
Click the print button in JConsole, and the server's console prints: hello, print.
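Clicking an operation button in a JMX console amounts to an invoke call against the MBeanServer, and editing an attribute amounts to setAttribute. A minimal in-process sketch of both follows; the Greeter MBean and the ObjectName InvokeSketch:name=greeter are hypothetical stand-ins rather than part of the example above.

```java
import java.lang.management.ManagementFactory;
import javax.management.Attribute;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Hypothetical class, for illustration only.
class InvokeSketch {

    /** Standard MBean interface: the name must be the implementation class name plus "MBean". */
    public interface GreeterMBean {
        String getName();
        void setName(String name);
        String greet();
    }

    public static class Greeter implements GreeterMBean {
        private volatile String name = "world";
        @Override public String getName() { return name; }
        @Override public void setName(String name) { this.name = name; }
        @Override public String greet() { return "hello, " + name; }
    }

    /** Sets the Name attribute and then invokes the greet operation, both by name. */
    static String renameAndGreet(String newName) throws Exception {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        ObjectName objName = new ObjectName("InvokeSketch:name=greeter");
        if (!mbs.isRegistered(objName)) {
            mbs.registerMBean(new Greeter(), objName);
        }
        // The getName/setName pair is exposed as a single attribute called "Name".
        mbs.setAttribute(objName, new Attribute("Name", newName));
        // greet() takes no arguments, so the parameter and signature arrays may be null.
        return (String) mbs.invoke(objName, "greet", null, null);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(renameAndGreet("JMX"));
    }
}
```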
Cassandra already implements its own JMX server, so we do not need to write one. What we need to implement is the code that calls it.
JMX client
1. Define the client-side interface:

public interface HelloClientMBean {
    void print(); // same signature as the method exposed by the server
}

2. Connect to the JMX server:
import java.io.IOException;
import java.rmi.server.RMIClientSocketFactory;
import java.rmi.server.RMISocketFactory;
import java.util.HashMap;
import java.util.Map;
import javax.management.JMX;
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import javax.rmi.ssl.SslRMIClientSocketFactory;

public class HelloClient implements AutoCloseable {
    private static final String fmtUrl = "service:jmx:rmi:///jndi/rmi://[%s]:%d/TestJMXServer";
    private static final String ssObjName = "TestJMXServer:name=hello";
    private static final int defaultPort = 1099; // Cassandra's default JMX port is 7199

    final String host;
    final int port;
    private String username;
    private String password;

    private JMXConnector jmxc;
    private MBeanServerConnection mbeanServerConn;
    private HelloClientMBean hmProxy; // proxy typed with the client-side interface

    /**
     * Creates a connection using the specified JMX host, port, username, and password.
     *
     * @param host hostname or IP address of the JMX agent
     * @param port TCP port of the remote JMX agent
     * @throws IOException on connection failures
     */
    public HelloClient(String host, int port, String username, String password) throws IOException {
        assert username != null && !username.isEmpty() && password != null && !password.isEmpty()
                : "neither username nor password can be blank";
        this.host = host;
        this.port = port;
        this.username = username;
        this.password = password;
        connect();
    }

    /**
     * Creates a connection using the specified JMX host and port.
     *
     * @param host hostname or IP address of the JMX agent
     * @param port TCP port of the remote JMX agent
     * @throws IOException on connection failures
     */
    public HelloClient(String host, int port) throws IOException {
        this.host = host;
        this.port = port;
        connect();
    }

    /**
     * Creates a connection using the specified JMX host and default port.
     *
     * @param host hostname or IP address of the JMX agent
     * @throws IOException on connection failures
     */
    public HelloClient(String host) throws IOException {
        this.host = host;
        this.port = defaultPort;
        connect();
    }

    /**
     * Create a connection to the JMX agent and setup the M[X]Bean proxies.
     *
     * @throws IOException on connection failures
     */
    private void connect() throws IOException {
        JMXServiceURL jmxUrl = new JMXServiceURL(String.format(fmtUrl, host, port));
        Map<String, Object> env = new HashMap<String, Object>();
        if (username != null) {
            String[] creds = { username, password };
            env.put(JMXConnector.CREDENTIALS, creds);
        }
        env.put("com.sun.jndi.rmi.factory.socket", getRMIClientSocketFactory());

        jmxc = JMXConnectorFactory.connect(jmxUrl, env);
        mbeanServerConn = jmxc.getMBeanServerConnection();
        try {
            ObjectName name = new ObjectName(ssObjName);
            hmProxy = JMX.newMBeanProxy(mbeanServerConn, name, HelloClientMBean.class);
        } catch (MalformedObjectNameException e) {
            throw new RuntimeException("Invalid ObjectName? Please report this as a bug.", e);
        }
    }

    // Uses an SSL socket factory when the system property ssl.enable is true
    private RMIClientSocketFactory getRMIClientSocketFactory() throws IOException {
        if (Boolean.parseBoolean(System.getProperty("ssl.enable")))
            return new SslRMIClientSocketFactory();
        else
            return RMISocketFactory.getDefaultSocketFactory();
    }

    public void print() {
        hmProxy.print();
    }

    @Override
    public void close() throws Exception {
        jmxc.close();
    }
}
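3. Call the server through the client:

public class JmxClient {
    public static void main(String[] args) throws Exception {
        // Host, port, and credentials match the HelloService example above
        try (HelloClient client = new HelloClient("localhost", 8099, "hello", "world")) {
            client.print();
        }
    }
}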
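This prints hello, print on the console. Because print() executes inside the server process, the output appears in the server's console rather than the client's.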
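The proxy returned by JMX.newMBeanProxy simply translates interface calls into getAttribute, setAttribute, and invoke requests. Since MBeanServer extends MBeanServerConnection, this can be sketched in a single process with no network involved; the Counter MBean and the name ProxySketch:name=counter below are hypothetical.

```java
import java.lang.management.ManagementFactory;
import javax.management.JMX;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Hypothetical class, for illustration only.
class ProxySketch {

    public interface CounterMBean {
        int getCount();    // exposed as the read-only attribute "Count"
        void increment();  // exposed as the operation "increment"
    }

    public static class Counter implements CounterMBean {
        private int count;
        @Override public synchronized int getCount() { return count; }
        @Override public synchronized void increment() { count++; }
    }

    /** Registers a fresh Counter, drives it through a proxy, and returns the final count. */
    static int incrementTwiceViaProxy() throws Exception {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        ObjectName name = new ObjectName("ProxySketch:name=counter");
        if (mbs.isRegistered(name)) {
            mbs.unregisterMBean(name); // start from a clean state on repeated calls
        }
        mbs.registerMBean(new Counter(), name);

        // The same call works with a remote MBeanServerConnection obtained from a JMXConnector.
        CounterMBean proxy = JMX.newMBeanProxy(mbs, name, CounterMBean.class);
        proxy.increment();
        proxy.increment();
        return proxy.getCount();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("count = " + incrementTwiceViaProxy());
    }
}
```

The client-side interface only needs to declare the methods the caller actually uses, which is why a small interface is enough on the client.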
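1. Define an interface that matches the MBean Cassandra exposes:

import java.util.Map;

public interface StorageServiceMBean {
    /** Human-readable load value. Keys are IP addresses. */
    // Cassandra exposes the attribute LoadMap. The getter name is "get" plus "LoadMap",
    // so calling getLoadMap() returns the value of LoadMap.
    public Map<String, String> getLoadMap();
}

2. Connect to the JMX server: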
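The mapping between getter names and attribute names can be checked by registering an MBean and listing the attributes the MBeanServer reports. The sketch below uses a hypothetical NodeStats MBean as a stand-in for Cassandra's StorageService; the sample address and load value are made up.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import javax.management.ObjectName;

// Hypothetical class, for illustration only.
class AttributeNameSketch {

    public interface NodeStatsMBean {
        Map<String, String> getLoadMap(); // "get" + "LoadMap" becomes the attribute LoadMap
        boolean isHealthy();              // "is" + "Healthy" becomes the attribute Healthy
    }

    public static class NodeStats implements NodeStatsMBean {
        @Override public Map<String, String> getLoadMap() {
            return Collections.singletonMap("192.0.2.1", "1.5 KB"); // made-up sample value
        }
        @Override public boolean isHealthy() { return true; }
    }

    /** Returns the attribute names the MBeanServer derives from the interface, sorted. */
    static List<String> attributeNames() throws Exception {
        // A private MBeanServer keeps this sketch from touching the platform server.
        MBeanServer mbs = MBeanServerFactory.newMBeanServer();
        ObjectName name = new ObjectName("AttributeNameSketch:type=NodeStats");
        mbs.registerMBean(new NodeStats(), name);

        List<String> names = new ArrayList<>();
        for (MBeanAttributeInfo info : mbs.getMBeanInfo(name).getAttributes()) {
            names.add(info.getName());
        }
        Collections.sort(names);
        return names;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(attributeNames());
    }
}
```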
import java.io.IOException;
import java.rmi.server.RMIClientSocketFactory;
import java.rmi.server.RMISocketFactory;
import java.util.HashMap;
import java.util.Map;
import javax.management.JMX;
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import javax.rmi.ssl.SslRMIClientSocketFactory;

public class CassNodeProbe implements AutoCloseable {
    private static final String fmtUrl = "service:jmx:rmi:///jndi/rmi://[%s]:%d/jmxrmi";
    private static final String ssObjName = "org.apache.cassandra.db:type=StorageService";
    private static final int defaultPort = 7199;

    final String host;
    final int port;
    private String username;
    private String password;

    private JMXConnector jmxc;
    private MBeanServerConnection mbeanServerConn;
    private StorageServiceMBean ssProxy;

    /**
     * Creates a NodeProbe using the specified JMX host, port, username, and password.
     *
     * @param host hostname or IP address of the JMX agent
     * @param port TCP port of the remote JMX agent
     * @throws IOException on connection failures
     */
    public CassNodeProbe(String host, int port, String username, String password) throws IOException {
        assert username != null && !username.isEmpty() && password != null && !password.isEmpty()
                : "neither username nor password can be blank";
        this.host = host;
        this.port = port;
        this.username = username;
        this.password = password;
        connect();
    }

    /**
     * Creates a NodeProbe using the specified JMX host and port.
     *
     * @param host hostname or IP address of the JMX agent
     * @param port TCP port of the remote JMX agent
     * @throws IOException on connection failures
     */
    public CassNodeProbe(String host, int port) throws IOException {
        this.host = host;
        this.port = port;
        connect();
    }

    /**
     * Creates a NodeProbe using the specified JMX host and default port.
     *
     * @param host hostname or IP address of the JMX agent
     * @throws IOException on connection failures
     */
    public CassNodeProbe(String host) throws IOException {
        this.host = host;
        this.port = defaultPort;
        connect();
    }

    /**
     * Create a connection to the JMX agent and setup the M[X]Bean proxies.
     *
     * @throws IOException on connection failures
     */
    private void connect() throws IOException {
        JMXServiceURL jmxUrl = new JMXServiceURL(String.format(fmtUrl, host, port));
        Map<String, Object> env = new HashMap<String, Object>();
        if (username != null) {
            String[] creds = { username, password };
            env.put(JMXConnector.CREDENTIALS, creds);
        }
        env.put("com.sun.jndi.rmi.factory.socket", getRMIClientSocketFactory());

        jmxc = JMXConnectorFactory.connect(jmxUrl, env);
        mbeanServerConn = jmxc.getMBeanServerConnection();
        try {
            ObjectName name = new ObjectName(ssObjName);
            ssProxy = JMX.newMBeanProxy(mbeanServerConn, name, StorageServiceMBean.class);
        } catch (MalformedObjectNameException e) {
            throw new RuntimeException("Invalid ObjectName? Please report this as a bug.", e);
        }
    }

    // Uses an SSL socket factory when the system property ssl.enable is true
    private RMIClientSocketFactory getRMIClientSocketFactory() throws IOException {
        if (Boolean.parseBoolean(System.getProperty("ssl.enable")))
            return new SslRMIClientSocketFactory();
        else
            return RMISocketFactory.getDefaultSocketFactory();
    }

    // Returns the load of each node, keyed by IP address
    public Map<String, String> getCassClusterStorage() {
        return ssProxy.getLoadMap();
    }

    @Override
    public void close() throws Exception {
        jmxc.close();
    }
}
3. Query the cluster:

import java.util.Map;

public class JMXTest {
    public static void main(String[] args) throws Exception {
        // Pass the host name or IP address of a Cassandra node
        try (CassNodeProbe probe = new CassNodeProbe("")) {
            Map<String, String> nodeStorages = probe.getCassClusterStorage();
            System.out.println(nodeStorages);
        }
    }
}
The final result: { KB}
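Because the load values come back as human-readable strings, comparing or summing them requires converting them to a number first. The sketch below is one possible approach and rests on an assumption about the format: a number, a space, and a unit such as bytes, KB, MB, GB, or TB (or the KiB-style spellings), with 1024 as the multiplier. LoadParserSketch, toBytes, allToBytes, and the sample addresses and values are all hypothetical; check the strings your Cassandra version actually returns before relying on it.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical class, for illustration only.
class LoadParserSketch {

    /** Converts a string such as "1.5 KB" to a byte count, assuming binary (1024-based) units. */
    static long toBytes(String load) {
        String[] parts = load.trim().split("\\s+");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Unrecognized load value: " + load);
        }
        double value = Double.parseDouble(parts[0]);
        // Normalize "KiB" to "KB" and so on, so both spellings are accepted.
        String unit = parts[1].toUpperCase(Locale.ROOT).replace("IB", "B");
        long factor;
        switch (unit) {
            case "BYTES": factor = 1L; break;
            case "KB": factor = 1L << 10; break;
            case "MB": factor = 1L << 20; break;
            case "GB": factor = 1L << 30; break;
            case "TB": factor = 1L << 40; break;
            default: throw new IllegalArgumentException("Unrecognized unit: " + parts[1]);
        }
        return Math.round(value * factor);
    }

    /** Converts every value of a load map, keeping the node keys and their order. */
    static Map<String, Long> allToBytes(Map<String, String> loadMap) {
        Map<String, Long> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : loadMap.entrySet()) {
            result.put(e.getKey(), toBytes(e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> sample = new LinkedHashMap<>(); // made-up sample data
        sample.put("192.0.2.1", "1.5 KB");
        sample.put("192.0.2.2", "2 MiB");
        System.out.println(allToBytes(sample));
    }
}
```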
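I will not demonstrate authenticated JMX access to Cassandra here; readers can implement it themselves.

Cassandra JMX client implementation: cassandra-all

Cassandra provides a utility JAR for this. Everything its JMX server exposes has a corresponding request method in that JAR:

<dependency>
    <groupId>org.apache.cassandra</groupId>
    <artifactId>cassandra-all</artifactId>
    <version>2.1.14</version>
</dependency>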