Syntax of JavaFBP (Java Implementation of FBP) and Component API
Network Definitions
In my book, "Flow-Based Programming", I describe the syntax of
the network specifications of various FBP dialects that were in
existence when the book was written. JavaFBP, the Java implementation
of the FBP concepts, did not exist at that time, so this web page has
been added describing the syntax of JavaFBP network definitions.
In JavaFBP we not only code components in Java but also define the
networks as Java programs. The source code is on SourceForge under
Subversion (SVN) for the Flow-Based Programming project: SVN for FBP.
There is also a jar file, the JavaFBP jar file, on the FBP web site.
One advantage of defining the network as executable code, as
compared with other approaches that merely list connections in a
language-independent way, is that the network can contain additional
logic. This logic then controls the way the network is defined,
rather than the way it runs.
Some may regard this as a defect, rather than as an asset, and both
views can certainly be defended, but one of the neat things it enables
us to do is to adjust multiplexing levels of various structures in the
diagram using a table of values (remember the multiplexing example in Sample DrawFlow Diagram). One merely retrieves a
value from a table for the degree of multiplexing in a particular
structure in the diagram, and this value is then used both as the index
of a loop invoking the connect statement, and as the index
for the elements of an array-type port (see below for both of these
terms).
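The table-driven idea can be sketched in plain Java without the JavaFBP classes. The class MultiplexPlan, its method, and the component names below are hypothetical and exist only for illustration; the sketch just prints the connections that such a loop would define, using the same value as the loop bound and as the array-port element index.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: expands a table of multiplexing levels into connection descriptions. */
class MultiplexPlan {

    /** Builds one "from -> to" description per array-port element of the named structure. */
    static List<String> connectionsFor(String structure, Map<String, Integer> levels) {
        // Degree of multiplexing retrieved from the table (1 if the structure is not listed).
        int level = levels.getOrDefault(structure, 1);
        List<String> result = new ArrayList<>();
        for (int i = 0; i < level; i++) {
            // i serves both as the loop index and as the array-port element index.
            result.add("LoadBalance.OUT[" + i + "] -> " + structure + i + ".IN");
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> levels = new LinkedHashMap<>();
        levels.put("Passthru", 3);
        connectionsFor("Passthru", levels).forEach(System.out::println);
    }
}
```

In a real network definition the body of the loop would presumably be a connect statement rather than a string, but the relationship between the table value, the loop, and the port index is the same.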
Since the way the syntax relates to the underlying diagram may not
be all that clear, a brief description is in order. At the end of
this page, I have given an extremely
simple JavaFBP component.
Any JavaFBP network definition starts as follows:
public class xxxxxx extends Network {
  protected void define() {
where xxxxxx is the network name. The class is preceded by the usual
imports, copyright statements, etc. Of course you will have to import
the classes for JavaFBP objects, such as Network and Component, as well
as any JavaFBP "verb" classes you may be using.
The network definition is terminated with:
  }
  public static void main(String[] argv) throws Exception {
    new xxxxxx().go();
  }
}
In between the beginning and the ending statements defining the
network, you specify a list of connections, using the following
methods, which I will refer to as "clauses":
- component - define an instance of a component (an FBP "process")
- connect - define a connection
- initialize - define a connection including an Initial Information Packet (IIP)
- port - define a port on a process
Every component instance must
have a unique character string
identifying it, which allows other component instances or initial
information packets (IIPs) to be attached to it via a connection.
The following method call:
component("xxxx")
returns a reference to a component instance. The first reference to
this particular component instance must specify the component class
to be executed by that occurrence. This is done by coding
component("xxxx", cccc.class)
where cccc is the name of the Java module to be
executed.
Similarly, a port is identified by a port clause, e.g.
port("xxxx").
A port may be an array-type port, in which case the port clauses
referring
to its elements have index values, as follows:
port("xxxx",n)
where "n" runs up monotonically from 0. Each element of the port array
will be connected to a different component occurrence or IIP.
A connect or initialize clause may contain the
relevant
component clauses, together with their corresponding port
clauses, embedded within it, as e.g.
connect(component("Read", ReadText.class), port("OUT"),
    component("Splitter1", Splitter1.class), port("IN"));
or the connect and component portions may be in
separate statements,
provided component precedes any connects that
reference it.
A connect contains:
- "from" component clause
- "from" port clause
- "to" component clause
- "to" port clause
Optionally a connect may have a fifth parameter: the
connection capacity, specified as an int. If this
is omitted, the default value is used: 1 for testing, or 10 for
production (this currently has to be changed by hand in the
Network.class).
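What a connection capacity means in practice can be illustrated with a standard bounded queue. This is only an analogy, assuming the usual FBP behaviour that a sender is held up when a connection is full; ArrayBlockingQueue is a stand-in and is not claimed to be how JavaFBP implements its connections. The class name BoundedConnectionSketch is hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Stand-in for a bounded connection; not the JavaFBP connection class. */
class BoundedConnectionSketch {

    /** Returns how many of the offered packets fit before the connection is full. */
    static int fillUntilFull(int capacity, int packetsOffered) {
        BlockingQueue<String> connection = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        for (int i = 0; i < packetsOffered; i++) {
            // offer() returns false instead of blocking; a real sender would be suspended here
            // until the receiver takes a packet off the connection.
            if (connection.offer("packet " + i)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println(fillUntilFull(1, 5));   // capacity quoted above for testing
        System.out.println(fillUntilFull(10, 5));  // capacity quoted above for production
    }
}
```

A capacity of 1 forces sender and receiver to alternate, which is presumably why it is the value suggested for testing: timing-dependent problems tend to show up sooner.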
If an asterisk (*) is specified for the
"from" port, this is called an "automatic output port", and indicates a
signal generated when the "from"
component instance terminates (actually the port is just closed, so no
packet has to be disposed of).
If an asterisk is specified for the "to" port,
this is called an "automatic input port", and indicates a delay
- the "to" component instance does not
start until a signal or a close is received at this port.
If *SUBEND is specified as a port name on a subnet, a packet
containing null is
emitted at this port every time the subnet deactivates, i.e. all the
contained components terminate. It doesn't have to be named in
the port
metadata. This null packet is emitted for all
activations, including the last one.
An initialize clause contains:
- a reference to an object (the IIP)
- "to" component clause
- "to" port clause
as e.g.
initialize(new FileReader("c:\\com\\jpmorrsn\\eb2engine\\test\\data\\myXML3.txt"),
    component("Read"), port("SOURCE"));
However, it has been recommended that IIPs should be strings, rather
than arbitrary objects, to facilitate future graphical management
of networks.
One last point: any number of "from" ports can be connected to a
single "to" port; only one "to" port can ever be connected to a given
"from" port.
Sample Network
Let us code up a network implementing the following picture:
First list the component clauses, together with the
component classes they are to execute (assuming that component classes
have been written to execute the various nodes of the diagram), e.g.:
component("Read Masters",Read.class)
component("Read Details",Read.class)
component("Collate",Collate.class)
component("Process Merged Stream",Proc.class)
component("Write New Masters",Write.class)
component("Summary & Errors",Report.class)
Now these component clauses may either be made into separate
statements or they can be embedded into the connect statements that
follow. Here are the connections in the diagram, without embedded component clauses:
connect(component("Read Masters"),port("OUT"),component("Collate"),
    port("IN",0)); // array port
connect(component("Read Details"),port("OUT"),component("Collate"),
    port("IN",1)); // array port
connect(component("Collate"),port("OUT"),
    component("Process Merged Stream"), port("IN"));
connect(component("Process Merged Stream"),port("OUTM"),
    component("Write New Masters"),port("IN"));
connect(component("Process Merged Stream"),port("OUTSE"),
    component("Summary & Errors"),port("IN"));
Each item in this list is a separate Java statement.
We can now add the
class designation to the first component clause referencing a
particular component occurrence, giving the following:
connect(component("Read Masters",Read.class),port("OUT"),
    component("Collate",Collate.class), port("IN",0)); // array port
connect(component("Read Details",Read.class),port("OUT"),
    component("Collate"),port("IN",1)); // array port
connect(component("Collate"),port("OUT"),
    component("Process Merged Stream",Proc.class),port("IN"));
connect(component("Process Merged Stream"),port("OUTM"),
    component("Write New Masters",Write.class),port("IN"));
connect(component("Process Merged Stream"),port("OUTSE"),
    component("Summary & Errors",Report.class),port("IN"));
Now "Read Masters" and "Read Details" use the same Java class, so we
need some way to indicate the name of the file that each is going to
read. This is done using Initial Information Packets (IIPs). In this
case they might usefully specify FileReader objects, so we need to add
two initialize clauses, as follows:
initialize(new FileReader("c:\\mastfile"),
    component("Read Masters"), port("SOURCE"));
initialize(new FileReader("c:\\detlfile"),
    component("Read Details"), port("SOURCE"));
Note that, since both "Read" component occurrences use the same
class code,
they naturally have the same port names - of course, the ports are
attached
to different IIPs.
Remember that back-slashes have to be doubled in Java character
strings!
"Write New Masters" will have to have an IIP to specify the output
destination - perhaps:
initialize(new FileWriter("c:\\newmast"),
    component("Write New Masters"), port("DESTINATION"));
Note also that this IIP is not a destination for the Writer
- it is an object used by this component occurrence so that the
latter can figure out where to send data to.
Add the beginning and ending statements, and you're done! The
actual sequence of connect and initialize
statements is irrelevant.
Here is the final result:
public class xxxxxx extends Network {
  protected void define() {
    connect(component("Read Masters",Read.class),port("OUT"),
        component("Collate",Collate.class),port("IN",0)); // array port
    connect(component("Read Details",Read.class),port("OUT"),
        component("Collate"),port("IN",1)); // array port
    connect(component("Collate"),port("OUT"),
        component("Process Merged Stream",Proc.class),port("IN"));
    connect(component("Process Merged Stream"),port("OUTM"),
        component("Write New Masters",Write.class),port("IN"));
    connect(component("Process Merged Stream"),port("OUTSE"),
        component("Summary & Errors",Report.class),port("IN"));
    initialize(new FileReader("c:\\mastfile"),
        component("Read Masters"), port("SOURCE"));
    initialize(new FileReader("c:\\detlfile"),
        component("Read Details"), port("SOURCE"));
    initialize(new FileWriter("c:\\newmast"),
        component("Write New Masters"), port("DESTINATION"));
  }
  public static void main(String[] argv) throws Exception {
    // as of JavaFBP-2.4
    new xxxxxx().go();
  }
}
Alternative (Simplified) Notation (JavaFBP-2.0+)
Starting with JavaFBP-2.0, there is a simplified notation, in
addition to that shown above. In this notation connect specifies
two character strings, and initialize specifies an object and
a character string. In both cases, each character string specifies
a combination of component and port, with the two parts separated by
a period. Array port indices, if required, are specified using
square brackets, e.g.
"component.port[3]"
The old port notation will still be supported, but is only really
needed
when the port index is a variable. When debugging, it will be
noted that the square bracket notation is used in trace lines, even
when it was not used in the network definition.
Component names must of course not include periods or most special
characters, but they may include blanks, numerals, hyphens and
underscores, and they must be associated with their implementing class
using a (preceding) component statement.
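To make the string format concrete, here is a minimal sketch of how a "component.port[n]" reference could be split into its parts. It is not the parser used inside JavaFBP; the class PortRefSketch and the regular expression are assumptions based only on the rules stated above (no periods in component names, an optional index in square brackets).

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical parser for the simplified "component.port[index]" notation. */
class PortRefSketch {

    // Component name: anything without a period (blanks allowed).
    // Port name: no period or brackets. Optional numeric index in square brackets.
    private static final Pattern REF =
        Pattern.compile("([^.]+)\\.([^.\\[\\]]+)(?:\\[(\\d+)\\])?");

    final String component;
    final String port;
    final int index;   // -1 when no array index was given

    private PortRefSketch(String component, String port, int index) {
        this.component = component;
        this.port = port;
        this.index = index;
    }

    static PortRefSketch parse(String text) {
        Matcher m = REF.matcher(text);
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a component.port reference: " + text);
        }
        int index = (m.group(3) == null) ? -1 : Integer.parseInt(m.group(3));
        return new PortRefSketch(m.group(1), m.group(2), index);
    }

    public static void main(String[] args) {
        PortRefSketch ref = parse("Collate.IN[1]");
        System.out.println(ref.component + " / " + ref.port + " / " + ref.index);
    }
}
```

Because the first period ends the component name, a name such as "Process Merged Stream" (with blanks) parses cleanly, while a name containing a period would be rejected.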
Here is the above network using the new notation:
public class xxxxxx extends Network {
  protected void define() {
    component("Read Masters",Read.class);
    component("Read Details",Read.class);
    component("Collate",Collate.class);
    component("Process Merged Stream",Proc.class);
    component("Write New Masters",Write.class);
    component("Summary & Errors",Report.class);
    connect("Read Masters.OUT", "Collate.IN[0]");
    connect("Read Details.OUT", "Collate.IN[1]");
    connect("Collate.OUT", "Process Merged Stream.IN");
    connect("Process Merged Stream.OUTM", "Write New Masters.IN");
    connect("Process Merged Stream.OUTSE", "Summary & Errors.IN");
    initialize(new FileReader("c:\\mastfile"), "Read Masters.SOURCE");
    initialize(new FileReader("c:\\detlfile"), "Read Details.SOURCE");
    initialize(new FileWriter("c:\\newmast"), "Write New Masters.DESTINATION");
  }
  public static void main(String[] argv) throws Exception {
    // as of JavaFBP-2.4
    new xxxxxx().go();
  }
}
Here is a network example showing how variable port numbers can be used
with the LoadBalance function to define an (admittedly fairly trivial)
self-balancing network. This also shows a slightly different way
of specifying the define function.
public class TestLoadBalancer {
  public static void main(final String[] args) {
    try {
      new Network() {
        @Override
        protected void define() {
          int multiplex_factor = 10;
          component("generate", Generate.class);
          component("display", WriteToConsole.class);
          component("lbal", LoadBalance.class);
          connect("generate.OUT", "lbal.IN");
          initialize("100 ", component("generate"), port("COUNT"));
          for (int i = 0; i < multiplex_factor; i++) {
            connect(component("lbal"), port("OUT", i),
                component("passthru" + i, Passthru.class), port("IN"));
            connect(component("passthru" + i), port("OUT"), "display.IN");
          }
        }
      }.go();
    } catch (Exception e) {
      System.err.println("Error:");
      e.printStackTrace();
    }
  }
}
Network using ActorDriver Component
This network executes the "string of pearls" pattern of 100 actors
described in the soon-to-be published 2nd Edition of the book on
Flow-Based Programming. All the occurrences of Act1 and Act2 run
under the same process (ActorDriver).
import com.jpmorrsn.fbp.components.Discard;
import com.jpmorrsn.fbp.components.Generate;
import com.jpmorrsn.fbp.components.OutActor;
import com.jpmorrsn.fbp.engine.ActorDriver;
import com.jpmorrsn.fbp.engine.Network;
import com.jpmorrsn.fbp.test.components.Act1;
import com.jpmorrsn.fbp.test.components.Act2;

/** This network is intended for testing ActorDriver */
public class TestActorDriver extends Network {

  static final String copyright = "Copyright 2007, 2008, 2010, "
      + "J. Paul Morrison. At your option, you may copy, "
      + "distribute, or make derivative works under the terms of the "
      + "Clarified Artistic License, "
      + "based on the Everything Development Company's Artistic "
      + "License. A document describing "
      + "this License may be found at "
      + "http://www.jpaulmorrison.com/fbp/artistic2.htm. "
      + "THERE IS NO WARRANTY; USE THIS PRODUCT AT YOUR OWN RISK.";

  @Override
  protected void define() {
    connect(component("Generate", Generate.class), port("OUT"),
        component("ActorDriver", ActorDriver.class), port("IN"));
    connect(component("ActorDriver"), port("OUT[0]"),
        component("Discard", Discard.class), port("IN"));
    initialize("100000", component("Generate"), port("COUNT"));
    Object[] classes = {
        "Act2", Act2.class,
        "Act1", Act1.class,
        "OutActor", OutActor.class,
        "Act1 -> Act2.a -> Act2.b ->Act2.c ->Act2.d ->Act2.e ->"
            + "Act2.f->Act2.g->Act2.h ->"
            + "Act2.i ->Act2.j->Act2.k->Act2.l->Act2.m->Act2.n ->"
            + "Act2.o ->Act2.p ->Act2.q ->Act2.r ->"
            + "Act2.i2 ->Act2.j2->Act2.k2->Act2.l2->Act2.m2->"
            + "Act2.n2 ->Act2.o2 ->Act2.p2 ->Act2.q2 ->Act2.r2 ->"
            + "Act2.i3 ->Act2.j3->Act2.k3->Act2.l3->Act2.m3->Act2.n3 ->"
            + "Act2.o3 -> Act2.p3 ->Act2.q3 ->Act2.r3 ->"
            + "Act2.i4 ->Act2.j4->Act2.k4->Act2.l4->Act2.m4->"
            + "Act2.n4 ->Act2.o4 ->Act2.p4 ->Act2.q4 ->Act2.r4 ->"
            + "Act2.i5->Act2.j5->Act2.k5->Act2.l5->Act2.m5->Act2.n5 ->"
            + "Act2.o5 ->Act2.p5 ->Act2.q5 ->Act2.r5 ->"
            + "Act2.i6 ->Act2.j6->Act2.k6->Act2.l6->Act2.m6->"
            + "Act2.n6 ->Act2.o6 ->Act2.p6 ->Act2.q6 ->Act2.r6 ->"
            + "Act2.i7 ->Act2.j7->Act2.k7->Act2.l7->Act2.m7->"
            + "Act2.n7 ->Act2.o7 ->Act2.p7 ->Act2.q7 ->Act2.r7 ->"
            + "Act2.i7 ->Act2.j8->Act2.k8->Act2.l8->Act2.m8->"
            + "Act2.n8 ->Act2.o8 ->Act2.p8 ->Act2.q8 ->Act2.r8 ->"
            + "Act2.i9 ->Act2.j9->Act2.k9->OutActor (0)" };
    initialize(classes, component("ActorDriver"), port("ACTS"));
  }

  public static void main(final String[] argv) throws Exception {
    new TestActorDriver().go();
  }
}
Parameter, if any, follows qualifier, if any. In the case of
"fan-out" actors, the output port element number follows qualifier and
parameter.
Either qualifier or parameter can be used to make an occurrence of an
actor name unique.
Sample JavaFBP Component
A more complete description of the API is given in the next section.
This component generates a stream of 'n' IPs, where 'n' is specified
in an InitializationConnection (specified by an initialize
clause in the foregoing). Each IP just contains an arbitrary string of
characters, in
order to illustrate the concept. Of course any copyright
information included is up to the developer.
package com.jpmorrsn.fbp.components;

import com.jpmorrsn.fbp.engine.*;

/**
 * Component to generate a stream of 'n' packets, where 'n' is
 * specified in an InitializationConnection.
 */
@OutPort(value = "OUT", description = "Generated stream", type = String.class)
@ComponentDescription("Generates stream of packets under control of a counter")
@InPort(value = "COUNT", description = "Count of packets to be generated", type = String.class)
public class Generate extends Component {

  static final String copyright = "Copyright .....";

  OutputPort outport;

  InputPort count;

  @Override
  protected void execute() {
    Packet ctp = count.receive();
    if (ctp == null) {
      return;
    }
    count.close();

    String cti = (String) ctp.getContent();
    cti = cti.trim();
    int ct = 0;
    try {
      ct = Integer.parseInt(cti);
    } catch (NumberFormatException e) {
      e.printStackTrace();
    }
    drop(ctp);

    for (int i = 0; i < ct; i++) {
      int j = ct - i;
      Integer j2 = new Integer(j);
      String s = j2.toString();
      if (j < 10) {
        s = "0" + s;
      }
      if (j < 100) {
        s = "0" + s;
      }
      s = s + "abc";

      Packet p = create(s);
      outport.send(p);
    }
  }

  /* As of JavaFBP-2.3, the information in introspect() is now covered by
     the metadata, which has at the same time been expanded to add
     descriptive information.

     introspect() is no longer needed - it will be ignored if present.

  public Object[] introspect() { // was 'private' - must be 'public'
    return new Object[] {
      "generates a set of Packets under control of a counter",
      "OUT", "output", String.class, "lines read",
      "COUNT", "parameter", Integer.class, "Count of number of entities to be generated"};
  }
  */

  @Override
  protected void openPorts() {
    outport = openOutput("OUT");
    count = openInput("COUNT");
  }
}
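The loop in execute() builds each packet's content by left-padding the countdown value with zeros to three digits and appending "abc". As a side note, the same text can be produced with String.format; the class PacketLabelSketch below is hypothetical, uses no JavaFBP classes, and assumes the count is non-negative.

```java
import java.util.Locale;

/** Hypothetical helper reproducing the packet contents built in the loop above. */
class PacketLabelSketch {

    /** Zero-pads j to at least three digits and appends "abc" (assumes j >= 0). */
    static String label(int j) {
        // Locale.ROOT keeps the digits ASCII regardless of the default locale.
        return String.format(Locale.ROOT, "%03dabc", j);
    }

    public static void main(String[] args) {
        int ct = 3;
        for (int i = 0; i < ct; i++) {
            // Counts down from ct, just as the component does.
            System.out.println(label(ct - i));
        }
    }
}
```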
The scheduling rules for most FBP implementations are described in
the chapter of my book called Scheduling Rules.
The previous Java implementation of FBP (JavaFBP-1.5.3) presented an IIP
to a component once per activation. As of JavaFBP-2.0 this has been
changed to once per invocation. In practice this will only affect
"non-loopers" (components that get reactivated multiple times).
There are a few minor changes to component code as of JavaFBP-2.0:
- As good programming practice, we now feel that IIP ports should
be closed after a receive has been executed, in case it is attached to
an upstream component (rather than an IIP), and that component
mistakenly sends more than one IP - this statement has accordingly been
added to the above example.
- The drop statement now takes the packet as a parameter, rather
than
being a method of Packet.
- The send is now unconditional - it either works or crashes.
- We are adding a "long wait" state to components, specifying a
timeout value in seconds. This is coded as follows:
double _timeout = 2; // 2 secs
....
longWaitStart(_timeout);
// activity taking time goes here
longWaitEnd();
Typically, the timeout value is given a default value in the
code, and overridden (if desired) by an IIP.
While the component in question is executing the activity taking
time, its state will be set to "long wait". If one or more components
are in "long wait" state while all other components are suspended or
not started, this situation is not treated as a deadlock. However, if
one of the components exceeds its timeout value, an error will be
reported (Complain).
and a major change - metadata, as shown in the above component.
This works as follows:
- Input and output port names will be coded on components using
Java 5.0 annotation ("attribute") notation. This metadata can be used
to analyze networks without having to actually execute the components
involved. Here is an example of the attributes for the "Collate"
component:
@OutPort("OUT")
@InPorts({
@InPort("CTLFIELDS"),
@InPort(value = "IN", arrayPort = true)
})
public class Collate extends Component {
Note that, as Java metadata does not (as far as I know) support
multiple entries with the same name, we have provided the additional
metadata terms @InPorts and @OutPorts. When only one is needed (as for
@OutPort in this example) the "plural" term can be omitted.
As shown above for "CTLFIELDS", when no attributes are needed
within an @InPort or @OutPort statement, the short notation (no "value"
clause) can be used.
Input ports do not necessarily have to be connected, even though
attributes are specified for them; output ports, however, must be.
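The claim that port metadata can be analyzed without running a component can be illustrated with ordinary Java reflection. The annotation types below are stand-ins declared locally so that the sketch compiles on its own; they only mimic the shape of the JavaFBP metadata shown above, and CollateStub and PortMetadataSketch are hypothetical names.

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.ArrayList;
import java.util.List;

// Stand-in annotations (not the JavaFBP ones), retained at run time so reflection can see them.
@Retention(RetentionPolicy.RUNTIME)
@interface InPort { String value(); boolean arrayPort() default false; }

@Retention(RetentionPolicy.RUNTIME)
@interface InPorts { InPort[] value(); }

@Retention(RetentionPolicy.RUNTIME)
@interface OutPort { String value(); boolean arrayPort() default false; }

// Carries the same port metadata as the Collate example above, with no executable logic.
@OutPort("OUT")
@InPorts({ @InPort("CTLFIELDS"), @InPort(value = "IN", arrayPort = true) })
class CollateStub { }

/** Lists the declared ports of a class purely from its annotations. */
class PortMetadataSketch {

    static List<String> describePorts(Class<?> componentClass) {
        List<String> ports = new ArrayList<>();
        InPorts group = componentClass.getAnnotation(InPorts.class);
        if (group != null) {
            for (InPort in : group.value()) {
                ports.add(describe("in", in.value(), in.arrayPort()));
            }
        }
        InPort single = componentClass.getAnnotation(InPort.class);
        if (single != null) {
            ports.add(describe("in", single.value(), single.arrayPort()));
        }
        OutPort out = componentClass.getAnnotation(OutPort.class);
        if (out != null) {
            ports.add(describe("out", out.value(), out.arrayPort()));
        }
        return ports;
    }

    private static String describe(String direction, String name, boolean array) {
        return direction + ":" + name + (array ? "[]" : "");
    }

    public static void main(String[] args) {
        describePorts(CollateStub.class).forEach(System.out::println);
    }
}
```

No instance of the class is created and none of its methods run, which is the property a network-checking tool would rely on.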
MustRun is also specified as metadata, rather
than as an interface, as it was in version 1.5.3, i.e.
@MustRun
A new service has been added for component code as of JavaFBP-2.2:
- <output port name>.isConnected returns boolean
To support isConnected, a new metadata attribute called optional
has been added to @OutPort (it is listed among the @OutPort parameters
in the Component API below).
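Going by the parameter list for @OutPort, such a declaration would presumably take the form @OutPort(value = "REJ", optional = true), where the port name REJ is only an illustration. The sketch below shows the kind of logic isConnected makes possible: send to an optional port when it is wired up, otherwise handle the data some other way. OptionalPortStub and RejectRouterSketch are hypothetical stand-ins that use plain strings instead of packets; they are not JavaFBP classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-in for an output port; only mimics isConnected() and send(). */
class OptionalPortStub {
    private final boolean connected;
    final List<String> sent = new ArrayList<>();

    OptionalPortStub(boolean connected) {
        this.connected = connected;
    }

    boolean isConnected() {
        return connected;
    }

    void send(String packet) {
        if (!connected) {
            throw new IllegalStateException("port not connected");
        }
        sent.add(packet);
    }
}

/** Hypothetical component logic that treats its reject port as optional. */
class RejectRouterSketch {

    /** Sends rejects to the port when connected; returns how many had to be dropped instead. */
    static int route(List<String> rejects, OptionalPortStub rejectPort) {
        int dropped = 0;
        for (String reject : rejects) {
            if (rejectPort.isConnected()) {
                rejectPort.send(reject);
            } else {
                dropped++;   // in a real component the packet would be dropped here
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        List<String> rejects = new ArrayList<>();
        rejects.add("bad record 1");
        rejects.add("bad record 2");
        System.out.println(route(rejects, new OptionalPortStub(false)));
    }
}
```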
JavaFBP Component API
Component Metadata:
Note: when "value" is the only parameter, the "short" form (see above) can be used
@ComponentDescription parameters:
- value (String)
@InPort parameters:
- value (String)
- arrayPort (boolean)
- description (String)
- type (class)
@OutPort parameters:
- value (String)
- arrayPort (boolean)
- description (String)
- type (class)
- optional (boolean)
@InPorts parameter: list of @InPort references, e.g.
@InPorts( { @InPort("IN"), @InPort("TEST") })
@OutPorts parameter: list of @OutPort references, e.g.
@OutPorts( { @OutPort("ACC"), @OutPort("REJ") })
@MustRun
@Priority(Thread.MAX_PRIORITY) // default is NORM_PRIORITY
Packet class:
/**
 * A Packet may either contain an Object, when type is NORMAL,
 * or a String, when type is not NORMAL. The latter case
 * is used for things like open and close brackets (where the
 * String will be the name of a group, e.g. accounts)
 **/
Object getAttribute(String key); /* key accesses a specific attribute */
Object getContent(); /* returns null if type <> NORMAL */
Component class:
/**
 * All verbs must extend this class, defining its two abstract methods:
 * openPorts, and execute.
 **/
Packet p = create(Object o);
Packet p = create(Packet.type (int) t, String s);
drop(Packet p); // Note this change!
longWaitStart(double interval); // in seconds
longWaitEnd();
/** 3 stack methods - as of JavaFBP-2.3 **/
push (Packet p);
Packet p = pop(); // return null if empty
int stackSize();
InputPort interface:
Packet p = receive();
void close();
OutputPort class:
void send(Packet packet);
boolean isConnected(); // as of 2.2
void close();