FlowBasedProgramming | RecentChanges | Preferences

Mike Beckerle

"Stretching" FBP

The topics I'd like to contribute or see contributed are all about "stretching FBP":

1 - failure behavior and transactions
2 - parallelism: its implications for the data model, and a whole bunch of things about the implementation
3 - internet-scale FBP (just google for Apache "Pig")

If I had to suggest a concrete plan, I (or Sebastian) can write or get material about the challenges of (1). To me this is the most interesting topic.

Material on (3) is available online. The problems of (3) are related to the lack of a robust data model, which comes up in (2). More below.

1) A key topic is interfacing FBP systems with "real" endpoint systems that have modern transaction-processing semantics, where transactions must be relatively small and of short outstanding duration. For example, in a TP system a transaction typically involves one or a few records. FBP offers an opportunity to coarsen this granularity by grouping a bunch of records into a batch, so that you can cut the TP overhead to 1% of what it was (e.g., with 100 records per batch) if you want. However, there are a bunch of caveats about this.
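As a rough illustration of the arithmetic, the sketch below groups records into batches and counts commits, assuming (hypothetically) that each commit has a fixed cost no matter how many records it covers. With one record per transaction, 1,000 records cost 1,000 commits; with batches of 100 they cost 10, i.e. 1% of the overhead. The function names are illustrative only.

```python
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")

def batches(records: Iterable[T], size: int) -> Iterator[List[T]]:
    """Group a stream of records into lists of at most `size` records."""
    batch: List[T] = []
    for rec in records:
        batch.append(rec)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:  # flush the final partial batch
        yield batch

def count_commits(num_records: int, batch_size: int) -> int:
    """Number of transactions needed if every batch is committed as one unit."""
    return sum(1 for _ in batches(range(num_records), batch_size))

print(count_commits(1000, 1))    # 1000: one transaction per record
print(count_commits(1000, 100))  # 10: one transaction per 100-record batch
```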

Basically, it is hard to implement a simple FBP programming model if you take failures into consideration. Batch semantics, where you can just discard the output and run it again, are easy. But the desired semantics are transactional: either data is taken in and processed, and its consequent data is transacted into the output systems attached to the flow, or it "never left the input source", i.e., on failure it is still found there for the next "retry" of the flow. I call this "once and only once" or "exactly once" processing semantics. Most systems either implement "at least once" processing or just make some sort of best effort. (By this I mean commercially successful systems that I built!) In these systems the "logic" of the application is generally dwarfed by the complexity of dealing with failure (or, worse, people think the systems are more robust than they are). ETL tools for loading data warehouse databases from message queues are the basic execution idiom here, and the targets of the flow are database tables. The usual technique of choice is to stage the output in a separate table and only transact it into the "real" destination table when the batch-processing flow succeeds, but this has obvious space problems, and it defers problems such as duplicate keys to a step outside the flow.
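The difference between "at least once" and "once and only once" can be sketched with in-memory stand-ins for a message queue and a destination table. This is a toy, not Ohua or any product: all names are hypothetical, and the final "commit" step stands in for what a real system must do as one atomic transaction spanning both the dequeue and the write. The at-least-once version writes output as it goes and acknowledges input only at the end, so a crash followed by a retry duplicates output; the staged version keeps results in a private staging list and publishes them together with the acknowledgement.

```python
from typing import Callable, List, Optional

def at_least_once(queue: List[int], dest: List[int], f: Callable[[int], int],
                  crash_at: Optional[int] = None) -> None:
    """Write each result straight to dest; dequeue input only after the batch."""
    for i, rec in enumerate(queue):
        if i == crash_at:
            raise RuntimeError("simulated crash")
        dest.append(f(rec))      # visible immediately, never undone on a crash
    queue.clear()                # acknowledge input

def exactly_once(queue: List[int], dest: List[int], f: Callable[[int], int],
                 crash_at: Optional[int] = None) -> None:
    """Stage results privately; publish them and dequeue input together."""
    staging: List[int] = []
    for i, rec in enumerate(queue):
        if i == crash_at:
            raise RuntimeError("simulated crash")  # staging is simply dropped
        staging.append(f(rec))
    # Stand-in for one atomic transaction: output appears and input is
    # acknowledged together, or (had we crashed above) neither happens.
    dest.extend(staging)
    queue.clear()

def double(x: int) -> int:
    return 2 * x

def crash_then_retry(run: Callable[..., None]) -> List[int]:
    queue, dest = [1, 2, 3, 4], []
    try:
        run(queue, dest, double, crash_at=2)
    except RuntimeError:
        pass                     # records are "still in the input source"
    run(queue, dest, double)     # retry the whole batch
    return dest

print(crash_then_retry(at_least_once))  # [2, 4, 2, 4, 6, 8]: duplicates
print(crash_then_retry(exactly_once))   # [2, 4, 6, 8]
```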

I have a former student, Sebastian Ertel (on CC), now an engineer at IBM but also pursuing further graduate studies, who has worked on this problem extensively. He has a good system called Ohua (on SourceForge, open and available; we were able to make it open source because it was his thesis work!) which I believe solves the checkpoint/restart and transactional endpoint problem. From what I have seen, this is also a first. Everybody else in "stream" processing (for example, Stonebraker's companies like StreamBase, various internal IBM "sensor" streaming flow systems, etc.) either has caveats about how the endpoints have to behave (as if we get to control that... except we don't), or requires fully deterministic flows (which massively increases buffering in very unreasonable ways, and is particularly troubling when parallelism is involved). Ohua requires none of this: it implements "once and only once" semantics and fully transparent checkpoint/restart, with a reasonable local burden on operator authors. This is work in progress: it is implemented but not sufficiently tested. But if I were building an FBP system today, I'd be implementing this.

2) Parallel pipelined FBP, as in Torrent/Ascential/IBM data integration software, Ab Initio (rumored; there is no true public info about this), and now a bunch of additional systems: Expressor, Data Integrator, DataRush (Pervasive), Data<your verb here>, and some open source projects, including the above-mentioned Ohua, which is quite new and not yet mature. There are a bunch of these now. I would not write a survey here, but would discuss learnings and principles. Much of this parallel/partitioned stuff is an outgrowth of Goetz Graefe's Volcano paper, or was co-invented around the same time, but there are some interesting variants, and a lot of these things are now folklore of the trade. Since people are taking them from company to company rather than treating them as proprietary intellectual property, I feel OK discussing many of them now. For example, some things are obvious in hindsight now: the scheduling policy should favor downstream operators (data locality and buffer minimization), and scheduling that favors recently scheduled but still-enabled operators wins (data locality). Other learnings concern the use of overpartitioning to keep systems busy when there are I/O operators; coping with operators that do remote work; solving buffer deadlocks caused by parallelism, even when the graph "seems" to be acyclic; and ways to reduce the overhead of operators by combining them (and APIs for handling records that allow this).
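The "favor downstream operators" rule can be illustrated with a toy simulation of a linear pipeline, assuming unit-size packets and one packet moved per scheduling step (both simplifications; all names are hypothetical). Always running the most downstream enabled stage keeps at most one packet in flight, while always running the most upstream stage lets the source flood the first buffer.

```python
from typing import Callable, List

def run_pipeline(num_stages: int, num_items: int,
                 pick: Callable[[List[int]], int]) -> int:
    """Simulate a linear pipeline and return the peak number of buffered packets.

    Stage 0 is a source emitting num_items packets; each later stage moves one
    packet from its input buffer to the next stage (the last stage is a sink).
    `pick` chooses one stage to run from the currently enabled stages.
    """
    assert num_stages >= 2, "need at least a source and a sink"
    buffers = [0] * num_stages          # buffers[i] = packets waiting for stage i
    remaining = num_items
    peak = 0
    while True:
        enabled = [0] if remaining > 0 else []
        enabled += [i for i in range(1, num_stages) if buffers[i] > 0]
        if not enabled:
            break
        stage = pick(enabled)
        if stage == 0:
            remaining -= 1
            buffers[1] += 1
        else:
            buffers[stage] -= 1
            if stage < num_stages - 1:  # the sink just consumes
                buffers[stage + 1] += 1
        peak = max(peak, sum(buffers))
    return peak

downstream_first = max   # run the most downstream enabled stage
upstream_first = min     # run the most upstream enabled stage

print(run_pipeline(4, 10, downstream_first))  # 1
print(run_pipeline(4, 10, upstream_first))    # 10
```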

A very interesting topic is the data model. Most systems have implemented small, finite, row-like (flat) records or name-value pair lists as the information packets. This is grossly inadequate, as you pointed out in the original book, where you suggested what I have called the "bracket model" for extending FBP to cover hierarchical data. The problem is that the bracket model doesn't mix well with parallelism. For example, do you broadcast the brackets so that you can divide and conquer the subrecords? What about sub-sub-records: over what domain of threads do you broadcast those? This is a lot of broadcasting (storms of it, if done naively). I know of no system that addresses this and handles large hierarchical data, big enough that you want to carve it up for parallel processing, other than the most recent (and still not productized) system at IBM that was a work in progress when I left there 2 years ago.
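To make the broadcasting concern concrete, here is a sketch of the naive approach under stated assumptions: information packets are (kind, value) pairs, where "open" and "close" bracket packets delimit substreams (a simplified rendering of the bracket model), data packets are dealt round-robin across n partitions, and every bracket is broadcast to every partition so each partition still sees well-formed nesting. The packet encoding and function names are hypothetical. Bracket traffic grows as (number of brackets) × n, regardless of how much data each group actually holds.

```python
from typing import List, Tuple

Packet = Tuple[str, str]  # (kind, value); kind is "open", "close" or "data"

def partition_with_broadcast(stream: List[Packet], n: int) -> List[List[Packet]]:
    """Deal data packets round-robin; copy every bracket packet to all n partitions."""
    parts: List[List[Packet]] = [[] for _ in range(n)]
    next_part = 0
    for kind, value in stream:
        if kind in ("open", "close"):
            for part in parts:           # broadcast: one copy per partition
                part.append((kind, value))
        else:
            parts[next_part].append((kind, value))
            next_part = (next_part + 1) % n
    return parts

def is_well_nested(stream: List[Packet]) -> bool:
    """True if every "open" has a matching "close" in the right order."""
    stack: List[str] = []
    for kind, value in stream:
        if kind == "open":
            stack.append(value)
        elif kind == "close":
            if not stack or stack.pop() != value:
                return False
    return not stack

# One customer with two orders: a small hierarchical record.
stream: List[Packet] = [
    ("open", "customer"),
    ("open", "order"), ("data", "item1"), ("data", "item2"), ("close", "order"),
    ("open", "order"), ("data", "item3"), ("close", "order"),
    ("close", "customer"),
]
parts = partition_with_broadcast(stream, 4)
brackets_sent = sum(1 for p in parts for kind, _ in p if kind != "data")
print(all(is_well_nested(p) for p in parts), brackets_sent)  # True 24
```

Note that with three data items and four partitions, the fourth partition receives nothing but empty brackets.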

3) Related things: relational databases, which almost universally use parallel flow engines, and Map/Reduce, which is highly related to the parallel-pipelined FBP above and shares its data model issues, but is designed for hugely scaled-up environments where failures are always occurring. There are now languages for cascading map/reduce operations together into flows; these are FBP. (Apache Pig is the language du jour.)
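As a minimal sketch of what "cascading" means, the code below chains two map/reduce stages in plain Python, in-process and with no fault tolerance, so it shows only the flow shape, not how Hadoop or Pig actually execute it. The output of the first stage (word counts) is the input of the second (words grouped by count). All function names are hypothetical.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, Iterable, List, Tuple

Pair = Tuple[Any, Any]

def map_reduce(records: Iterable[Any],
               mapper: Callable[[Any], List[Pair]],
               reducer: Callable[[Any, List[Any]], List[Any]]) -> List[Any]:
    """One stage: map records to (key, value) pairs, group by key, reduce each group."""
    groups: Dict[Any, List[Any]] = defaultdict(list)
    for rec in records:
        for key, value in mapper(rec):
            groups[key].append(value)       # the "shuffle": group values by key
    out: List[Any] = []
    for key in sorted(groups):              # sorted only to make output deterministic
        out.extend(reducer(key, groups[key]))
    return out

# Stage 1: word count.
def split_words(line: str) -> List[Pair]:
    return [(word, 1) for word in line.split()]

def sum_counts(word: str, ones: List[int]) -> List[Pair]:
    return [(word, sum(ones))]

# Stage 2: consumes stage 1's output and groups words by their count.
def invert(pair: Pair) -> List[Pair]:
    word, count = pair
    return [(count, word)]

def collect(count: int, words: List[str]) -> List[Any]:
    return [(count, sorted(words))]

counts = map_reduce(["a b a", "b c a"], split_words, sum_counts)
by_count = map_reduce(counts, invert, collect)
print(counts)    # [('a', 3), ('b', 2), ('c', 1)]
print(by_count)  # [(1, ['c']), (2, ['b']), (3, ['a'])]
```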

Type systems for FBP data

(Parametric Polymorphism)

Concept 1: each port has an interface schema that describes the structure of data arriving there. These are partial schemas: they describe the requirements the data must satisfy, not its exact structure. An input interface schema describes the requirements the incoming data must satisfy; an output interface schema describes the constraints the output structure obeys when data arrives there. Hooking such an operator into a flow requires you to say which actual data fields map to each aspect of the interface.
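A minimal sketch of a port interface schema and the binding step that hooks an operator into a flow, assuming (hypothetically) that a schema is a set of required interface names plus an optional "star" name that may absorb any number of actual fields. The binding maps each interface name to actual field names of the upstream records, and is rejected if a required name is not bound to exactly one field, if a name is not part of the interface, or if it refers to a field that does not exist. The class and function names are illustrative only.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Set

@dataclass(frozen=True)
class InterfaceSchema:
    """Partial schema for a port: requirements on the data, not its exact shape."""
    required: frozenset          # names that must each be bound to exactly one field
    star: Optional[str] = None   # name that may be bound to any number of fields

def check_binding(schema: InterfaceSchema, binding: Dict[str, List[str]],
                  actual_fields: Set[str]) -> List[str]:
    """Return a list of problems with binding interface names to actual fields."""
    errors: List[str] = []
    for name in sorted(schema.required):
        fields = binding.get(name, [])
        if len(fields) != 1:
            errors.append(f"{name} must be bound to exactly one field, got {fields}")
    for name, fields in binding.items():
        if name not in schema.required and name != schema.star:
            errors.append(f"{name} is not part of the interface")
        for f in fields:
            if f not in actual_fields:
                errors.append(f"{name} is bound to missing field {f}")
    return errors

# Input interface with required names X, Y and star name Z, hooked onto
# records that have the fields price, qty, id and region.
schema = InterfaceSchema(frozenset({"X", "Y"}), star="Z")
record_fields = {"price", "qty", "id", "region"}
good = {"X": ["price"], "Y": ["qty"], "Z": ["id", "region"]}
print(check_binding(schema, good, record_fields))             # []
print(check_binding(schema, {"X": ["price"]}, record_fields))
```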

Based on these interface schemas, the operator author has APIs for manipulating the data in terms of the interface: if the interface says a field is mapped on an input, the operator can read it. For a field mapped on an output, the operator author has an API to write it, and for a field that is transferred from input to output, a special API allows ONLY this behavior to be expressed. You can move the data from input to output, but you can't look at it or modify it.

One of the important things this can represent is what we call "transfer" or "polymorphic transfer". For example, an operator with one input port and one output port can say that the input looks like X, Y, Z*, i.e., it needs two fields mapped to the names X and Y, while any number of fields can be mapped to a third name, Z. The output looks like W, Z*: whatever was mapped to X and Y is used to compute a new field W, but Z* carries exactly what was mapped to it on the input through to the output.

The API the operator author uses to write the operator would in this case enforce the "contract" by not providing any way to look at, touch, or modify what was mapped to Z*, so one can depend on it being transferred through without modification of any sort.
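The contract can be sketched as an API in which the operator gets read access only to the names its input interface declares, and receives the Z* fields as an opaque token that it can only hand to the output. This is a hypothetical API for illustration: the class and method names are assumptions, and since Python cannot truly prevent reflection, "can't look" here means "no method for looking".

```python
from typing import Any, Dict, List

class Transfer:
    """Opaque bundle of transferred (Z*) fields: no method to read or modify them."""
    __slots__ = ("_fields",)

    def __init__(self, fields: Dict[str, Any]) -> None:
        self._fields = dict(fields)

class InputView:
    """What the operator sees of an input record: only the mapped interface names."""

    def __init__(self, record: Dict[str, Any], binding: Dict[str, str],
                 star_fields: List[str]) -> None:
        self._values = {name: record[f] for name, f in binding.items()}
        self.transfer = Transfer({f: record[f] for f in star_fields})

    def read(self, name: str) -> Any:
        return self._values[name]       # KeyError for anything not in the interface

class OutputBuilder:
    """Lets the operator write declared output names and pass a Transfer through."""

    def __init__(self, binding: Dict[str, str]) -> None:
        self._binding = binding
        self.record: Dict[str, Any] = {}

    def write(self, name: str, value: Any) -> None:
        self.record[self._binding[name]] = value

    def pass_through(self, t: Transfer) -> None:
        self.record.update(t._fields)   # only the framework unpacks the token

def compute_w(inp: InputView, out: OutputBuilder) -> None:
    """Operator with input "X, Y, Z*" and output "W, Z*": here W = X * Y."""
    out.write("W", inp.read("X") * inp.read("Y"))
    out.pass_through(inp.transfer)

record = {"price": 3, "qty": 4, "id": 7, "region": "EU"}
inp = InputView(record, {"X": "price", "Y": "qty"}, star_fields=["id", "region"])
out = OutputBuilder({"W": "total"})
compute_w(inp, out)
print(out.record)  # {'total': 12, 'id': 7, 'region': 'EU'}
```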

If you cascade operators like this together into a flow, an ordinary compiler-style dataflow analysis can tell you where each field of each record comes into existence, where it is read, and where it disappears, along with where its value is preserved so that flow restructuring is allowed.
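A forward pass over a linear flow is enough to sketch this analysis, assuming each operator has been summarized (hypothetically) by the fields it reads, creates, and drops, with every other field transferred unchanged as in Z*. Modifying a field is modelled as dropping it and creating it again. The record layout follows the W = X * Y example, and all names are illustrative.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set

@dataclass
class OpSummary:
    """Per-operator facts, assumed to be derived from its bound interface schemas."""
    name: str
    reads: Set[str] = field(default_factory=set)
    creates: Set[str] = field(default_factory=set)
    drops: Set[str] = field(default_factory=set)
    # Every other field present on input is transferred unchanged (Z*).

@dataclass
class FieldFacts:
    created_by: str
    read_by: List[str] = field(default_factory=list)
    preserved_by: List[str] = field(default_factory=list)
    dropped_by: str = ""

def analyze(source_fields: Set[str], pipeline: List[OpSummary]) -> Dict[str, FieldFacts]:
    """Record where each field is created, read, preserved and dropped."""
    facts = {f: FieldFacts(created_by="source") for f in source_fields}
    live = set(source_fields)
    for op in pipeline:
        for f in op.reads & live:
            facts[f].read_by.append(op.name)
        for f in live - op.drops:
            facts[f].preserved_by.append(op.name)  # value passes through untouched
        for f in op.drops & live:
            facts[f].dropped_by = op.name
        live = (live - op.drops) | op.creates
        for f in op.creates:
            facts[f] = FieldFacts(created_by=op.name)
    return facts

flow = [
    OpSummary("compute_w", reads={"price", "qty"}, creates={"total"},
              drops={"price", "qty"}),
    OpSummary("tax", reads={"total", "region"}, creates={"tax"}),
]
facts = analyze({"price", "qty", "id", "region"}, flow)
for name in sorted(facts):
    f = facts[name]
    print(name, f.created_by, f.read_by, f.preserved_by, f.dropped_by or "-")
```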

Many benefits accrue to FBP systems that have this kind of type system. Probably the most important is being able to move "repartitioning" around, as is needed to spread out data sets so they can be operated on in parallel. Repartitioning is an expensive operation, to be avoided if at all possible. If the partitioning key fields are preserved by an operator, then the partitioning operation can be hoisted to be done before that operator. This will often allow the partitioning to be hoisted to the beginning of a flow, thereby eliminating a separate basic round-robin or pseudo-random partitioning of the data followed by a key-based repartitioning later in the flow.
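A sketch of the hoisting rule, assuming a linear flow in which each operator's summary (hypothetical here) lists the fields it passes through unchanged, as the dataflow analysis described above would provide. It also assumes the operators ahead of the repartitioning point already run correctly on arbitrarily (e.g. round-robin) partitioned data, so key preservation is the only condition checked.

```python
from dataclasses import dataclass, field
from typing import List, Set

@dataclass
class Op:
    """Hypothetical operator summary: the fields it passes through unchanged."""
    name: str
    preserves: Set[str] = field(default_factory=set)

def hoist_partitioner(pipeline: List[Op], key: Set[str], position: int) -> int:
    """Return the earliest index at which a partitioner on `key` can be placed.

    The partitioner starts just before pipeline[position] and moves upstream
    past every operator that preserves all of the key fields.
    """
    while position > 0 and key <= pipeline[position - 1].preserves:
        position -= 1
    return position

# The repartitioning on customer_id sits just before the aggregation (index 2).
flow = [
    Op("clean_names", preserves={"customer_id", "amount"}),
    Op("convert_currency", preserves={"customer_id"}),   # rewrites amount
    Op("sum_by_customer"),
]
print(hoist_partitioner(flow, {"customer_id"}, 2))   # 0: partition at the source

# Here the first operator computes customer_id, so hoisting stops after it.
flow2 = [
    Op("lookup_customer_id", preserves={"amount"}),
    Op("convert_currency", preserves={"customer_id"}),
    Op("sum_by_customer"),
]
print(hoist_partitioner(flow2, {"customer_id"}, 2))  # 1
```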

This partitioner-hoisting optimization is an important one because, if you create black-box subgraphs that include a partitioning operation, the flow optimizer really, really wants to decompose them and hoist the partitioners up to earlier parts of the flow without affecting the result. Yet you really do want to be able to create black-box abstractions that hide the fact that a repartitioning operation is part of their logic. If you want this kind of black-box algorithm hiding, you have to enable the hoisting optimization, or the costs get out of hand.

To me, the potential of FBP is realized only when you have a quite strong polymorphic type system, one that allows easy creation of highly polymorphic operators and provides the FBP system with the information it needs to perform rich optimizations.

Last edited January 3, 2010 12:01 pm by PaulMorrison