Syntax of JavaFBP (Java Implementation of FBP)
and Component API


Contents

General

In my book, "Flow-Based Programming", I describe the syntax of the network specifications of various FBP dialects that were in existence when the book was written. JavaFBP, the Java implementation of the FBP concepts, did not exist at that time, so this web page has been added describing the syntax of JavaFBP network definitions.

As described in my book, many non-trivial applications can be built using only the network definition language, so this web page starts with a description of the network definition language.  A running program can be built up in layers, using JavaFBP's subnet notation.  There is also a diagramming tool (DrawFBP), which can be used to define the network graphically, and which can actually generate the network definitions. 

If new components have to be coded, they will be coded in Java, using the Java component API.  Components also have attrbutes that have to be visible to the JavaFBP scheduler - these are specified using component metadata, described below.

The source code for the various constituents of JavaFBP is on SourceForge under Subversion (SVN) for the Flow-Based Programmming project: SVN for FBP .   There is also a jar file - JavaFBP jar file - on the FBP web site.

One advantage of defining the network as executable code, as compared with other approaches that merely list connections in a language-independent way, is that the network can contain additional logic. This logic then controls the way the network is defined, rather than the way it runs. Some may regard this as a defect, rather than as an asset, and both views can certainly be defended, but one of the neat things it enables us to do is to adjust multiplexing levels of various structures in the diagram using a table of values (remember the multiplexing example in Sample DrawFlow Diagram). One merely retrieves a value from a table for the degree of multiplexing in a particular structure in the diagram, and this value is then used both as the index of a loop invoking the connect statement, and as the index for the elements of an array-type port (see below for both of these terms).

Version 2.5 of JavaFBP has the following additional features (including enhancements to metadata, also described below in Component Metadata) :

 

Please note that, as of version 2.5, process names and port names may only contain Unicode letters, figures, underscore and blank, as suggested by one of JavaFBP's registered users.  We hope this does not disturb any current applications, but we judged this was necessary to protect the extension possibilities for JavaFBP.  A similar change will be made to C#FBP shortly.

As of version 2.6, JavaFBP now checks that components only receive and send packets containing data of the type specified in the class annotations. This uses the Java isAssignableFrom() method - e.g. if the port annotations specify Object (the default), this port may send or receive packets containing anything.

A sample component has been added to the jar file, called DispIPCounts, using the getIPCounts() method, which outputs the requested counts at intervals specified in an IIP.  This is just to give an idea of what could be done.  Here is some sample output from this component (connections are identified by the name of their downstream - i.e. input.port):

2011.03.21T14:07:25:647 TestIPCounting.Discard.IN                       9763
2011.03.21T14:07:25:656 TestIPCounting.ReplStr.IN                       9787
2011.03.21T14:07:26:160 TestIPCounting.Discard.IN                      37865
2011.03.21T14:07:26:160 TestIPCounting.ReplStr.IN                      37887
2011.03.21T14:07:26:660 TestIPCounting.Discard.IN                      67349
2011.03.21T14:07:26:660 TestIPCounting.ReplStr.IN                      67360


Network Definitions

Since the way the syntax relates to the underlying diagram may not be all that clear, a brief description is in order.  At the end of this page, I have given an extremely simple JavaFBP component.

Any JavaFBP network definition starts as follows:


public class xxxxxx extends Network {

protected void define() {


where xxxxxx is the Network name, including the usual imports, copyright statements, etc. Of course you will have to import classes for JavaFBP objects, such as Network and Component, as well as any JavaFBP "verb" classes you may be using.

The network definition is terminated with:

}
public static void main(String[] argv) throws Exception {
new xxxxxx().go();
}
}

In between the beginning and the ending statements defining the network, you specify a list of connections, using the following methods, which I will refer to as "clauses":

Every component instance must have a unique character string identifying it, which allows other component instances or initial information packets (IIPs) to be attached to it via a connection.

The following method call: 

component("xxxx")

returns a reference to a component instance. The first reference to this particular component instance must specify the component class to be executed by that occurrence. This is done by coding

component("xxxx", cccc.class)

where cccc  is the name of the Java module to be executed.

Similarly, a port is identified by a port clause, e.g. port("xxxx").

A port may be an array-type port, in which case the port clauses referring to its elements have index values, as follows: 

port("xxxx",n)

where "n" runs up monotonically from 0. Each element of the port array will be connected to a different component occurrence or IIP.

A connect or initialize clause may contain the relevant component clauses, together with their corresponding port clauses, embedded within it, as e.g.

     connect(component("Read", ReadText.class),
         port("OUT"),
         component("Splitter1", Splitter1.class),
         port("IN"));

or the connect and component portions may be in separate statements, provided component precedes any connects that reference it.

A connect contains:

These last two may appear in either order if present, and one or both may be omitted. If the optional fifth parameter (connection capacity) is omitted, the default value is used: 1 for testing, or 10 for production (the static int value defaultCapacity in the Network class has to be set either to DEBUGSIZE (1) or PRODUCTIONSIZE (10)). 

If the optional sixth parameter (IP count facility desired) is specified as true, the number of IPs passing across this connection is continuously updated in a synchronized internal HashMap, keyed on the full process name (process name preceded by the name of containing subnet process (separated by a period) and so on recursively) and port name.  An example of a sample component that displays any specified counts at some interval specified in an IIP is included in jar files - versions 2.5 and later - under the name DispIPCounts.  The network demonstrating the use of this function is in test.networks, called TestIPCounting.

If an asterisk (*) is specified for the "from" port, this is called an "automatic output port", and indicates a signal generated when the "from" component instance terminates (actually the port is just closed, so no packet has to be disposed of). 

If an asterisk (*) is specified for the "to" port, this is called an "automatic input port", and indicates a delay - the "to" component instance does not start until a signal or a close is received at this port.

If *SUBEND is specified as a port name on a subnet, a packet containing null is emitted at this port every time the subnet deactivates, i.e. all the contained components terminate.  It doesn't have to be named in the port metadata.  This null packet is emitted for all activations, including the last one.

An initialize clause contains:

as e.g.

     initialize(new FileReader(
"c:\\com\\jpmorrsn\\eb2engine\\test\\data\\myXML3.txt"),
       component("Read"),
           port("SOURCE"));

However, it has been recommended that IIPs should be strings, rather than arbitrary objects, to facilitate future graphical management of networks.

One last point: any number of "from" ports can be connected to a single "to" port; only one "to" port can ever be connected to a given "from" port.

A "named global" facility has been provided - putGlobal(String, Object) - which can be used to make an object available to all components, using the getGlobal(String): Object method.

putGlobal(String, Object) adds an object to a global HashMap using a String as identifier; getGlobal(String): Object can be used by a component to retrieve said object using the same String value.

CAUTION: this is extremely dangerous, as it runs counter to FBP philosophy. It should be used very carefully!

Sample Network

Let us code up a network implementing the following picture:

 

First list the component clauses, together with the component classes they are to execute (assuming that component classes have been written to execute the various nodes of the diagram), e.g.:

 
component("Read Masters",Read.class)
component("Read Details",Read.class)
component("Collate",Collate.class)
component("Process Merged Stream",Proc.class)
component("Write New Masters",Write.class)
component("Summary & Errors",Report.class)

Now these component clauses may either be made into separate statements or they can be imbedded into the connect statements that follow.  Here are the connections in the diagram, without imbedded component clauses:

  connect(component("Read Masters"),port("OUT"),component("Collate"),
port("IN",0));    // array port
connect(component("Read Details"),port("OUT"),component("Collate"),
port("IN",1));      // array port
connect(component("Collate"),port("OUT"),
component("Process Merged Stream"), port("IN"));
connect(component("Process Merged Stream"),port("OUTM"),
component("Write New Masters"),port("IN"));
connect(component("Process Merged Stream"),port("OUTSE"),
component("Summary & Errors"),port("IN"));

So you can either separate your component and connect statements, or add the class designation to the first component clause referencing a particular component occurrence, giving the following:

  
connect(component("Read Masters",Read.class),port("OUT"),
    component("Collate",Collate.class), port("IN",0)); // array port
  connect(component("Read Details",Read.class),port("OUT"),
     component("Collate"),port("IN",1)); // array port
  connect(component("Collate"),port("OUT"),
component("Process Merged Stream",Proc.class),port("IN"));
connect(component("Process Merged Stream"),port("OUTM"),
component("Write New Masters",Write.class),port("IN"));
connect(component("Process Merged Stream"),port("OUTSE"),
component("Summary & Errors",Report.class),port("IN"));

Now "Read Masters" and "Read Details" use the same Java class, so we need some way to indicate the name of the file that each is going to read. This is done using Initial Information Packets (IIPs). In this case they might usefully specify FileReader objects, so we need to add two initialize clauses, as follows:

  
initialize(new FileReader("c:\\mastfile"),
component("Read Masters"),
port("SOURCE"));
initialize(new FileReader("c:\\detlfile"),
component("Read Details"),
port("SOURCE"));

Note that, since both "Read" component occurrences use the same class code, they naturally have the same port names - of course, the ports are attached to different IIPs.

Remember that back-slashes have to be doubled in Java character strings!

"Write New Masters" will have to have an IIP to specify the output destination - perhaps:

  
initialize(new FileWriter("c:\\newmast"),
component("Write New Masters"),
port("DESTINATION"));

Note also that this IIP is not a destination for the Writer - it is an object used by this component occurrence so that the latter can figure out where to send data to.

Add the beginning and ending statements, and you're done!   The actual sequence of connect and initialize statements is irrelevant.

Here is the final result:

  
public class xxxxxx extends Network {

protected void define() {
connect(component("Read Masters",Read.class),port("OUT"),
component("Collate",Collate.class),port("IN",0)); // array port
connect(component("Read Details",Read.class),port("OUT"),
component("Collate"),port("IN",1));// array port
connect(component("Collate"),port("OUT"),
component("Process Merged Stream",Proc.class),port("IN"));
connect(component("Process Merged Stream"),port("OUTM"),
component("Write New Masters",Write.class),port("IN"));
connect(component("Process Merged Stream"),port("OUTSE"),
component("Summary & Errors",Report.class),port("IN"));
initialize(new FileReader("c:\\mastfile"),
component("Read Masters"),
port("SOURCE"));
initialize(new FileReader("c:\\detlfile"),
component("Read Details"),
port("SOURCE"));
initialize(new FileWriter("c:\\newmast"),
component("Write New Masters"),
port("DESTINATION"));

}

public static void main(String[] argv) throws Exception{

new xxxxxx().go();
}
}



Simplified Notation (JavaFBP-2.0+)

In the latest release of JavaFBP, we have introduced a new, simplified notation, in addition to that shown above.  In this notation connect specifies two character strings, and initialize specifies an object and a character string.   In both cases, the second character string specifies a combination of component and port, with the two parts separated by a period. Array port indices, if required, are specified using square brackets, e.g.

"component.port[3]"

The old port notation will still be supported, but is only really needed when the port index is a variable.  When debugging, it will be noted that the square bracket notation is used in trace lines, even when it was not used in the network definition.

Component names must of course not include periods or most special characters, but they may include blanks, numerals, hyphens and underscores, and they must be associated with their implementing class using a (preceding) component statement.

Here is the above network using the new notation:

 
public class xxxxxx extends Network {

protected void define() {
component("Read Masters",Read.class);
component("Read Details",Read.class);
component("Collate",Collate.class);
component("Process Merged Stream",Proc.class);
component("Write New Masters",Write.class);
component("Summary & Errors",Report.class);
connect("Read Masters.OUT", "Collate.IN[0]");
connect("Read Details.OUT", "Collate.IN[1]");
connect("Collate.OUT"), "Process Merged Stream.IN");
connect("Process Merged Stream.OUTM", "Write New Masters.IN");
connect("Process Merged Stream.OUTSE", "Summary & Errors.IN");
initialize(new FileReader("c:\\mastfile"), "Read Masters.SOURCE");
initialize(new FileReader("c:\\detlfile"), "Read Details.SOURCE");
initialize(new FileWriter("c:\\newmast"),
"Write New Masters.DESTINATION");
}


public static void main(String[] argv) throws Exception {
 
new xxxxxx().go();
}
}

Here is a network example showing how variable port numbers can be used with the LoadBalance function to define an (admittedly fairly trivial) self-balancing network. 

This also shows a slightly different way of specifying the define function, which avoids having to remember to change the network name in the main() method, when you do cut and paste.  Its disadvantage is that trace output will show a "generated" network name, rather than the actual one.


public class TestLoadBalancer {

public static void main(final String[] args) {
try {
new Network() {
@Override
protected void define() {
int multiplex_factor = 10;
component("generate", Generate.class);
component("display", WriteToConsole.class);
component("lbal", LoadBalance.class);
connect("generate.OUT", "lbal.IN");
initialize("100 ", component("generate"), port("COUNT"));
for (int i = 0; i < multiplex_factor; i++) {
connect(component("lbal"), port("OUT", i),
component("passthru" + i, Passthru.class), port("IN"));
connect(component("passthru" + i), port("OUT"), "display.IN");
}
}
}.go();
} catch (Exception e) {
System.err.println("Error:");
e.printStackTrace();
}
}
}

Simple Subnet

As described in the book, networks can be built up level by level, using what we call "subnets" - they may be thought of as networks with "sticky" connections.  Here is a very simple subnet.

Note the metadata - a subnet can act as a component, so metadata is required.


@OutPort("OUT")
@InPort("IN")
public class SubnetX extends SubNet {
@Override
protected void define() {
component("SUBIN", SubInSS.class); // substream-sensitive
component("SUBOUT", SubOutSS.class); // do.
component("Pass", Passthru.class);
initialize("IN", component("SUBIN"), port("NAME"));
connect(component("SUBIN"), port("OUT"), component("Pass"), port("IN"));
connect(component("Pass"), port("OUT"), component("SUBOUT"), port("IN"));
initialize("OUT", component("SUBOUT"), port("NAME"));
}
}



Sample Component

This component generates a stream of 'n' IPs, where 'n' is specified in an InitializationConnection (specified by an initialize clause in the foregoing). Each IP just contains an arbitrary string of characters, in order to illustrate the concept.  Of course any copyright information included is up to the developer.

The statements starting with @OutPort are called the component metadata, and are described below in the section called "Component Metadata".

A JavaFBP component basically consists of 5 sections:

Here is the sample component:

package com.jpmorrsn.fbp.components;


import com.jpmorrsn.fbp.engine.*;



/** Component to generate a stream of 'n' packets, where 'n' is
* specified in an InitializationConnection.
*/


@OutPort(value = "OUT", description = "Generated stream",
type = String.class)
@ComponentDescription(
"Generates stream of packets under control of a counter")
@InPort(value = "COUNT",
description = "Count of packets to be generated",
type = String.class)

public class Generate extends Component {

static final String copyright = "Copyright .....";



OutputPort outport;

InputPort count;


@Override
protected void openPorts() {
outport = openOutput("OUT");
count = openInput("COUNT");
}

@Override
protected void execute() {
Packet ctp = count.receive();
if (ctp == null) {
return;
}
count.close();

String cti = (String) ctp.getContent();
cti = cti.trim();
int ct = 0;
try {
ct = Integer.parseInt(cti);
} catch (NumberFormatException e) {
e.printStackTrace();
}
drop(ctp);

for (int i = 0; i < ct; i++) {
int j = ct - i;
Integer j2 = new Integer(j);
String s = j2.toString();
if (j < 10) {
s = "0" + s;
}
if (j < 100) {
s = "0" + s;
}
s = s + "abc";

Packet p = create(s);
outport.send(p);

}

}

}

The scheduling rules for most FBP implementations are described in the chapter of my book called Scheduling Rules.

In earlier JavaFBP versions, the scheduler presented an IIP to a component once per activation. This has been changed as of JavaFBP-2.0 to once per invocation.  In practice this will only affect "non-loopers" (components that get reactivated multiple times).  

There are a few other minor changes to component code as of JavaFBP-2.0:

Metadata works as follows:

Input ports and input array port elements are always treated as optional.

Component Metadata

Component Metadata:

Note: when "value" is the only parameter, the "short" form
(see above) can be used

@ComponentDescription
parameters:
- value (String)

@InPort
parameters:
- value (String)
- valueList({String, String, ... })
- arrayPort (boolean)
- description (String)
- type (class)
- setDimension (int) --- requires * at end of value, or some entries in valueList

@OutPort
parameters:
- value (String)
- valueList({String, String, ... })
- arrayPort (boolean)
- description (String)
- type (class)
- setDimension (int) --- requires * at end of value, or some entries in valueList
- optional (boolean)

@InPorts
parameter: list of @InPort references, e.g. @InPorts( { @InPort("IN"),
@InPort("TEST") })

@OutPorts
parameter: list of @OutPort references, e.g. @OutPorts( { @OutPort("ACC"),
@OutPort("REJ") })


@MustRun // Must run at least once

@SelfStarting // Start process at beginning of run, even if it has input ports

@Priority(Thread.MAX_PRIORITY) // default is NORM_PRIORITY

Component API

Packet class:

/**
* A Packet may either contain an Object, when type is NORMAL,
* or a String, when type is not NORMAL. The latter case
* is used for things like open and close brackets (where the
* String will be the name of a group. e.g. accounts)
**/

Object getAttribute(String key); /* key accesses a specific attribute */
Object getContent(); /* returns null if type <> NORMAL */


Component class:

/**
* All verbs must extend this class, defining its two abstract methods:
* openPorts, and execute.
**/


Packet p = create(Object o);


Packet p = create(Packet.type (int) t, String s);

drop(Packet p); // Note this change!

longWaitStart(double interval); // in seconds
longWaitEnd();



/** 3 stack methods - as of JavaFBP-2.3
**/

push (Packet p);

Packet p = pop(); // return null if empty

int stackSize();


InputPort interface:

Packet = receive();

void close();


OutputPort class:

void send(Packet packet);

boolean isConnected(); // as of 2.2

void close();