Flow-based Programming

Panta rhei (Panta rhei) - Everything flows.

"Concat" Component

I am still trying to articulate the practical and theoretical differences between "classical" FBP, on the one hand, and NoFlo and the various FBP-inspired, synchronous, systems, on the other. An attempt can be found at FBP vs. FBP-inspired Systems.

Over the last few years a number of products have appeared, described by their authors as being implementations of FBP, but not always having all the characteristics of "classical" FBP - the form of FBP that has been gathering steam over the last 40 years. In particular the implementations of FBP that are based on JavaScript do not usually conform to "classical" FBP as JavaScript has problems supporting certain key concepts of "classical" FBP, such as selective receive and "back pressure" (JSFBP is an exception, but it does not appear to have much of a Future (pun intended)). What is more serious, from my point of view, is that, if one of these products is a person's first exposure to FBP, this may get in the way of their making the all-important FBP "paradigm shift". And on the surface the differences can be quite subtle - it's what is going on underneath that matters. The FBP-inspired products do share some characteristics with "classical" FBP (component orientation, configurable modularity), but most of them miss the essence, which is that FBP is a paradigm change from the standard, traditional von Neumann paradigm...

At an even more fundamental level, JavaScript, and therefore Node.js and NoFlo, cannot take advantage of multiple cores, unless they use workers, which I believe can only communicate by way of character strings (e.g. JSON). Note: this applies to my prototype JavaScript FBP implementation (JSFBP) as well.

The product that gained a lot of attention to FBP was NoFlo, built using JavaScript and Node.js, but paradoxically NoFlo is one of the products that does not implement "classical" FBP, and therefore does not support the FBP Paradigm change, with all the productivity and maintainability gains that this provides.

"Classical" FBP (in what follows I will often just say "FBP") actually has more in common with the old Unit Record technology, which was the way most data processing was done until about the late '60s, when the von Neumann machine started to take over. The von Neumann paradigm nowadays has programmers so brain-washed that they cannot conceive of a different way of doing data processing - yet FBP has many advantages over the von Neumann paradigm (it is still used within FBP components, so you don't have to give up on it all together!). When computers started to take over the field, we lost a number of subtle advantages of Unit Record, which weren't really appreciated at that time.

Bottom line: FBP applications are not von Neumann systems (although individual components are); they are more like factory assembly lines - i.e. a totally different paradigm!

I haven't really found a way to coax people to make the necessary paradigm shift - while at the same time sparing them the painful (?) task of reading at least the first 1/2 of my book (Flow-Based Programming), so I thought I would try talking about a single component which seems to me to highlight a number of the differences.

I have chosen the FBP standard Concatenate component, as it is a simple general purpose FBP component with no parameters. Its function is to force two or more streams of data into a fixed sequence (one after another), usually for human consumption. An example of its use might be to produce a summary page from data coming from different (asynchronously running) areas of an FBP network - e.g. number of transactions might be generated by one part of the network, number of affected customers by another, and total run time by another... Let us then stipulate that they always have to be displayed in the same sequence, independent of when they were generated. I said "for human consumption" because humans usually like things to be displayed in a fixed sequence, even though machines don't care!

Schematically:

Concatenate is an example of a parameter-less FBP component. Its logic is essentially (in pseudocode):

     set variable "c" to array size for array input port
     initialize "i" to 0
     do while "i" < "c"
         receive all IPs from array input port element "i" until end of stream,
              emitting each one to the (single) output port
         increment "i"
     enddo

With small amounts of data, the lack of back pressure in most FBP-inspired products probably doesn't hurt, but in an actual real-world situation you could easily accumulate a few billion IPs waiting at element 2, which cannot be processed until end of stream is detected at element 0 and at element 1. In "classical" FBP, you don't "receive" any of them until you are ready to process them. This works fine because, as well as selective receive, FBP has "back pressure", meaning that a sending process is suspended if the connection involved is full, so the processes generating the "element 2" data will just wait, without any special coding required on the part of the programmer.

Here is Concatenate in JSFBP:


  'use strict';

  module.exports = function concat() {
    var array = this.openInputPortArray('IN');
    var outport = this.openOutputPort('OUT');
    var ip = null;

    for (var i = 0; i < array.length; i++) {
      while (true) {
        ip = array[i].receive();
        if (ip === null) {
          break;
        }
        outport.send(ip);
      }
    }
  };

I could show this component in the other "classical" FBP languages (Java, C#, C++ (with Boost) or Lua), but they all have essentially the same structure, the big difference being that, under the covers, all of these implementations do support multiple cores (this doesn't affect the code, and is transparent to coders).

Now, I found the following under Concat on the Ken Kan's GitHub site, written in CoffeeScript (bear in mind that CoffeeScript uses indentation to indicate logical nesting).

noflo = require 'noflo'

class Concat extends noflo.Component
  description: 'Gathers data from all incoming connections and sends
  them together in order of connection'
  constructor: ->
    @buffers = {}
    @hasConnected = {}

    @inPorts =
      in: new noflo.ArrayPort
    @outPorts =
      out: new noflo.Port

    subscribed = false
    @inPorts.in.on 'connect', (socket) =>
      @hasConnected[@inPorts.in.sockets.indexOf(socket)] = true

      # In this component we need to know which of the sockets
      # sent the data, so we connect to the sockets directly
      unless subscribed
        @subscribeSocket id for socket, id in @inPorts.in.sockets
        subscribed = true

    @inPorts.in.on 'begingroup', (group) =>
      @outPorts.out.beginGroup group
    @inPorts.in.on 'endgroup', =>
      @outPorts.out.endGroup()
    @inPorts.in.on 'disconnect', =>
      # Check that all ports have disconnected before emitting
      for socket in @inPorts.in.sockets
        return if socket.isConnected()
      do @clearBuffers
      @outPorts.out.disconnect()

  clearBuffers: ->
    for id, data of @buffers
      return unless @hasConnected[id]
    @buffers = {}
    @hasConnected = {}

  subscribeSocket: (id) ->
    @buffers[id] = []
    @inPorts.in.sockets[id].on 'data', (data) =>
      unless typeof @buffers[id] is 'object'
        @buffers[id] = []
      @buffers[id].push data
      do @checkSend

  checkSend: ->
    # First check that we have data in all buffers
    for socket, id in @inPorts.in.sockets
      # If any of the buffers is empty we cancel
      return unless @buffers[id]
      return unless @buffers[id].length

    # Okay, all buffers have data: send.
    @outPorts.out.send buffer.shift() for id, buffer of @buffers

exports.getComponent = -> new Concat

This does seem a lot more complicated, but it is probably fairly clear to someone "trained in the art". Unfortunately I am too close to "classical" FBP to easily understand it...! I found a description of this component by Ken Kan, but he doesn't really say what it is for. However, recently Ken told me that it is similar to the "classical" FBP Concat, except that all streams must be in storage before anything is output. This means that, unlike "classical" FBP, it cannot handle streams that won't fit into available storage. Apart from that limitation, they appear to be functionally much the same. However, when we compare the two, the CoffeeScript code shown above seems to support the conclusion that the attempt to support classical FBP in JavaScript (or CoffeeScript) seems to lead to significantly increased code complexity.

Tom Robinson recently posted an implementation of Concat using async/await, which is included in ES2017 and supported natively in node.js and most browsers, as part of his Streampunk project. This does seem much closer to the spirit of classical FBP. As far as I know, however, Javascript still can't support multiple cores, which would seem to be a significant limitation if one wants to use JavaScript on the server side...

FBP vs. FBP-inspired Systems

Home