Stream Processing with DDS and CEP
Posted in TechBlog on May 30, 2011 by Angelo Corsaro
Something I've been writing about for some time is the relationship between Stream Processing Systems, DDS and CEP. Below, I'll provide a short introduction to the key concepts behind Stream Processing and then show how you can use OpenSplice and Esper in combination as the foundation for Stream Processing Architectures.
Stream Processing comes in several different variants, such as Data Flow Systems, Reactive Systems, Signal Processing Systems, and Functional Stream Programming. Stream Processing Architectures are a natural fit for modeling systems that need to react to streams of data produced by the external world, such as the data produced by sensors, by a camera, or even by the stock market.
These systems usually have to operate in real time over these streams and in turn generate other streams of data that describe what is happening or suggest actions to perform, such as "buy stock X", "raise alarm Y", or "spatial violation detected".
As briefly mentioned above, Stream Processing Systems come in several variants, yet the entire class can be described by a common high-level model: a collection of modules that communicate via typed data channels. These modules usually play one of the following three roles:
- Sources: Inject data into the system
- Filters/Agents: Perform some computation on the data
- Sinks: Consume data from the system
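The three roles can be sketched as plain Scala traits. This is a minimal, framework-free illustration under stated assumptions: the names `Source`, `Filter`, `Sink` and `run` are hypothetical and do not come from DDS or Esper, and items are pulled synchronously rather than pushed over a network.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical names throughout; this only illustrates the three roles.
object StreamRoles {
  // Sources inject data into the system.
  trait Source[A] { def next(): A }
  // Filters/agents compute on each item; None means no output for that item.
  trait Filter[A, B] { def apply(a: A): Option[B] }
  // Sinks consume data leaving the system.
  trait Sink[B] { def consume(b: B): Unit }

  // Pull `count` items from the source, through the filter, into the sink.
  def run[A, B](src: Source[A], f: Filter[A, B], sink: Sink[B], count: Int): Unit =
    for (_ <- 1 to count) f(src.next()).foreach(sink.consume)

  def demo(): List[String] = {
    var t = 0
    // A source emitting simulated readings 10, 20, 30, ...
    val readings = new Source[Int] { def next(): Int = { t += 1; t * 10 } }
    // A filter raising an alarm for readings above 25.
    val alarm = new Filter[Int, String] {
      def apply(x: Int): Option[String] = if (x > 25) Some(s"alarm: $x") else None
    }
    val collected = ListBuffer.empty[String]
    val sink = new Sink[String] { def consume(s: String): Unit = collected += s }
    run(readings, alarm, sink, 4)
    collected.toList
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

Here the sink receives only the two readings that exceed the threshold; in a real deployment each role would typically be a separate process connected by DDS topics.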
Streams are usually considered infinite sequences of data and are formally modeled as:
$s: T \rightarrow S$
where:
- $T = \mathbb{N}$ represents discrete time ($\mathbb{N}$ is the set of natural numbers)
- $S$ is the domain of the type associated with the stream.
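The definition $s: T \rightarrow S$ translates almost literally into code: a stream is a function from a discrete time index to a value. The sketch below is an assumption-laden illustration: it uses `Long` as a stand-in for $\mathbb{N}$, and the alias `DiscreteStream` and the sample stream `temperature` are hypothetical.

```scala
// Streams as functions from discrete time (T = N) to values; names are illustrative.
object StreamAsFunction {
  // Long stands in for N here; only non-negative indices are meaningful.
  type DiscreteStream[S] = Long => S

  // A conceptually infinite stream of simulated sensor samples with period 5.
  val temperature: DiscreteStream[Double] = t => 20.0 + (t % 5)

  // Observe the finite prefix s(0), ..., s(n - 1) of a stream.
  def prefix[S](s: DiscreteStream[S], n: Int): List[S] =
    List.tabulate(n)(i => s(i.toLong))

  def main(args: Array[String]): Unit =
    println(prefix(temperature, 7)) // List(20.0, 21.0, 22.0, 23.0, 24.0, 20.0, 21.0)
}
```

Because the stream is infinite, a program can only ever observe a finite prefix of it, which is what `prefix` makes explicit.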
Filters usually transform one or more incoming streams (let's say $n$) into one or more outgoing streams (let's say $m$). Thus, in general, a filter can be seen as performing the following transformation:
$f: [T \rightarrow S]^n \rightarrow [T \rightarrow R]^m$
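As a concrete instance of $f: [T \rightarrow S]^n \rightarrow [T \rightarrow R]^m$ with $n = 2$ and $m = 2$, the sketch below encodes each stream as a function from time to values and fuses two redundant sensor streams into a "mean" stream and a "disagreement" stream. The object and function names are hypothetical, and the filter is deliberately pointwise (each output at time $t$ depends only on the inputs at time $t$); real filters may also depend on past values.

```scala
// Sketch of a filter with n = 2 input streams and m = 2 output streams.
// Names are hypothetical; streams are modelled as functions of time.
object TwoByTwoFilter {
  type DiscreteStream[A] = Long => A

  // Pointwise filter: from two sensor streams a and b, derive the stream of
  // their mean and the stream of their absolute disagreement.
  def fuse(a: DiscreteStream[Double], b: DiscreteStream[Double])
      : (DiscreteStream[Double], DiscreteStream[Double]) = {
    val mean: DiscreteStream[Double] = t => (a(t) + b(t)) / 2
    val gap: DiscreteStream[Double] = t => math.abs(a(t) - b(t))
    (mean, gap)
  }

  def main(args: Array[String]): Unit = {
    // Sensor a drifts upwards, sensor b is constant.
    val (mean, gap) = fuse(t => 10.0 + t, _ => 12.0)
    (0L to 4L).foreach(t => println(s"t=$t mean=${mean(t)} gap=${gap(t)}"))
  }
}
```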
Filters/Agents were historically written using regular programming languages, but in the past few years Complex Event Processing (CEP) engines have emerged as a way of describing the behaviour of a filter declaratively. Considering that (1) streams are typed, and (2) CEP engines such as Esper operate on typed streams, the most natural way of abstracting streams is through DDS Topics. Beyond the natural fit, DDS also brings a series of benefits that help in building Stream Processing Systems that are (1) easier to configure, by leveraging DDS' dynamic discovery, (2) fault-tolerant, thanks to DDS' built-in fault-management capabilities, and (3) high-performance, thanks to the high-throughput/low-latency nature of DDS.
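To make the contrast between hand-written and declarative filters concrete, here is a plain-Scala filter computing the average over the last `size` events of a stream (a length-based sliding window). A CEP engine lets you declare this kind of computation in a single query instead of coding it by hand; the class below is only an illustrative sketch with hypothetical names, not how Esper implements windows.

```scala
import scala.collection.mutable

// Hand-written length-window average: the kind of filter a CEP query declares.
object LengthWindowAverage {
  final class WindowAvg(size: Int) {
    require(size > 0, "window size must be positive")
    private val window = mutable.Queue.empty[Double]
    private var sum = 0.0

    // Push one event; returns the average over the current window contents.
    def onEvent(x: Double): Double = {
      window.enqueue(x)
      sum += x
      // Evict the oldest event once the window exceeds its length.
      if (window.size > size) sum -= window.dequeue()
      sum / window.size
    }
  }

  // Run the filter over a finite prefix of a stream.
  def averages(xs: Seq[Double], size: Int): Seq[Double] = {
    val w = new WindowAvg(size)
    xs.map(w.onEvent)
  }

  def main(args: Array[String]): Unit =
    println(averages(Seq(1.0, 2.0, 3.0, 4.0, 5.0), 3))
}
```

Even this small example has to manage window state, eviction and a running sum explicitly; that bookkeeping is exactly what a declarative CEP engine takes off the developer's hands.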
At this point, let's see what it takes to feed Esper with data coming from OpenSplice. The first thing you need to do is register the topic type with Esper, which takes just a few lines of code:
import com.espertech.esper.client._

val config = new Configuration
val ddsConf = new ConfigurationEventTypeLegacy
// Configure Esper to use public attributes/methods
ddsConf.setAccessorStyle(ConfigurationEventTypeLegacy.AccessorStyle.PUBLIC)
// The type below should reflect the topic type you want to register.
config.addEventType("ShapeType", classOf[org.opensplice.demo.ShapeType].getName, ddsConf)
val cep: EPServiceProvider = EPServiceProviderManager.getDefaultProvider(config)
Next, you need to push the data received via DDS into Esper. This can be done by adding a reaction to the DDS data reader:
val runtime = cep.getEPRuntime
// `reader` is the DDS data reader for the ShapeType topic
reader.reactions += {
  case e: DataAvailable[ShapeType] =>
    // Forward every newly read sample to Esper as an event
    e.reader.read.foreach(s => runtime.sendEvent(s))
}
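The `reactions += { case ... => }` idiom works because a block of `case` clauses is a Scala `PartialFunction`: the reader can offer each event to every registered handler and invoke only those defined for it. The toy model below illustrates that mechanism; `ReaderEvent`, `DataAvailable`, `SubscriptionMatched`, `Reactions` and `ToyReader` are hypothetical stand-ins, and the actual DDS API may be implemented differently.

```scala
import scala.collection.mutable.ListBuffer

// Toy model of partial-function based reactions; not the real DDS API.
object ReactionsSketch {
  sealed trait ReaderEvent
  final case class DataAvailable[T](samples: List[T]) extends ReaderEvent
  case object SubscriptionMatched extends ReaderEvent

  final class Reactions {
    private var handlers = List.empty[PartialFunction[ReaderEvent, Unit]]
    def +=(h: PartialFunction[ReaderEvent, Unit]): this.type = { handlers :+= h; this }
    // Offer the event to every handler whose patterns match it; others ignore it.
    def dispatch(e: ReaderEvent): Unit =
      handlers.foreach(h => if (h.isDefinedAt(e)) h(e))
  }

  final class ToyReader[T] {
    val reactions = new Reactions
    // Stand-in for the middleware delivering newly arrived samples.
    def deliver(samples: List[T]): Unit = reactions.dispatch(DataAvailable(samples))
  }

  def demo(): List[String] = {
    val received = ListBuffer.empty[String]
    val reader = new ToyReader[String]
    reader.reactions += {
      case DataAvailable(samples) => samples.foreach(s => received += s.toString)
    }
    reader.reactions.dispatch(SubscriptionMatched) // no matching case: ignored
    reader.deliver(List("RED", "BLUE"))
    received.toList
  }
}
```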
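Finally, you need to register a listener with Esper to be notified when a specific pattern has been detected. This can be done as follows:
val listener = new UpdateListener {
  def update(newEvents: Array[EventBean], oldEvents: Array[EventBean]): Unit = {
    // Your business logic goes here
  }
}
// Attach the listener to an EPL statement; the query below is only an example
val statement = cep.getEPAdministrator.createEPL("select * from ShapeType")
statement.addListener(listener)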
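The end-to-end flow (events sent to the engine, a statement selecting the events it cares about, listeners receiving the matches) can be simulated without OpenSplice or Esper. The sketch below is a self-contained toy: `ToyEngine`, `ToyStatement`, `Listener` and `Shape` are hypothetical stand-ins rather than Esper classes, the "pattern" is a plain predicate instead of an EPL query, and `Seq` replaces the `Array[EventBean]` parameters of the real listener for simplicity.

```scala
import scala.collection.mutable.ListBuffer

// Toy model of the engine -> statement -> listener flow; not Esper's API.
object ListenerFlowSketch {
  final case class Shape(color: String, x: Int, y: Int)

  trait Listener[E] { def update(newEvents: Seq[E], oldEvents: Seq[E]): Unit }

  // A "statement": a predicate over incoming events plus its listeners.
  final class ToyStatement[E](predicate: E => Boolean) {
    private var listeners = List.empty[Listener[E]]
    def addListener(l: Listener[E]): Unit = listeners = l :: listeners
    def offer(e: E): Unit =
      if (predicate(e)) listeners.foreach(_.update(Seq(e), Seq.empty))
  }

  // The engine forwards every event it receives to all statements.
  final class ToyEngine[E] {
    private var statements = List.empty[ToyStatement[E]]
    def createStatement(p: E => Boolean): ToyStatement[E] = {
      val s = new ToyStatement(p)
      statements = s :: statements
      s
    }
    def sendEvent(e: E): Unit = statements.foreach(_.offer(e))
  }

  def demo(): List[Shape] = {
    val engine = new ToyEngine[Shape]
    // Hypothetical pattern: shapes leaving a 200x200 area.
    val outOfBounds = engine.createStatement(s => s.x > 200 || s.y > 200)
    val alarms = ListBuffer.empty[Shape]
    outOfBounds.addListener(new Listener[Shape] {
      def update(newEvents: Seq[Shape], oldEvents: Seq[Shape]): Unit = alarms ++= newEvents
    })
    List(Shape("RED", 10, 20), Shape("BLUE", 250, 30), Shape("GREEN", 5, 300))
      .foreach(engine.sendEvent)
    alarms.toList
  }
}
```

Only the two shapes outside the area reach the listener, mirroring how business logic placed in `update` runs only when the registered pattern matches.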
A set of runnable examples is available as part of the Escalier project.