When creating an operator, you may need to submit tuples at specific intervals. This is useful in several cases: a source operator that generates a random number of tuples every interval (good for testing), an operator that outputs a status tuple every few seconds, and so on. This post shows how to do this in a Java operator.

In a Java operator, this behaviour can be accomplished by using a ScheduledExecutorService. A ScheduledExecutorService allows commands to run after a given delay and (optionally) at a fixed interval.
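As a minimal sketch of that behaviour outside of any operator, the standalone class below schedules a task with an initial delay and a repeating delay, then waits until it has run a given number of times. The class name FixedDelayDemo and the method runRepeatedly are hypothetical names chosen for illustration, and the executor is created with Executors.newSingleThreadScheduledExecutor() only because this sketch runs outside an operator.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class FixedDelayDemo {

    // Hypothetical helper: schedules a task that counts down a latch and
    // returns true if the task ran `runs` times before the timeout expired.
    static boolean runRepeatedly(int runs, long initialDelayMs, long delayMs) {
        // Standalone illustration only; this executor is not tied to any operator.
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch remaining = new CountDownLatch(runs);
        try {
            // First run after initialDelayMs, then again delayMs after each run completes.
            executor.scheduleWithFixedDelay(
                    remaining::countDown, initialDelayMs, delayMs, TimeUnit.MILLISECONDS);
            // Generous timeout so a slow machine does not cause a false negative.
            long timeoutMs = initialDelayMs + delayMs * runs + 2000;
            return remaining.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            // Stop the repeating task so the JVM can exit.
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("Task ran 3 times: " + runRepeatedly(3, 100, 50));
    }
}
```

The explicit shutdownNow() call is needed here because a repeating task otherwise keeps the executor's thread alive.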

To obtain a ScheduledExecutorService within an operator, use the OperatorContext.getScheduledExecutorService() method, as follows:

ScheduledExecutorService executorService = getOperatorContext().getScheduledExecutorService();

It is important that the ScheduledExecutorService be obtained in this fashion. Attempting to create a ScheduledExecutorService in a separate thread outside of the operator context can result in strange behaviour, such as the operator shutting down before the executor service has even started. Furthermore, the thread where the ScheduledExecutorService is running may continue to run within the JVM, but no tuples will be submitted because the operator has shut down.

Once the ScheduledExecutorService has been obtained within the context of the operator, you can schedule a Runnable to execute repeatedly. This can be accomplished by using the following method, which waits for the given delay between the end of one execution and the start of the next:

ScheduledExecutorService.scheduleWithFixedDelay(...)
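To illustrate how the delay is applied, here is a small standalone sketch, not operator code, that records when each run starts. The class name FixedDelayTiming, the method minGapBetweenStartsMs, and the use of Thread.sleep as a stand-in for real work are all assumptions made for illustration. Because scheduleWithFixedDelay measures the delay from the end of one run, the gap between the starts of consecutive runs should be roughly the work time plus the delay, not the delay alone.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class FixedDelayTiming {

    // Hypothetical helper: returns the smallest gap, in milliseconds, between
    // the starts of consecutive runs (Long.MAX_VALUE if fewer than two runs).
    static long minGapBetweenStartsMs(long workMs, long delayMs, int runs) {
        // Standalone illustration only; this executor is not tied to any operator.
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        List<Long> starts = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(runs);
        try {
            executor.scheduleWithFixedDelay(() -> {
                starts.add(System.nanoTime());   // record when this run started
                try {
                    Thread.sleep(workMs);        // stand-in for the task's real work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                done.countDown();
            }, 0, delayMs, TimeUnit.MILLISECONDS);
            done.await();                        // wait until `runs` runs have finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        long min = Long.MAX_VALUE;
        for (int i = 1; i < runs && i < starts.size(); i++) {
            long gapMs = (starts.get(i) - starts.get(i - 1)) / 1_000_000;
            min = Math.min(min, gapMs);
        }
        return min;
    }

    public static void main(String[] args) {
        // With 100 ms of work and a 50 ms delay, expect gaps of roughly 150 ms or more.
        System.out.println("Smallest gap (ms): " + minGapBetweenStartsMs(100, 50, 3));
    }
}
```

If the runs must start at a steady rate regardless of how long each one takes, the standard scheduleAtFixedRate method of ScheduledExecutorService is the alternative to consider.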

Below is an example of scheduling a Runnable to submit a random integer every 3 seconds. The first execution will begin after a 5 second delay.

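// out and outTuple are assumed to be defined elsewhere in the operator
final Random random = new Random();
Runnable runnable = new Runnable() {

    @Override
    public void run() {
        try {
            // Set the first attribute to a random integer and submit the tuple
            outTuple.setInt(0, random.nextInt());
            out.submit(outTuple);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
};

// Obtain the executor from the operator context, then run the task
// after a 5 second initial delay with a 3 second delay between runs
ScheduledExecutorService execService = getOperatorContext().getScheduledExecutorService();
execService.scheduleWithFixedDelay(runnable, 5, 3, TimeUnit.SECONDS);
...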

1 Comment on "Creating a Java source operator that runs on a delay"

  1. The Java operator samples provided in the product include similar reusable functionality:

    TupleProducer – pattern for a source operator; includes an initialDelay parameter to delay production of tuples.
    PollingTupleProducer – pattern for an operator that produces tuples periodically (like this article); inherits from TupleProducer and thus has the initialDelay parameter.
    RandomBeacon – inherits from PollingTupleProducer and produces a random tuple every N seconds.

    Basically the patterns are intended to provide useful common functionality and allow operator developers to focus on their specific tuple logic.

    http://www-01.ibm.com/support/knowledgecenter/api/content/SSCRJU_3.2.1/com.ibm.swg.im.infosphere.streams.spl-java-operators.doc/samples/overview-summary.html
