[erlang-questions] [ANN] Rivus - Erlang Complex Event Processing

Darach Ennis darach@REDACTED
Mon Jan 6 13:41:33 CET 2014

Hi Vasco,

Combinators are, ugh, complicated. You can roughly break them down as follows:

* Merge - take two (or more) partially ordered streams and create a single
totally ordered resultant stream
* Gather - works in concert with splitting (or scattering) a stream of
events down multiple sub-streams. Most implementations of this aren't
tolerant of losing events that should be gathered.
* Pattern - Uses some form of temporal logic to define match criteria
* Join - Typically windowed, find matching events in two streams by value
or predicate expression
* Union - the most general streaming combinator is stateless and simply
passes any input events to outputs.
* There are others.
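
To make the difference between Union and Merge concrete, here is a minimal
sketch in Python (chosen only for brevity). The function names and the
(timestamp, payload) tuple shape are assumptions made for illustration, not
any engine's API. Union passes through whatever arrives with no ordering
guarantee; Merge assumes each input is already ordered by timestamp and
produces a single ordered output.

```python
import heapq
from itertools import zip_longest

_MISSING = object()  # sentinel marking an exhausted input stream

def union(*streams):
    """Stateless union: pass every input event through (round-robin here),
    with no guarantee about the order of the combined output."""
    for batch in zip_longest(*streams, fillvalue=_MISSING):
        for event in batch:
            if event is not _MISSING:
                yield event

def merge(*streams):
    """Merge streams that are each sorted by timestamp into one stream
    that is sorted by timestamp overall."""
    return heapq.merge(*streams, key=lambda event: event[0])

# Hypothetical (timestamp, payload) events; each stream is ordered on its own.
a = [(1, "a1"), (4, "a2"), (6, "a3")]
b = [(2, "b1"), (3, "b2"), (7, "b3")]

merged = list(merge(a, b))    # ordered by timestamp across both streams
unioned = list(union(a, b))   # same events, interleaved in arrival order
```

Note that the merge only works lazily because each input is already ordered;
with late or out-of-order events it would need buffering, which is where the
state (and the recovery problem) comes in.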

Union and Gather are more commonly found in the wild, in my experience,
than Merge or Join, but all are useful some of the time. Pattern is useful
if you're checking causal order - "did event B occur after and only after
event A whilst event C was not occurring" kind of thing - but composition
("... within the last 5 minutes / events") is king.
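
A rough Python sketch of that kind of check, composed with a time bound,
might look like the following. The event shape, the kind labels "A", "B"
and "C", and the function name are all hypothetical; this is one possible
reading of "B after and only after A, with no C in between, within N time
units", not how any particular engine defines it.

```python
def b_follows_a_without_c(events, within):
    """Yield (a_time, b_time) for each B that follows an A within `within`
    time units, with no C in between. `events` is an iterable of
    (timestamp, kind) tuples in timestamp order."""
    a_time = None  # timestamp of the most recent A still eligible for a match
    for ts, kind in events:
        if a_time is not None and ts - a_time > within:
            a_time = None  # the A has aged out of the window
        if kind == "A":
            a_time = ts
        elif kind == "C":
            a_time = None  # C occurred, so the pending pattern is cancelled
        elif kind == "B" and a_time is not None:
            yield (a_time, ts)
            a_time = None  # each A is matched at most once

events = [(0, "A"), (1, "B"), (2, "A"), (3, "C"), (4, "B"),
          (5, "A"), (20, "B"), (21, "A"), (22, "B")]
matches = list(b_follows_a_without_c(events, within=5))
```

Here the B at time 4 is rejected because of the C at time 3, and the B at
time 20 is rejected because the A at time 5 is too old.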

But stateful combinators are quite common. In fact, robust data recovery is
the hard part of the problem.
The database community has some discipline here (CRDTs) that a distributed
CEP may want to consider
for first order support that doesn't get handled elegantly by most
traditional CEP engines. Bloom's exploitation
of out of order processing is another interesting thing that would be nice
to see influencing CEP engine design.
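
As an illustration of why CRDTs are attractive for recovery, here is a
minimal Python sketch of a grow-only counter (G-Counter). Applying it to a
CEP aggregate, such as a per-node event count, is my own assumption for the
sake of the example. The point is that the merge is commutative, associative
and idempotent, so replaying or re-merging state after a failure cannot
double count.

```python
class GCounter:
    """Grow-only counter CRDT: one monotonically increasing slot per node."""

    def __init__(self, counts=None):
        self.counts = dict(counts or {})

    def increment(self, node, amount=1):
        # Each node only ever increments its own slot.
        self.counts[node] = self.counts.get(node, 0) + amount

    def merge(self, other):
        """Element-wise max of the two replicas; returns a new counter."""
        nodes = self.counts.keys() | other.counts.keys()
        return GCounter({n: max(self.counts.get(n, 0), other.counts.get(n, 0))
                         for n in nodes})

    def value(self):
        return sum(self.counts.values())

# Two hypothetical replicas counting events independently.
x = GCounter()
x.increment("n1")
x.increment("n1")
y = GCounter()
y.increment("n2", 3)

total = x.merge(y).value()
```

Merging in either order, or merging the same replica twice, gives the same
total.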

Anyway, StreamBase's SQL dialect is well documented and more complete than


The temporal pattern language may also be of interest:


IBM's SPL is also very well documented and worth exploring:


It would certainly be worth your time surveying the many different flavors
of CEP and ESP currently out
there. Ultimately, the kind of streaming algorithms and continuous
processing you are embarking upon will
have a more significant influence. What you won't get from a lot of the
lower level mechanics are the higher
level flow patterns that an experienced practitioner will use but you may
not always find directly in syntax...

These typically influence the direction and evolution of CEP languages, but
as it's a domain that is still very much evolving (technically) you will
find quite a lot of variation. I'd advise only implementing that which you
need. It's easier to extend later than to remove a bad construct. A good
example is multi-dimensional windows. Pretty much all CEP engines I know of
that support this got the model wrong... worse, the human brain doesn't
really like handling more than one dimension simultaneously, so favor a
dialect that allows easy composition of simple operations over one with
complex operations in simple flows.
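
A small Python sketch of what "composition of simple operations" can look
like: instead of one window that is parameterised on both a predicate and a
count, a filter, a count-based tumbling window and an aggregate are chained
together. All names here are made up for the example.

```python
def where(events, predicate):
    """Keep only the events that satisfy the predicate."""
    return (e for e in events if predicate(e))

def tumbling(events, size):
    """Group events into consecutive, non-overlapping windows of `size`
    events. A trailing partial window is not emitted."""
    window = []
    for event in events:
        window.append(event)
        if len(window) == size:
            yield window
            window = []

def aggregate(windows, fn):
    """Apply an aggregate function to each closed window."""
    return (fn(w) for w in windows)

prices = [10, 55, 12, 70, 90, 8, 65, 60]

# "Sum of every 2 prices above 50": filter, then window, then aggregate.
result = list(aggregate(tumbling(where(prices, lambda p: p > 50), 2), sum))
```

Each stage handles one dimension, and swapping the order of the stages (window
first, then filter) gives a different, equally easy to read query.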

Another omission in most CEP engines is good distribution, fault tolerance
and clustering out of the box.
By 'good' I mean with respect to some formal notion of recovery (can an
algorithm or query recover
precisely without loss or not). This is hard and existing CEP engines get
this wrong - it turns out that solving
this wasn't commercially compelling until recently - but near real-time
market abuse and surveillance and continuous
big data use cases need better solutions here specifically.

All part of the fun of building a full blown CEP engine ...



On Mon, Jan 6, 2014 at 11:06 AM, Vassil Kolarov <vasco@REDACTED> wrote:

> Hi Darach,
> Thank you for your elaborate post!
> Frankly speaking, this project was started out of pure curiosity rather than
> necessity. That is why I announced it at this early stage (maybe a bit
> prematurely) – to gather opinions and ideas.
> Data parallelism on a single query is out of scope (for now). I was more
> concerned about the “scaling out” scenario, but surely this is something to
> think about.
> The main goal was to have the ability to execute operations over multiple
> event streams, where each stream contains a single event type. Currently,
> only “implicit joins” are supported in the form of “select…. from event1,
> event2 where event1.paramX = event2.paramX”. That is why I am using ETS
> (actually provided by “folsom”). I gave it some thinking, but failed to
> find how to join multiple streams without a temporary storage. On the other
> hand – if there are no joins in the query, it can be done without using
> ETS, simply by updating aggregations on the fly, as the event arrives. That
> is exactly what I am working on at the moment.
> Btw, my background in CEP theory is minimal (something I plan to catch
> up on), do the above-mentioned stream “joins” qualify as “stream
> combinators”?
> My immediate plans are to extend the DSL with:
> - “Named windows” (similar to ESPER ones) – basically new streams, each
> one built from a query result, which can be used by other queries.
> - Defining the events, so that the event behaviour (event1.erl and event2.erl
> in the test suite) can be generated automatically
> - Event patterns over streams and named windows
> - Batch (trigger result on specific time intervals) and tumbling (trigger
> result on specific number of inputs) windows
> I’ll definitely think about using eep-erl and beam-erl :)
> Thank you again!
> Best regards,
> Vasco
> On Mon, Jan 6, 2014 at 2:31 AM, Darach Ennis <darach@REDACTED> wrote:
>> Hi Vasco,
>> [Resent to all]
>> Looks like you're off to a good start here towards a SQL based CEP engine
>> for Erlang/OTP.
>> There are arguably 4 key constructs that Rivus would need to support to
>> qualify as a CEP engine:
>> 1. Continuous Query
>> 2. Windows (aggregation)
>> 3. 'Complex' pattern matching (eg: temporal patterns across streams,
>> stream combinators, ...)
>> 4. A domain specific language.
>> So, Rivus would basically qualify in principle under 1, 2 and 4 here but
>> not entirely under 3. I saw
>> no support for combinators, temporal pattern matching, state (tables or
>> variables), concurrency or
>> data parallelism. However, as the syntax allows the definition of
>> multiple correlations it could be said
>> that simultaneity (of queries) is supported - and this is essential.
>> A potential issue depending on your target audience is that the
>> facilities in core erlang for
>> expressive pattern matching and tuple processing aren't leveraged, nor is
>> the native support
>> for concurrency and parallelism through providing concurrency and data
>> parallelism of queries
>> in the DSL. StreamBase (note: I used to work for them so I'm probably
>> biased) support concurrency
>> and (data) parallelism in their SQL and visual (flow-based) languages, which
>> in the hands of an
>> expert (eg: someone intimate with the runtime) are very powerful.
>> Apama is worth looking at - its runtime was inspired by the Erlang
>> concurrency model (allegedly)
>> and its monitorscript language supports processes.
>> In CEP languages and environments where these and more features are
>> provided - it doesn't
>> actually help the poor CEP developer - it hinders. Large complex CEP
>> algorithms are difficult to
>> evolve, maintain and support in most organisations. The tools typically
>> have weak debugging and
>> refactoring support if at all and the DSLs aren't standard and often have
>> constructs peculiar to the
>> lineage of the technology (it may have started out life as an active
>> database or as a log processing
>> tool or as a captured packet analyser ...).
>> StreamBase, by far, has the best tooling (yup, I'm biased), IDE,
>> debugging and refactoring support.
>> But a significant component of any successful CEP solution is native
>> code. A question I've been
>> grappling with is, what are the useful bits in CEP that would be useful
>> within Erlang as a library or
>> service? I've experimented with what I think are the most useful two,
>> namely: aggregate window
>> processing and data flow algorithm definition. Rather than define a SQL
>> to enshrine the conditions
>> under which they can be leveraged - plain old erlang provides a richer
>> environment for these to be
>> used fruitfully. Why write a DSL when Erlang is a 'real language' - one
>> that makes pattern matching
>> easy with distribution and concurrency built in if you need it?
>> https://github.com/darach/beam-erl
>> Embeddable data flow library. Branch, Pipe, Combine, Filter and Transform
>> data.
>> https://github.com/darach/eep-erl
>> Sliding, Tumbling, Periodic and Monotonic aggregation.
>> In the case of aggregation once the window semantics are defined the
>> window functions
>> can be provided by extension (use an erlang behaviour). So you could use
>> this to allow
>> user defined (aggregate) functions in your SQL dialect. The SQL
>> statements and clauses
>> could be compiled to an intermediary form that supports a more primitive
>> flow language
>> and runtime making building out the CEP engine a little bit easier. This
>> also favors plugging
>> in user defined constructs allowing the language itself to be extended
>> through user defined
>> operations.
>> I haven't implemented temporal pattern matching or interesting complex
>> combinators because I haven't
>> needed them myself of late. Even the window aggregation is simple. Some
>> CEP engines allow aggregates of multiple
>> dimensions (data, time, value, predicate expression) but these can
>> typically be (far) more easily defined
>> through composition.
>> Judging from the SQL dialect syntax the need so far is for expressivity
>> in filtering / detection
>> based on fairly simple windowed events or windowed temporal processing of
>> event data. Complex
>> processing of multiple event sources against complex scenarios (eg: near
>> real-time collusion or layering
>> fraud detection in capital markets market abuse and surveillance) without
>> extending the language dialect
>> would be hard/impossible in its current form.
>> It would be interesting to hear what the plans for the language are and
>> if/where plain old Erlang/OTP
>> could be leveraged to extend the capabilities of the engine. One concern
>> though is that aggregation
>> windows are based on folsom, which in turn depends on ETS. If any of your
>> queries contain large
>> numbers of aggregate operations - you'll spend a lot of time in ETS. If
>> your intermediate operation
>> results aren't critical - that's a lot of overhead you can avoid by
>> providing your own aggregation/windowing
>> logic (or fork/steal eep-erl's).
>> Promising start to a SQL based CEP in Erlang/OTP! Thanks for sharing so
>> early in its development!
>> Cheers,
>> Darach.
>> On Mon, Jan 6, 2014 at 1:02 AM, Vassil Kolarov <vasco@REDACTED> wrote:
>>> Hi all,
>>> I'd like to announce a project called 'Rivus'.  The goal of the project
>>> is the implementation of a complex event processing application in Erlang, which
>>> uses a DSL similar to ESPER's EPL. It is in a very early stage, but could
>>> be considered sort of an "MVP".
>>> Here is the GitHub repository: https://github.com/vascokk/rivus_cep
>>> Hope you'll find it interesting and useful.
>>> Best regards,
>>> Vasco