In Streams 4.0, a new management interface was introduced that is built on Java Management Extensions (JMX) management beans (MXBeans). The Streams InfoCenter documents it reasonably well, but the information is scattered. This article pulls everything you need into one place.

Jars Needed In The ClassPath:

  • $STREAMS_INSTALL/lib/com.ibm.streams.management.jmxmp.jar
  • $STREAMS_INSTALL/lib/com.ibm.streams.management.mx.jar
  • $STREAMS_INSTALL/ext/lib/jmxremote_optional.jar (run time only, after the others, needed only if connecting to a JMX server on another system)

Information Needed:

  • Domain name
  • JMX connection string for the domain (available from streamtool getjmxconnect), e.g. "service:jmx:jmxmp://ibmstreamsnode.development.ibm.com:9975"
  • User id and password
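
Before connecting, it can help to sanity-check the connection string. The following is a minimal sketch that uses only the JDK class javax.management.remote.JMXServiceURL, which checks syntax without needing the Streams jars or a running domain. The class name JmxUrlCheck and its two helper methods are illustrative names, not part of the Streams API.

```java
import java.net.MalformedURLException;
import javax.management.remote.JMXServiceURL;

// Hypothetical helper (not part of Streams): checks a JMX connection string.
class JmxUrlCheck {

    // Returns "protocol host port" for a well-formed JMX service URL.
    static String describe(String jmxUrl) {
        try {
            JMXServiceURL url = new JMXServiceURL(jmxUrl);
            return url.getProtocol() + " " + url.getHost() + " " + url.getPort();
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException("Not a JMX service URL: " + jmxUrl, e);
        }
    }

    // Returns true when the string parses as a JMX service URL.
    static boolean isValid(String jmxUrl) {
        try {
            new JMXServiceURL(jmxUrl);
            return true;
        } catch (MalformedURLException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(describe("service:jmx:jmxmp://ibmstreamsnode.development.ibm.com:9975"));
        // A string without "//" after the protocol name is rejected by the parser.
        System.out.println(isValid("service:jmx:jmxmp:ibmstreamsnode.development.ibm.com:9975"));
    }
}
```

A check like this catches a missing "//" after the protocol name before any network connection is attempted.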

Objects of Interest:

(Partial list)

  • com.ibm.streams.management.domain.DomainMXBean - JMX bean to interface to domains
  • com.ibm.streams.management.instance.InstanceMXBean - Interface for instances
  • com.ibm.streams.management.job.JobMXBean - Job interface
  • com.ibm.streams.management.job.OperatorMXBean - Operator interface (inside the job namespace, oddly enough)
  • com.ibm.streams.management.ObjectNameBuilder - Utility tool to build the names needed to create the beans
  • com.ibm.streams.management.Metric - Generic metrics object

Basic Steps:

  1. Connect to the JMX environment and create an MBeanServerConnection (assumes domainName, jmxUrl, user and password are set):

      HashMap<String, Object> env = new HashMap<String, Object>();
      String[] creds = {user, password};
      env.put("jmx.remote.credentials", creds);
      env.put("jmx.remote.protocol.provider.pkgs", "com.ibm.streams.management");
      JMXConnector jmxc = JMXConnectorFactory.connect(new JMXServiceURL(jmxUrl), env);
      MBeanServerConnection msbc = jmxc.getMBeanServerConnection();

  2. Create the beans needed. For jobs you must have an Instance bean, but the Domain bean is optional. For Operator beans you must have a registered Job bean, although it is not used directly in the creation.
      • Create a Domain Bean:

       ObjectName domName = ObjectNameBuilder.domain(domainName);
       DomainMXBean domain = JMX.newMXBeanProxy(msbc, domName, DomainMXBean.class, true);

      • Create an Instance Bean:

       ObjectName instName = ObjectNameBuilder.instance(domainName, instanceName);
       InstanceMXBean instance = JMX.newMXBeanProxy(msbc, instName, InstanceMXBean.class, true);

      • Create a Job Bean: (You need the job number here, as a BigInteger; a job is not accessible by name)

       ObjectName jobName = instance.registerJob(jobno);
       JobMXBean job = JMX.newMXBeanProxy(msbc, jobName, JobMXBean.class, true);

      • Create an Operator Bean:

       ObjectName opName = ObjectNameBuilder.operator(domainName, instanceName, jobno, operatorName);
       OperatorMXBean operator = JMX.newMXBeanProxy(msbc, opName, OperatorMXBean.class, true);

      • To get a Set<String> of instance names from the domain: domain.getInstances();
      • To get a Set<BigInteger> of job numbers from the instance: instance.getJobs();
      • To get a Set<String> of operator names from the job: job.getOperators();
  3. Now that you have your beans, you can access any information that is available through the Console or Streams Studio. See the documentation in the Streams Knowledge Center for the complete set of methods supported by these beans.
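
The bean-creation step above always follows the same pattern: build an ObjectName, then ask JMX.newMXBeanProxy for a typed proxy. As a rough sketch, that pattern can be tried without a Streams domain by pointing it at the JVM's own platform MBean server and the standard RuntimeMXBean. The class name ProxyPatternDemo is illustrative. With Streams you would instead pass the remote MBeanServerConnection and a name from ObjectNameBuilder.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import javax.management.JMX;
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Illustrative only: uses a JDK bean in place of the Streams beans.
class ProxyPatternDemo {

    // Builds a typed proxy for the RuntimeMXBean registered on the given connection.
    static RuntimeMXBean runtimeProxy(MBeanServerConnection mbsc) {
        try {
            // Step 1: the ObjectName identifies the bean on the server.
            ObjectName name = new ObjectName(ManagementFactory.RUNTIME_MXBEAN_NAME);
            // Step 2: the proxy forwards interface calls to that bean.
            return JMX.newMXBeanProxy(mbsc, name, RuntimeMXBean.class);
        } catch (MalformedObjectNameException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // The platform MBeanServer is itself an MBeanServerConnection.
        MBeanServerConnection local = ManagementFactory.getPlatformMBeanServer();
        RuntimeMXBean runtime = runtimeProxy(local);
        System.out.println("VM name: " + runtime.getVmName());
    }
}
```

Every getter called on the proxy is a round trip to the server, so when talking to a remote domain it is worth fetching a value once and reusing it.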

 

Sample Program To Get Metrics for Every Operator:


  import java.util.HashMap;
  import java.util.Set;
  import java.util.Iterator;
  import java.util.Date;
  import java.math.BigInteger;
  import javax.management.JMX;
  import javax.management.MBeanServerConnection;
  import javax.management.ObjectName;
  import javax.management.remote.JMXConnector;
  import javax.management.remote.JMXConnectorFactory;
  import javax.management.remote.JMXServiceURL;
  import java.io.Console;
  import com.ibm.streams.management.domain.DomainMXBean ;
  import com.ibm.streams.management.instance.InstanceMXBean;
  import com.ibm.streams.management.ObjectNameBuilder;
  import com.ibm.streams.management.job.JobMXBean;
  import com.ibm.streams.management.job.OperatorMXBean;
  import com.ibm.streams.management.Metric;

  // To compile:
  // javac -cp com.ibm.streams.management.jmxmp.jar:\
  // com.ibm.streams.management.mx.jar Client.java

  // To run:
  // java -cp .:com.ibm.streams.management.jmxmp.jar:\
  // com.ibm.streams.management.mx.jar:jmxremote_optional.jar Client

  // Uses console prompt to get userid and password
  public class Client {
  public static void main(String [] args) {
  try {

  String jmxUrl = "service:jmx:jmxmp://myserver.domain:9975";
  String domainName = "stream1";
  Console con;
  con = System.console();
  if (con == null) return;

  // Get the password from the console and set the environment for authentication
  String user = con.readLine("UserID :" );
  String password = new String(con.readPassword("Password:"));
  HashMap<String, Object> env = new HashMap<String, Object>();
  String[] creds = {user, password};
  env.put("jmx.remote.credentials", creds);
  env.put("jmx.remote.protocol.provider.pkgs", "com.ibm.streams.management");

  JMXConnector jmxc = JMXConnectorFactory.connect(new JMXServiceURL(jmxUrl), env);
  MBeanServerConnection msbc = jmxc.getMBeanServerConnection();

  ObjectName objName = ObjectNameBuilder.domain(domainName);
  DomainMXBean domain = JMX.newMXBeanProxy(msbc, objName, DomainMXBean.class, true);

  System.out.println("Status: "+ domain.getStatus());
  System.out.println("Instances: " + domain.getInstances());

  // If you have a known instance you can just create the InstanceMXBean
  Set<String> instances = domain.getInstances();
  for (String inname : instances)
  {
   ObjectName instName = ObjectNameBuilder.instance(domainName, inname);
   InstanceMXBean instance = JMX.newMXBeanProxy(msbc, instName, InstanceMXBean.class, true);
   System.out.println("Instance: " + inname);
   System.out.println("Jobs Running: "+ instance.getJobs());
   System.out.println("Status: "+ instance.getStatus());

   // Note – job numbers are BigIntegers, not int
   Set<BigInteger> jobs = instance.getJobs();

   for (BigInteger jobno: jobs)
   {

       ObjectName jobName = instance.registerJob(jobno);
       JobMXBean job = JMX.newMXBeanProxy(msbc, jobName, JobMXBean.class, true);
       System.out.println("Job number: "+ jobno);
       System.out.println("Job name: " + job.getName());
       System.out.println("Application name: "+ job.getApplicationName());
       System.out.println("Health: " + job.getHealth());
       System.out.println("Id: "+ job.getId());
       System.out.println("Operators: "+ job.getOperators());
       Set<String> operators = job.getOperators();
       // Note – operators names are composite.operator and parallel
       // operators are composite[channel].operator

       for (String s : operators)
       {
            ObjectName opName = ObjectNameBuilder.operator(domainName,inname,job.getId(), s);
            OperatorMXBean operator = JMX.newMXBeanProxy(msbc, opName , OperatorMXBean.class, true);

            // Set the include Aggregates to true to get the aggregate port metrics

            Set<Metric> metrics = operator.retrieveMetrics(true) ;
            System.out.println( "Operator Physical Name: " + operator.getName());
            System.out.println( "Operator Logical Name: " + operator.getLogicalName());
            System.out.println("Index In Job: " + operator.getIndexWithinJob());
 
            for( Metric m: metrics)
            {
               System.out.println( "" );
               System.out.println( "Metric Name : "+ m.getName()) ;
               System.out.println( "Description : " + m.getDescription());
               Date date = new Date(m.getLastTimeRetrieved());
               System.out.println("Last Retrieved: " + date);
               System.out.println("Value: "+ m.getValue());
            } //Metrics loop
            System.out.println( "" ) ;
      } //Operators loop
   } // Jobs loop
  } // Instances loop

  // Release the JMX connection once all metrics have been printed
  jmxc.close();

  } catch(Exception e)
  {
      e.printStackTrace();
  }
  }
  }
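
The sample notes that operator names take the form composite.operator, or composite[channel].operator for parallel operators. If you want to group metrics by channel, a small parser can split those names. This is a sketch that assumes only the two shapes mentioned in that comment; the class OperatorNameParts and its fields are hypothetical, and real names may have forms this does not cover.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: splits an operator name into composite, channel and operator.
class OperatorNameParts {

    // Assumed shapes: "composite.operator" or "composite[channel].operator".
    private static final Pattern NAME =
            Pattern.compile("^(.*?)(?:\\[(\\d+)\\])?\\.([^.]+)$");

    final String composite; // empty when the name has no composite prefix
    final int channel;      // -1 when the operator is not in a parallel region
    final String operator;

    private OperatorNameParts(String composite, int channel, String operator) {
        this.composite = composite;
        this.channel = channel;
        this.operator = operator;
    }

    static OperatorNameParts parse(String name) {
        Matcher m = NAME.matcher(name);
        if (!m.matches()) {
            // No dot in the name: treat the whole string as the operator.
            return new OperatorNameParts("", -1, name);
        }
        int channel = (m.group(2) == null) ? -1 : Integer.parseInt(m.group(2));
        return new OperatorNameParts(m.group(1), channel, m.group(3));
    }

    public static void main(String[] args) {
        OperatorNameParts p = parse("Main[2].Filter");
        System.out.println(p.composite + " / " + p.channel + " / " + p.operator);
    }
}
```

In the operators loop of the sample, parse(s) could be called on each name returned by job.getOperators() before the metrics are printed.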

Additional Methods that may be of interest for monitoring

DomainMXBean

      • retrieveProductLog() - returns an https URI to retrieve the product log
      • retrieveProductLogAndTraceFiles() - returns an https URI to retrieve the log and service trace files

 

InstanceMXBean

      • cancelJob(BigInteger jobId, boolean force) - cancels the numbered job
      • deployApplication(String bundlefilename) - submits a job for deployment
      • getJobs() - returns a set of active job numbers
      • snapshotJobMetrics() - takes a snapshot of the PE metrics for all jobs to which the current user has access and returns an https URI to get the results
      • start()
      • stop()
      • submitJob() - submits an application to run as a job

 

JobMXBean

      • snapshot() - takes a current snapshot of the attributes and returns an https URI to retrieve the results
      • snapshotMetrics() - takes a current snapshot of the metrics for all PEs in the job and returns an https URI to retrieve the results

 

OperatorMXBean

      • getInputPorts() - returns indexes of input ports within the operator
      • getOutputPorts() - returns indexes of output ports within the operator

 

OperatorInputPortMXBean

      • This bean provides an interface to an operator's input port.
      • retrieveMetrics() - returns the metrics for the port
      • Name is generated via ObjectNameBuilder.operatorInputPort(domain, instance, jobId, operator, portIndex)

 

OperatorOutputPortMXBean

    • This bean provides an interface to an operator's output port.
    • retrieveMetrics() - returns the metrics for the port
    • Name is generated via ObjectNameBuilder.operatorOutputPort(domain, instance, jobId, operator, portIndex)
