Wednesday, December 23, 2015

GenRouter

In my last post, I briefly mentioned a new Elixir module named GenRouter. José Valim described its rationale in his Elixir Conf 2015 (US) keynote address. The project is in development and can be seen on GitHub. As with all Elixir core code, it is superbly documented - even before it has been fully implemented - although they do not seem to have settled on a naming convention. For example, the terms source and sink are used for what FBP calls in and out ports.

The name GenRouter was chosen to reflect the idea that it is a generic module similar to the other Elixir generic libraries: GenServer and GenEvent. Here are the opening paragraphs from the current documentation:
A behaviour module for routing events from multiple sources to multiple sinks. 
GenRouter allows developers to receive events from multiple sources and/or send events to multiple sinks. The relationship between sources (the one sending messages) and sinks (the one receiving them) is established when the sink explicitly asks for data. 
Since the same process can send and receive data, the same process can be both a source and sink, which is quite common. Sources that only send data and sinks that only receive data are called “definite source” and “definite sink”, respectively.
In his talk José acknowledged talking with the Akka team. Akka is a member of the Reactive Streams initiative, which, among other things, has released a specification of how well-behaved processes that handle streams of data should act, especially with regard to "back pressure". I've mentioned Reactive Streams in earlier posts; its specification has also influenced my design. Basically, there is two-way communication between entities that are sending and/or receiving data streams. Demand requests are sent upstream and the fulfillment of these demand requests is sent downstream. Hence, nothing moves unless there is a demand for something.
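To make "nothing moves unless there is a demand" concrete, here is a minimal, purely functional sketch of demand accounting on the publishing side. It is not the Reactive Streams or GenRouter API; the module name DemandSketch and its functions are hypothetical. Demand arrives from downstream via ask/2, data arrives from upstream via produce/2, and only min(demand, buffered) events are ever emitted.

```elixir
defmodule DemandSketch do
  @moduledoc """
  Hypothetical sketch of demand accounting: a publisher holds a buffer of
  events and may only emit as many as the subscriber has asked for.
  State is a map: %{demand: non_neg_integer, buffer: list}.
  """

  # The subscriber asks for `n` more events: outstanding demand accumulates.
  def ask(%{demand: demand} = state, n) when is_integer(n) and n > 0 do
    dispatch(%{state | demand: demand + n})
  end

  # New data arrives from upstream: buffer it, then emit what demand allows.
  def produce(%{buffer: buffer} = state, events) when is_list(events) do
    dispatch(%{state | buffer: buffer ++ events})
  end

  # Emit at most `demand` buffered events; returns {emitted, new_state}.
  defp dispatch(%{demand: demand, buffer: buffer} = state) do
    {emitted, rest} = Enum.split(buffer, demand)
    {emitted, %{state | demand: demand - length(emitted), buffer: rest}}
  end
end
```

Starting from %{demand: 0, buffer: []}, producing [1, 2, 3] emits nothing; asking for 2 then emits [1, 2], and the third event waits in the buffer until more demand arrives.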

The Reactive Streams influence on GenRouter is manifested in a module named GenRouter.Spec. In the module there are four "convenience" functions that one can use to:
  1. subscribe() to an upstream source, asking for some number of data events to be sent. So the Subscriber is asking a Publisher for a Subscription and for a certain maximum number of events (data). The Subscriber must honor the parameters of this Subscription.
  2. ask() for data from an upstream source, specifying how many data events an entity can receive.
  3. route() is used to send data to a downstream entity. The Publisher sends data to a subscribed Subscriber.
  4. unsubscribe() will cancel the source/sink relationship. The Subscriber will attempt to cancel the Subscription that it has with a Publisher.
As of this date GenRouter needs to be "fleshed out".
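The four operations can be pictured as plain messages between processes. The sketch below is only an illustration: the message shapes ({:subscribe, pid, n}, {:ask, pid, n}, {:route, publisher, event}, {:unsubscribe, pid}) and the module name PublisherSketch are assumptions, not the GenRouter.Spec API. The publisher routes events downstream only while the subscriber has outstanding demand.

```elixir
defmodule PublisherSketch do
  @moduledoc """
  Hypothetical publisher process illustrating subscribe/ask/route/unsubscribe
  as plain messages. Message shapes are assumptions, not GenRouter.Spec.
  """

  # Start a publisher that will serve the given list of events.
  def start(events), do: spawn(fn -> loop(events, nil, 0) end)

  defp loop(events, subscriber, demand) do
    receive do
      # subscribe: establish the relationship and the initial demand
      {:subscribe, pid, n} when is_pid(pid) ->
        serve(events, pid, n)

      # ask: the current subscriber asks for `n` more events
      {:ask, ^subscriber, n} ->
        serve(events, subscriber, demand + n)

      # unsubscribe: cancel the relationship; remaining events stay put
      {:unsubscribe, ^subscriber} ->
        send(subscriber, {:unsubscribed, self()})
        loop(events, nil, 0)
    end
  end

  # route: send at most `demand` events downstream, then wait for more demand.
  defp serve(events, subscriber, demand) do
    {now, later} = Enum.split(events, demand)
    Enum.each(now, fn event -> send(subscriber, {:route, self(), event}) end)
    loop(later, subscriber, demand - length(now))
  end
end
```

A subscriber that sends {:subscribe, self(), 2} to a publisher started with [:a, :b, :c] receives :a and :b, and receives :c only after it sends {:ask, self(), 1}.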

Monday, October 19, 2015

Subscription Redux

ElixirFBP Subscription

An ElixirFBP Subscription is the "glue" that ties together all of the components in a flow graph. While Components could certainly send messages directly to each other, I found a compelling reason for developing a Subscription as a result of reading about Reactive Streams. After some fits and starts, I have developed a Subscription which seems to be able to run in either a "pull" or "push" mode. This code has a slight smell - I might be trying to do too much with this one module. It may make more sense to have two different Subscriptions - for push and pull mode, or, alternatively, develop a macro that generates a different Subscription depending on the run mode value.

In pull mode a flow typically begins with the last component(s) requesting some number of information packets. This request propagates upstream through the flow graph until eventually reaching component(s) that can provide data. Running in pull mode also allows a component (actually the subscription) to apply "back pressure" to the upstream components. Finally, a component won't respond to a request until it has all the values it needs available at its in ports.

In push mode a flow begins with the arrival of information packets at, usually, the upstream components. When a component has received all of the data that it requires, it computes a result and sends it to its out port.
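The pull-mode propagation described above can be sketched with two bare processes. This is a hypothetical illustration (PullChainSketch, source/1, stage/2 and the {:pull, from} / {:value, stage, value} messages are all assumptions, not ElixirFBP code): a pull request enters at the last stage and travels upstream until it reaches a stage that can produce data, and the value then flows back downstream.

```elixir
defmodule PullChainSketch do
  # Hypothetical source stage: answers each pull with the next counter value.
  def source(next) do
    receive do
      {:pull, from} ->
        send(from, {:value, self(), next})
        source(next + 1)
    end
  end

  # Hypothetical transform stage: forwards the pull upstream, applies `fun`
  # to the value that comes back, and answers its own requester.
  def stage(upstream, fun) do
    receive do
      {:pull, from} ->
        send(upstream, {:pull, self()})

        receive do
          {:value, ^upstream, value} -> send(from, {:value, self(), fun.(value)})
        end

        stage(upstream, fun)
    end
  end
end
```

With src = spawn(fn -> PullChainSketch.source(1) end) and a doubling stage spawned on top of it, each {:pull, self()} sent to the stage yields 2, then 4, and so on. In push mode the source would instead send {:value, ...} messages as soon as it had data, without waiting to be asked.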

If a component is meant to work in both a push and pull mode then its listening loop must account for different sets of messages. For example, here's an Add component:

defmodule Math.Add do
  @moduledoc """
  This module describes an FBP Component: Math.Add
  It can operate in both a push and pull mode
  """
  @behaviour ElixirFBP.Behaviour

  def description, do: "Add two integers"
  def inports, do: [addend: :integer, augend: :integer]
  def outports, do: [sum: :integer]

  def loop(inports, outports) do
    %{:augend => augend, :addend => addend} = inports
    receive do
      {:addend, value} when is_number(augend) ->
        send(outports[:sum], {:sum, value + augend})
        inports = %{inports | :addend => nil, :augend => nil}
        loop(inports, outports)
      {:addend, value} ->
        inports = %{inports | :addend => value}
        loop(inports, outports)
      {:augend, value} when is_number(addend) ->
        send(outports[:sum], {:sum, addend + value})
        inports = %{inports | :addend => nil, :augend => nil}
        loop(inports, outports)
      {:augend, value} ->
        inports = %{inports | :augend => value}
        loop(inports, outports)
      :sum when is_number(addend) and is_number(augend) ->
        send(outports[:sum], {:sum, addend + augend})
        inports = %{inports | :addend => nil, :augend => nil}
        loop(inports, outports)
    end
  end
end

The last receive clause is used when the flow is operating in pull mode. This message (:sum) originates in the subscription that connects this component's out port (:sum) with the in port of another component. Note that the destination of the send to the :sum out port is a Subscription process; these processes are all started when the ElixirFBP.Network.start function is executed.

As I mentioned earlier in this blog, I would like to eventually develop an Elixir macro that would generate all these receive clauses based on a more declarative description of the component's in and out ports and computation.

A Refactoring

I refactored my ElixirFBP project - removing all of the Runtime support for the Noflo-ui network protocol. I will develop another project that uses ElixirFBP as a service and re-implement support for this network protocol. This essentially means using Cowboy to manage websocket requests and turn them into calls to the ElixirFBP gen servers. The newly refactored project code is available at my GitHub repository.

GenRouter!

I watched José Valim's keynote talk at the recently completed Elixir Conf 2015 (US). I learned that we might expect a new gen behaviour named GenRouter to appear in Elixir 1.3. Early code is available on GitHub. At first glance, it would appear that GenRouter might serve very nicely as a base for FBP. I will follow the design and development closely and, at some point, see whether I can move ElixirFBP on top of it.

Friday, August 21, 2015

"Do a Something!"

I have been spending a great deal of time worrying about possible overruns in an ElixirFBP flow. An overrun can occur when the producer of Information Packets (IPs) sends them to a component that cannot process them quickly enough and, as a result, all kinds of terrible things can happen. Even though Erlang/Elixir is designed from the ground up to be a message-passing system, there is the very real possibility of a process's mailbox growing out of control: Erlang mailboxes are unbounded, so a slow consumer's queue keeps growing until the node runs out of memory.

This possibility led me to read (and blog) about Reactive Streams, and I have attempted to implement the design outlined in the Reactive Streams specification. What has bothered me about this approach is that it adds an extra processing step between any two flow components: the Subscription that I've described earlier. Adding a step is, of course, going to slow things down; it would be much nicer not to have this extra processing time added on. But it doesn't appear that this is avoidable; something has to be monitoring the size of a component's mailbox.
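Elixir does expose the size of a mailbox: Process.info(pid, :message_queue_len) returns {:message_queue_len, n}, or nil if the process is no longer alive. The sketch below (MailboxSketch, queue_len/1 and send_if_room/3 are hypothetical names) checks the receiver's queue before sending and refuses to send once a limit is reached. It is only an illustration of "monitoring the mailbox": with several senders the check-then-send is racy, so it is not a substitute for demand-driven flow control.

```elixir
defmodule MailboxSketch do
  # Hypothetical helper: number of messages waiting in `pid`'s mailbox.
  def queue_len(pid) do
    case Process.info(pid, :message_queue_len) do
      {:message_queue_len, n} -> n
      # Process.info/2 returns nil when the process is not alive
      nil -> :dead
    end
  end

  # Send `msg` only if the receiver's mailbox holds fewer than `limit`
  # messages; otherwise tell the caller to back off.
  def send_if_room(pid, msg, limit) do
    case queue_len(pid) do
      :dead ->
        {:error, :dead}

      n when n < limit ->
        send(pid, msg)
        :ok

      _n ->
        {:error, :overloaded}
    end
  end
end
```

Against a process that never reads its mailbox (for example spawn(fn -> Process.sleep(:infinity) end)), two calls with a limit of 2 return :ok and the third returns {:error, :overloaded}.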

As part of my research into solutions to this problem, I spent the last couple of days watching a series of presentations given at the 2013 Erlang conference. The presentations were part of a track entitled "Load Regulation and Back Pressure". Ulf Wiger's 'Jobs' Load Regulation Framework was very interesting, and so was Bryan Fink's Riak Pipe.

What now? Well, I know that there are potential problems, especially if ElixirFBP is ever used in an environment such as real-time monitoring. And I also know that there are solutions out there. But I haven't been making much progress in the last few weeks. I think I might be suffering from Analysis Paralysis. As an old boss of mine once told me: "Do a Something!". So I will.

Friday, July 17, 2015

The correct Subscription?

In the last post, I showed a possible implementation for a Subscription, the entity that sits between a Publisher and a Subscriber in a Reactive Stream. It wasn't a very good implementation; it is lacking in many respects, not the least of which is that it can't deal with an FBP Component executing in multiple Elixir/Erlang processes. ElixirFBP is meant to exploit the relatively cheap cost of spawning many processes in Erlang/Elixir. I could have used one Subscription per component process, but that would have placed the burden of determining which process to send a message to on the Component. I would like to keep the Components as simple as possible. Of course, there is still the possibility that a Subscription process will become a "choke" point in a flow.

Below is the heart of a new version of the Subscription module: the loop function:
    def loop(%Subscription{
                publisher_pids: publisher_pids,
                publisher_port: publisher_port,
                subscriber_pids: subscriber_pids,
                subscriber_pids_length: subscriber_pids_length,
                subscriber_port: subscriber_port} = subscription,
                subscriber_index) do
      receive do
        {:pull, n}  when n > 0 ->
          # A request for data from the subscriber
          # Ask all the publishers for a data value
          Stream.cycle(publisher_pids)
            |> Stream.take(n)
            |> Enum.each(fn(publisher_pid) ->
              send(publisher_pid, {publisher_port, self()})
            end)
          # Keep the round-robin position rather than resetting it to 0
          loop(subscription, subscriber_index)
        {^publisher_port, value} ->
          # Publisher has sent data - pass it on to the next subscriber
          send(elem(subscriber_pids, subscriber_index), {subscriber_port, value})
          loop(subscription, rem(subscriber_index + 1, subscriber_pids_length))
        message ->
          Logger.info("Received unknown message: #{inspect message}")
          loop(subscription, subscriber_index)
      end
    end
Notice that when a pull request is received, all of the processes that are currently executing the Publisher component are sent a request. When the responses to these requests begin arriving back, each response is sent to one of the Subscriber component processes in a round-robin fashion. Unless otherwise specified, the default number of processes spawned per component is one.
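The round-robin step on its own is small: keep the subscriber pids in a tuple and an index that wraps with rem/2. Here is a standalone sketch of just that step; RoundRobinSketch and its functions are hypothetical names, not part of ElixirFBP.

```elixir
defmodule RoundRobinSketch do
  # `pids` is a tuple of process ids; `index` points at the next process
  # that should receive a value.

  # Send one value to the process at `index` and return the next index,
  # wrapping back to 0 after the last process.
  def dispatch(pids, index, port, value) do
    send(elem(pids, index), {port, value})
    rem(index + 1, tuple_size(pids))
  end

  # Dispatch a list of values in order, threading the index through.
  def dispatch_all(pids, index, port, values) do
    Enum.reduce(values, index, fn value, i -> dispatch(pids, i, port, value) end)
  end
end
```

Dispatching [10, 20, 30, 40, 50, 60] across three processes starting at index 0 sends 10 and 40 to the first, 20 and 50 to the second, 30 and 60 to the third, and returns 0 as the next index.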

There is still a problem with this design: the responses are not necessarily received in the same order as the requests that were sent out. There are many examples where the order of the Information Packets is important. I will address this shortcoming a little later.

How does this all work? At the lowest level, these are the steps required:
  1. Spawn one or more processes for every Component in the FBP Graph.
  2. Spawn a Subscription process for every edge between two Components.
  3. Send pull requests to the Subscription processes.
These steps are illustrated in the code below:
    iip1 = Component.start_with_iips(Trial.InitialInformationPacket, [value: 44])
    iip2 = Component.start_with_iips(Trial.InitialInformationPacket, [value: 33])
    add = Component.start(Trial.Add)
    show = Component.start(Trial.Show)
    sub1_pid = Subscription.start(iip1, :value, add, :addend)
    sub2_pid = Subscription.start(iip2, :value, add, :augend)
    sub3_pid = Subscription.start(add, :sum, show, :in)
    send(sub3_pid, {:pull, 1})
    send(sub2_pid, {:pull, 1})
    send(sub1_pid, {:pull, 1})
An InitialInformationPacket is a component that always responds with a constant value. In the example, the Add component is sent the constant values of 44 and 33.
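As an illustration of what such a constant-value component might look like, here is a hypothetical stand-in (ConstantSketch is not the Trial.InitialInformationPacket module). It assumes the request shape used by the Subscription loop above, where a request on a port arrives as {port, requester} and the reply is {port, value}.

```elixir
defmodule ConstantSketch do
  # Hypothetical stand-in for an InitialInformationPacket component:
  # every request on `port` is answered with the same constant value.
  def start(port, value), do: spawn(fn -> loop(port, value) end)

  defp loop(port, value) do
    receive do
      # Request shape {port, requester} mirrors the Subscription loop above
      {^port, requester} when is_pid(requester) ->
        send(requester, {port, value})
        loop(port, value)
    end
  end
end
```

Because it never runs out of data, ConstantSketch.start(:value, 44) answers every {:value, pid} request with {:value, 44}, which is why a pull on either Subscription feeding Add always succeeds.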

I now need to introduce Subscriptions to the existing code base and make it all work - as it once did - with the NoFlo Development Environment.

Wednesday, July 8, 2015

Reactive Streams, continued.

In the last post, I explained that I wanted ElixirFBP to exhibit Reactive Stream behavior, namely that a component could apply "back pressure" to its upstream component so that it doesn't suffer from buffer overflows and similar errors. Data still flows downstream, but how that flow is initiated is different: a component no longer sends an Information Packet (IP) to its downstream connection as soon as that IP is ready (for instance, when a Sum component has received both of its inputs). Rather, it sends a value only upon being asked for it.

Recall that components no longer connect directly with each other. Instead, they are connected via a Subscription. The Subscription knows about a Publisher and a Subscriber and is responsible for moving IPs to where they belong and for maintaining control over how many of these IPs are sent. Publishers and Subscribers are simply Components. Below is a possible implementation of a Subscription:
defmodule Subscription do
  require Logger

  defstruct [
    publisher_pid: nil, publisher_port: nil,
    subscriber_pid: nil, subscriber_port: nil
  ]
  def start(%Subscription{} = subscription) do
    spawn(fn -> loop(subscription,  0) end)
  end
  def loop(%Subscription{
              publisher_pid: publisher_pid,
              publisher_port: publisher_port,
              subscriber_pid: subscriber_pid,
              subscriber_port: subscriber_port} = subscription, count) do
    receive do
      {:pull, n}  when n > 0 ->
        # A request for data from the subscriber
        # Ask the publisher for a data value
        send(publisher_pid, {publisher_port, subscription})
        loop(subscription, count + n)
      # ^ pins the existing port; an unpinned name would match any 2-tuple
      {^publisher_port, value} when count > 0 ->
        # Publisher has sent data - pass it on to the subscriber
        send(subscriber_pid, {subscriber_port, value})
        # Ask the publisher for more data
        send(publisher_pid, {publisher_port, subscription})
        loop(subscription, count - 1)
      {^publisher_port, value} when count == 0 ->
        # Unrequested data from the publisher. Drop it?
        loop(subscription, 0)
      {:complete} ->
        # From the publisher TODO: Now what?
        loop(subscription, 0)
      {:error, message} ->
        Logger.error("Error received #{inspect message}")
        loop(subscription, 0)
      message ->
        Logger.info("Received unknown message: #{inspect message}")
        loop(subscription, count)
    end
  end
end
A subscription is started as a separate Elixir process and immediately begins listening for messages from its publisher and subscriber. Components need to be rewritten because they are no longer pushing data. Here's what a rewritten Add component would look like:
defmodule Add do
  def inports, do: [addend: :integer, augend: :integer]
  def outports, do: [sum: :integer]
  def loop(inports, outports) do
    %{:augend => augend, :addend => addend} = inports
    %{:sum => sum} = outports
    receive do
      {:addend, value} ->
        inports = %{inports | :addend => value}
        loop(inports, outports)
      {:augend, value} ->
        inports = %{inports | :augend => value}
        loop(inports, outports)
      {:sum, subscription} when is_number(addend) and is_number(augend) ->
        sum = addend + augend
        outports = %{outports | :sum => sum}
        Component.send_ip(subscription, sum)
        loop(inports, outports)
    end
  end
end
Notice how the component waits for a message specifically requesting its calculated value and for that value to be sent to the subscription rather than directly to a downstream component.

Typically, one sees Reactive Streams used in dataflows that are built via functional composition. Importantly, the composition is evaluated lazily; that is, data does not begin to flow until the last function is executed. In Apache Spark these terminal functions are called actions. In effect, the actions begin pulling the data through the flow. This is also exactly the behavior exhibited by Elixir Streams.
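Elixir's own Stream module shows this pull behaviour directly. In the sketch below (LazySketch is a hypothetical name; Stream.map/2, Stream.filter/2 and Enum.take/2 are the standard library functions), each element the map step touches is reported back as a message, so we can see that building the pipeline does no work and that the terminal Enum call pulls only as many source elements as it needs.

```elixir
defmodule LazySketch do
  # Builds a lazy pipeline over a large range. Every element the map step
  # processes is reported to `observer`, so we can see when work happens.
  def pipeline(observer) do
    1..1_000_000
    |> Stream.map(fn x ->
      send(observer, {:computed, x})
      x * 2
    end)
    |> Stream.filter(fn y -> rem(y, 3) == 0 end)
  end
end

stream = LazySketch.pipeline(self())

# Building the stream only records the steps: no element has been computed.
nothing_yet =
  receive do
    {:computed, _} -> :work_done
  after
    0 -> :no_work
  end

# The terminal call (the "action") pulls values through on demand:
# source elements 1 through 6 are processed, yielding [6, 12].
first_two = Enum.take(stream, 2)
```

Only six of the million source elements are ever computed; Enum.take/2 stops pulling as soon as it has two results.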

Thursday, July 2, 2015

Reactive Streams

It's been two weeks since my last post, during which time I have embarked on a major internal design change. Early on, I said that I wanted ElixirFBP to more closely follow the so-called "classical" Flow Based Programming model - Paul Morrison is the source for this term. He discusses the differences between his implementations of FBP and that of NoFlo. One difference is that classical FBP components can exert back pressure on their upstream components. What this means is that a component can tell a component connected to one of its in ports: "Send no more data!" This ability is crucial when connected components are all running at different speeds; a fast component could overwhelm a slower component, leading to buffer overruns or out-of-memory errors.

This problem has become especially critical in the era of "Big Data" and the processing flows/streams that are being built to handle these massive amounts of data. The desire to have these data streams handled by asynchronous processes (for concurrency reasons) compounds the problem. Recently a group - Reactive Streams - was formed to study this problem. The result is a specification that describes the behavior of asynchronous processors that can apply non-blocking back pressure in the face of fast streams of data. The people at Akka contributed to the writing of the specification; they have a good description of how all this works in the context of their Akka Streams. I also recommend watching Roland Kuhn's presentation.

The Reactive Streams specification is very straightforward and easy to understand. There are Publishers and Subscribers. Subscribers ask for a Subscription from a Publisher. The Subscription is used by the Subscriber to tell the Publisher that it can only receive a certain number of packets of data. This constraint can change during processing as the dynamics of the overall graph change.

I have begun changing ElixirFBP so that any two connected components no longer speak directly with each other (exchange messages); rather, they speak to a Subscription. In true Erlang/Elixir fashion the Subscriptions are separate processes. The components need to be written a little differently now. Here, for example, is the new version of the Add component:
  defmodule Add do
    def inports, do: [addend: :integer, augend: :integer]
    def outports, do: [sum: :integer]
    def loop(inports, outports) do
      %{:augend => augend, :addend => addend} = inports
      %{:sum => sum} = outports
      receive do
        {:addend, value} ->
          inports = %{inports | :addend => value}
          loop(inports, outports)
        {:augend, value} ->
          inports = %{inports | :augend => value}
          loop(inports, outports)
        {:sum, subscription} when is_number(addend) and is_number(augend) ->
          sum = addend + augend
          outports = %{outports | :sum => sum}
          Component.send_ip(subscription, sum)
          loop(inports, outports)
        _message ->
          # Silently discard any message this component does not handle
          loop(inports, outports)
      end
    end
  end
You don't see any indication of back pressure being applied - this is all handled in the Subscription that is associated with each connected in and out port. Subscriptions are created during the adding of an edge to the network graph.

Notice also that a downstream component must ask for a value (the sum) from this component rather than passively receiving it when the value has been computed. This is an example of "pulling" data rather than "pushing" data and thus exhibits "lazy" behavior. Elixir Streams operate this way and so does Apache Streams.

None of this code is in my GitHub repository yet. I hope to push up all these changes next week.

Sunday, June 21, 2015

On a component's behaviour

I have been busy implementing more of the commands that are part of the FBP Network protocol which is described here. If an FBP runtime speaks this protocol then one is able to use the neat development environment that has been developed by the NoFlo folks. You can run their interface locally, as a node.js program or you can use the online version: app.flowhub.io. I've been able to use the online version to create a small graph that I can then execute.

The current implementation of ElixirFBP has its components hard-wired into the runtime. This is, of course, temporary. I eventually want the runtime to be able to find and load components dynamically. NoFlo has the idea of a ComponentLoader for their node.js implementation:
"Node.js version of the Component Loader finds components and graphs by traversing the NPM dependency tree from a given root directory on the file system."
Any component-based framework, such as ElixirFBP, becomes generally useful if it is possible to construct components that can be "plugged" into the framework, so long as the components obey rules defined by an API. In other words, the component must behave properly. Elixir and Erlang have the concept of a behaviour: basically, the specification of an API that an Elixir/Erlang module must implement. Let's see how this might be useful.

A typical ElixirFBP component, Math.Add looks like the following:
defmodule Math.Add do
  def description, do: "Add two integers"
  def inports, do: [addend: :integer, augend: :integer]
  def outports, do: [sum: :integer]

  def loop(augend, addend, sum) do
    receive do
      {:augend, value} when addend != nil ->
        sum = ElixirFBP.Component.send_ip(sum, addend + value)
        loop(nil, nil, sum)
      {:augend, value} ->
        loop(value, addend, sum)
      {:addend, value} when augend != nil ->
        sum = ElixirFBP.Component.send_ip(sum, value + augend)
        loop(nil, nil, sum)
      {:addend, value} ->
        loop(augend, value, sum)
    end
  end
end

Notice the four function definitions: description, inports, outports, and loop. With the exception of the description function, they must all be present for this component to operate correctly. In the process of building an FBP graph and connecting components, ElixirFBP, given the module name of a component, can retrieve the component's in ports and out ports by executing the following code:
    component_inports = elem(Code.eval_string(component <> ".inports"), 0)
    component_outports = elem(Code.eval_string(component <> ".outports"), 0)
Eventually, this component is started by spawning a process:
process_pid = spawn(module, :loop, inport_args ++ outport_args)
These three functions, at least at this point in ElixirFBP's development, define the API for an ElixirFBP component or, in Elixir/Erlang terms, they describe the behaviour of the component. In Elixir, a behaviour is specified using the Behaviour macros. Here is what the ElixirFBP Component behaviour might look like:
defmodule Component do
  use Behaviour

  defcallback inports() :: [{atom, atom}]
  defcallback outports() :: [{atom, atom}]
end
A component must implement two functions, inports() and outports(), both of which return a list of tuples. Because each tuple pairs an atom (the port name) with another atom (the port type), the list is an Elixir keyword list and can be handled with the functions in the Keyword module.
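Later Elixir versions deprecate the Behaviour module in favour of the built-in @callback attribute, so here is a sketch of the same idea in that form. The module names ComponentSpecSketch and AddSketch are hypothetical; @optional_callbacks is used to reflect that description is not required, and keyword(atom) is the built-in type for a list of {atom, atom} pairs.

```elixir
defmodule ComponentSpecSketch do
  # Hypothetical component behaviour written with @callback.
  # keyword(atom) means a list of {atom, atom} pairs, e.g. [sum: :integer].
  @callback description() :: String.t()
  @callback inports() :: keyword(atom)
  @callback outports() :: keyword(atom)

  # description is informational only, so implementers may omit it
  @optional_callbacks description: 0
end

defmodule AddSketch do
  # Hypothetical component declaring that it implements the behaviour;
  # the compiler warns if a required callback is missing.
  @behaviour ComponentSpecSketch

  @impl true
  def description, do: "Add two integers"

  @impl true
  def inports, do: [addend: :integer, augend: :integer]

  @impl true
  def outports, do: [sum: :integer]
end
```

Because the port lists are keyword lists, Keyword.keyword?(AddSketch.inports()) is true and Keyword.get(AddSketch.outports(), :sum) returns :integer.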

It was at this point that I realized that the way I have been defining the loop function of a component was not general enough. If you look at the Math.Add component above, you'll see that the loop function takes three arguments. I could define a loop behaviour callback that accepts three arguments but there are two problems: Which arguments are for the in ports and which are for the out ports? And, of course, a component can have any number of in and out ports. A solution is to generalize the loop function to accept two maps: one for the in ports and one for the out ports. Its behaviour could look like:
  defcallback loop(map, map) :: any
The component's loop function must have two arguments both of which are Elixir Maps. This change means that the way an ElixirFBP component is written needs to change. Here's what Math.Add looks like when rewritten to conform to the Component behaviour:
defmodule Math.Add do
  def description, do: "Add two integers"
  def inports, do: [addend: :integer, augend: :integer]
  def outports, do: [sum: :integer]

  def loop(inports, outports) do
    %{:augend => augend, :addend => addend} = inports
    %{:sum => sum} = outports
    receive do
      {:augend, value} when not is_nil(addend) ->
        sum = ElixirFBP.Component.send_ip(sum, addend + value)
        outports = %{outports | :sum => sum}
        inports = %{inports | :augend => nil, :addend => nil}
        loop(inports, outports)
      {:augend, value} ->
        inports = %{inports | :augend => value}
        loop(inports, outports)
      {:addend, value} when not is_nil(augend) ->
        sum = ElixirFBP.Component.send_ip(sum, value + augend)
        outports = %{outports | :sum => sum}
        inports = %{inports | :augend => nil, :addend => nil}
        loop(inports, outports)
      {:addend, value} ->
        inports = %{inports | :addend => value}
        loop(inports, outports)
    end
  end
end
I'll explain what's going on in this new version of Math.Add in the next post.

Monday, June 15, 2015

A couple of things...

A couple of things came to mind as I read the messages at the flow-based programming group and a discussion at Hacker News. I learned of the HN conversation via an interesting blog post by Samuel Lampa. Among other things, Samuel points out some of the problems associated with what is called "back pressure" in a flow- or stream-based system. This is where the receiver of information becomes overwhelmed and needs to tell the information source that "that's enough, for now" before running out of memory or some other kind of nasty thing occurs.

This got me thinking about what I'm trying to accomplish with ElixirFBP. It is an experiment to see how a flow-based system written in Elixir behaves and how it can best be designed to offer the user some compelling reason to use such a system, other than its being written in a new language. I think the reason will be the overall speed of computation, despite some inherent speed issues with Erlang. Samuel describes how Elixir/Erlang could best be utilized as a "control plane". So, as the discussions above point out, there are plenty of issues to be faced and dealt with. That's a Good Thing! I hope this experiment will result in a base for investigating solutions to these issues.

Stay tuned as I continue the implementation of ElixirFBP support for NoFlo's network protocol. Having the goal of supporting noflo-ui requires me to provide a lot of support for the various intricacies of creating and running flow-based programs, something I'm not sure I would have thought of on my own.

Finally, it turns out that I can get my code to work with NoFlo's online version of their development environment so one doesn't have to install noflo-ui as I described in my previous post.

Sunday, June 14, 2015

Although it is still in its early stages, I have managed to get ElixirFBP talking with NoFlo's Development Environment. This browser-based tool allows one to build flow-based programs and execute them. Below is a partial screenshot of the tool showing the graph that I built:



In the picture I have added two components, set the initial values for the Math.Add component, and connected its output to Core.Output's in port. When you press the start button (in the upper right-hand corner), the result of the addition is printed in the console window where ElixirFBP is running.

NoFlo's tool works with any server/runtime that supports the NoFlo Network Protocol. If you recall, I made the decision, early on, to model the underlying architecture of ElixirFBP along the lines of this protocol. So, it wasn't too difficult to make the tool work with ElixirFBP. I haven't implemented all of the protocol, only those commands that allow you to add and remove components, edges and initial values and start the graph.

ElixirFBP now runs as a network server that listens on a websocket connection for these Network Protocol commands. I'm using the websocket facilities offered by the Cowboy Erlang module described here. I am also using the Elixir Poison library for encoding and decoding the JSON messages. To start ElixirFBP, change to its top-level directory and enter iex -S mix at the command prompt. I use the Elixir Logger to display information as the server receives commands.

I'm still learning how to use NoFlo's tool, so the following description of how to set up NoFlo's tool may not be completely accurate. I am running a local version of the development tool rather than the one available online. This version, called the noflo-ui, is available on Github here. I followed the directions found at this page. You will need to have node.js installed.

Once noflo-ui is installed and started, you can open it up in a browser by going to http://localhost:3005/index.html. On the opening page you will see a section entitled Runtimes. You need to let noflo-ui know that the ElixirFBP runtime is available. Click the Register button and in the dialog that shows, click on the Add manually button. Enter elixir-fbp as the Runtime name and for the Type field, choose Custom. Click the Create button and Close the original dialog. The elixir-fbp runtime will appear as a black rectangle. Click the rectangle - you should start seeing activity from the ElixirFBP server. noflo-ui is asking for the list of components that the elixir-fbp has available - only two at this point. The browser will now display a drawing area upon which you can begin assembling your program.

To start creating a flow-based program, click on the area in the upper left hand corner - a noflo-ui-assigned graph name. A list of components should appear - again, only two. Clicking on a component will place it in the drawing area. From this point on, you would add other components, begin connecting out ports to in ports and establishing initial values. Right-clicking on a component provides other facilities. I would suggest that you read the documentation that is available for using NoFlo's Development Environment.

My Github repository has all the code needed for building and running a limited FBP graph using NoFlo's Development Environment. Be warned: things are probably a little fragile!

Wednesday, June 3, 2015

The current design and implementation of ElixirFBP does not take advantage of Elixir's ability to spawn many processes cheaply and quickly. There will undoubtedly be times where it would be desirable to have multiple processes supporting a single component. Right now, each component is supported by one Elixir process.

To give ElixirFBP this capability, I used the metadata part of the flow-based network protocol to specify the number of Elixir processes that you want running in support of a component. In code for creating a graph, this looks like:
    Graph.add_node(fbp_graph_reg_name, "copier", "Jsfbp.Copier",
                   %{:number_of_processes => 4})
In this example, when the FBP graph is started, four separate processes will be spawned for this component. Any component that sends an Information Packet (IP) to this component will automatically have this IP sent to one of the spawned processes, in a round-robin manner. Note that, with this method, there is no guarantee that the order of the IPs will be preserved as they move through the remainder of the flow. I will discuss techniques for ensuring order in subsequent posts.

We should expect to see some improvement in the execution speed of a graph that uses multiple processes for a component. Recall that the Elixir/Erlang virtual machine will take advantage of multiple cores in a processor automatically. Here's an experiment to see this in action: I've created an example program - Examples.Fbptestv1Multi - that uses a component to create IPs and sends them to a "faker" component which sends them on to a discarder component that simply ignores them. The faker component will sleep for a certain number of milliseconds to simulate processing. The example is written so that the faker component can be set to run in a specified number of processes (by default one).

The example program has a timing function that takes two arguments: the number of IPs to create and send, and the number of processes to assign to the faker component. I ran the following tests on my four-core AMD processor running at 3.60 GHz (the time is in seconds and the faker process was set to sleep for 20 milliseconds):

No. of IPs   No. of Processes   Time (s)
1,000        1                  31.282
1,000        2                  15.648
1,000        4                  7.830
1,000        8                  3.924
1,000        16                 1.972
1,000        32                 1.001

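You can see that as the number of processes assigned to the faker component increases, the overall execution time goes down: each doubling of the number of processes roughly halves the time. Because the faker component only sleeps, the speedup continues past the four cores; the processes spend their time waiting concurrently rather than computing. All the code to run this example is in my GitHub repository. To run it, perform the following steps after downloading the ElixirFBP code: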

  1. cd to the ElixirFBP directory.
  2. You may have to run mix deps.get in order to download and install certain libraries.
  3. Run iex -S mix
  4. At the iex prompt enter: Examples.Fbptestv1Multi.time_it - this will, by default, run 1,000 IPs using one process for the faker component. The time will be displayed in microseconds, not milliseconds.
  5. Enter Examples.Fbptestv1Multi.time_it(1000, 4) to use 4 faker processes.
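The effect in the table can be reproduced without ElixirFBP. The following is a standalone sketch, not the Examples.Fbptestv1Multi code: the module FakerTiming and its functions are hypothetical. Each worker stands in for the faker component by sleeping and then acknowledging the IP; IPs are handed out round-robin, and :timer.tc/1 reports the elapsed time in microseconds.

```elixir
defmodule FakerTiming do
  # Hypothetical standalone approximation of the timing experiment (no ElixirFBP).

  # Sends n_ips messages round-robin to n_procs workers, waits for every
  # acknowledgement and returns the elapsed time in microseconds.
  def time_it(n_ips, n_procs, sleep_ms \\ 20) do
    {micros, :ok} = :timer.tc(fn -> run(n_ips, n_procs, sleep_ms) end)
    micros
  end

  defp run(n_ips, n_procs, sleep_ms) do
    caller = self()
    pids = for _ <- 1..n_procs, do: spawn_link(fn -> faker(caller, sleep_ms) end)
    workers = List.to_tuple(pids)

    # Round-robin: IP number i goes to worker rem(i, n_procs).
    for i <- 0..(n_ips - 1) do
      send(elem(workers, rem(i, n_procs)), {:ip, i})
    end

    # Wait for one acknowledgement per IP.
    for _ <- 1..n_ips do
      receive do
        {:done, _i} -> :ok
      end
    end

    # Tell the workers to finish; Enum.each/2 returns :ok.
    Enum.each(pids, fn pid -> send(pid, :stop) end)
  end

  # Plays the faker component: sleep to simulate processing, then acknowledge.
  defp faker(caller, sleep_ms) do
    receive do
      {:ip, i} ->
        Process.sleep(sleep_ms)
        send(caller, {:done, i})
        faker(caller, sleep_ms)

      :stop ->
        :ok
    end
  end
end
```

Comparing FakerTiming.time_it(100, 1) with FakerTiming.time_it(100, 4) should show the second run taking roughly a quarter of the time, although exact numbers will vary by machine.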
All we needed to do to utilize the inherent concurrency present in Elixir/Erlang systems was to specify the number of processes that we wanted a component to run in. The changes I had to make to ElixirFBP were simply to create a tuple of the registered names of the spawned processes and to maintain an index pointing to the next process to send an IP to. The only other change was in how components are written: every send now returns an updated value for the target out port, which the component must keep. So,
def loop(in_port, out) do
  receive do
    {:IN, value} ->
      # Do some processing
      Component.send_ip(out, value)
      loop(value, out)
  end
end
became
def loop(in_port, out) do
  receive do
    {:IN, value} ->
      # Do some processing
      # send_ip returns the updated out port; keep it for the next send
      out = Component.send_ip(out, value)
      loop(value, out)
  end
end
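Because Elixir data is immutable, a send that advances a round-robin index has to hand the new index back, which is why the component rebinds out. Here is a minimal sketch of what such an out port could look like, assuming it holds a tuple of target processes and the index of the next one to use. OutPortSketch and its fields are hypothetical; this is not ElixirFBP's actual Component.send_ip.

```elixir
defmodule OutPortSketch do
  # Hypothetical out-port value: a tuple of target processes (pids or
  # registered names) and the index of the one that gets the next IP.
  defstruct targets: {}, next: 0

  # Sends value to the current target and returns the out port with the
  # index advanced, wrapping back to 0 after the last target.
  def send_ip(%__MODULE__{targets: targets, next: next} = out, value) do
    send(elem(targets, next), {:IN, value})
    %{out | next: rem(next + 1, tuple_size(targets))}
  end
end
```

A component would start with something like %OutPortSketch{targets: {:copier_1, :copier_2}} and thread the returned value through its loop, exactly as the rewritten loop does with out.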

Wednesday, May 27, 2015

In the Elixir implementation of FBP, the components and the graph that represents the data flow are each running as a separate process - Erlang process; that's the Elixir/Erlang way of doing things. All communication among these processes is via the sending of messages - Information Packets (IPs) in the flow-based terminology. If you are curious, you can see these processes using the Erlang observer tool. Try the following with the current version - as of today - of ElixirFBP:

  1. Using a command window, cd to the ElixirFBP directory
  2. Type iex -S mix 
  3. At the iex prompt, enter :observer.start (this will open a window onto the Erlang virtual machine).
  4. Click on the Processes tab. You'll see a list of all the processes currently executing.
  5. Back at the iex prompt, enter Examples.Citibike.start to start the ElixirFBP version of the Streamtools Citibike example. 
  6. Go back to the observer window and click on the "Name or Initial Func." column. This will sort the processes by their name. You should see something like the following:

  7. All the processes associated with the example start with "citibike".
  8. If you go back to the iex prompt again and enter ElixirFBP.Network.stop and then ElixirFBP.Network.stop_network, you will see the "citibike" processes disappear as they are forced to stop.
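The same list can be obtained from the iex prompt without the observer. This sketch, with a hypothetical module name, filters Process.registered/0 by name prefix; it assumes the example's processes are registered under names beginning with "citibike", as the observer listing suggests.

```elixir
defmodule ProcessListSketch do
  # Returns the registered process names (atoms) whose text starts with
  # prefix, sorted, e.g. registered_with_prefix("citibike").
  def registered_with_prefix(prefix) do
    Process.registered()
    |> Enum.filter(fn name -> String.starts_with?(Atom.to_string(name), prefix) end)
    |> Enum.sort()
  end
end
```

Running it before and after stopping the network should show the same appearance and disappearance of the "citibike" processes.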

Monday, May 25, 2015

The point of my work is neither to re-invent Flow-Based Programming nor to re-invent visual programming (for a really good list of visual programming languages, visit this website). Rather, I want to explore how one can take advantage of the inherent concurrency of Elixir/Erlang processes. In software development, "impedance mismatch" describes how hard it is to make one thing behave like another. There seems to be less of an impedance mismatch when implementing flow-based or streaming systems in Elixir.

ElixirFBP is showing some life! Last week I was able to create a small graph consisting of an adder and an output component. This is what the code looks like:
defmodule Examples.Add1 do
  alias ElixirFBP.Graph
  alias ElixirFBP.Network

  @graph_1      "graph_n1"
  @node_1       "node_1"
  @node_2       "node_2"

  def start do
    {:ok, fbp_graph_reg_name} = Graph.start_link(@graph_1)
    # Add the components to the graph
    Graph.add_node(fbp_graph_reg_name, @node_1, "Math.Add")
    Graph.add_node(fbp_graph_reg_name, @node_2, "Core.Output")
    # Connect the components
    Graph.add_edge(fbp_graph_reg_name, @node_1, :sum, @node_2, :in_port)
    # Set the initial values
    Graph.add_initial(fbp_graph_reg_name, 42, @node_1, :addend)
    Graph.add_initial(fbp_graph_reg_name, 24, @node_1, :augend)
    # Start the flow
    {:ok, _fbp_network_pid} =
        Network.start_link(fbp_graph_reg_name)
    Network.start()
  end
end
The add component is initialized with the two values: 42 and 24. The output of this component is fed into a component that simply prints its message. Granted, this is not the way one would do a little bit of arithmetic but it does test many of the ElixirFBP design elements. For a more interesting demo, please read on.

I recently gave a presentation of my ideas at a Meetup of the Portland, ME Elixir group. I got some great feedback and pointers to three systems that were worth looking at:
  1. Apache Storm
  2. Apache Spark
  3. Streamtools
Streamtools is being developed by the R&D group at the New York Times. They have designed a flow-based system that features a graphical front-end - similar to NoFlo's - and a backend that executes components written in the Go language. They have some interesting demos - more than the addition of two numbers! One, a poller, periodically checks the status of a bike share program in New York City. I decided to try and implement this demo using ElixirFBP. Here's what it looks like:
defmodule Examples.Citibike do
  alias ElixirFBP.Graph
  alias ElixirFBP.Network

  @graph_1      "citibike"
  @node_1       "ticker"
  @node_2       "map"
  @node_3       "getHTTP"
  @node_4       "unpack"
  @node_5       "filter"
  @node_6       "output"

  def start do
    {:ok, fbp_graph_reg_name} = Graph.start_link(@graph_1)
    # Add the components
    Graph.add_node(fbp_graph_reg_name, @node_1, "Streamtools.Ticker")
    Graph.add_node(fbp_graph_reg_name, @node_2, "Streamtools.Map")
    Graph.add_node(fbp_graph_reg_name, @node_3, "Streamtools.GetHTTPJSON")
    Graph.add_node(fbp_graph_reg_name, @node_4, "Streamtools.Unpack")
    Graph.add_node(fbp_graph_reg_name, @node_5, "Streamtools.Filter")
    Graph.add_node(fbp_graph_reg_name, @node_6, "Core.Output")
    # Connect the components
    Graph.add_edge(fbp_graph_reg_name, @node_1, :out, @node_2, :in_port)
    Graph.add_edge(fbp_graph_reg_name, @node_2, :out, @node_3, :path)
    Graph.add_edge(fbp_graph_reg_name, @node_3, :out, @node_4, :in_port)
    Graph.add_edge(fbp_graph_reg_name, @node_4, :out, @node_5, :in_port)
    Graph.add_edge(fbp_graph_reg_name, @node_5, :out, @node_6, :in_port)
    # Set initial values
    Graph.add_initial(fbp_graph_reg_name, 10_000, @node_1, :interval)
    Graph.add_initial(fbp_graph_reg_name,
                      "http://www.citibikenyc.com/stations/json",
                      @node_2, :map)
    Graph.add_initial(fbp_graph_reg_name, "stationBeanList", @node_4, :part)
    Graph.add_initial(fbp_graph_reg_name, "stationName", @node_5, :filter)
    Graph.add_initial(fbp_graph_reg_name, "W 41 St & 8 Ave", @node_5, :filter_value)
    # Start the flow
    {:ok, _fbp_network_pid} =
        Network.start_link(fbp_graph_reg_name)
    Network.start()
  end
end
To run this you need to have Erlang and Elixir installed - see here. Then:
  1. Download the ElixirFBP code from github.
  2. Open a command window at the ElixirFBP directory level.
  3. You may have to perform a mix deps.get first in order to load some libraries that I use.
  4. Run iex -S mix 
  5. Type the following at the iex prompt: Examples.Citibike.start
  6. Wait about 10 seconds. You should start seeing messages appearing roughly every 10 seconds; the messages will likely be the same, unless someone has rented a bike in the last 10 seconds!
  7. Stop the execution by pressing Ctrl+C twice.
  8. You can run the Add example by entering Examples.Add1.start
The code is still a little rough. Note that the Streamtools flow components are JSON-oriented. In my implementation, once a JSON response is received, it is converted to Elixir data - maps and lists. They also feed the final result into a mask component that then connects via web sockets to a browser. I did not implement this step in the flow.
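To make the JSON-to-Elixir point concrete, here is a sketch of the Unpack and Filter steps applied to already-decoded data (maps with string keys). The module CitibikeSketch is hypothetical, and in the flow each station would travel as its own IP, whereas unpack/2 here simply returns the whole list.

```elixir
defmodule CitibikeSketch do
  # Hypothetical versions of the Unpack and Filter steps, working on JSON
  # that has already been decoded into Elixir maps and lists.

  # Unpack: pull out the list stored under part, e.g. "stationBeanList".
  def unpack(response, part), do: Map.get(response, part, [])

  # Filter: keep only entries whose key field equals value,
  # e.g. "stationName" == "W 41 St & 8 Ave".
  def filter(entries, key, value) do
    Enum.filter(entries, fn entry -> Map.get(entry, key) == value end)
  end
end
```

With a made-up response such as %{"stationBeanList" => [%{"stationName" => "W 41 St & 8 Ave"}, %{"stationName" => "Broadway"}]}, unpacking "stationBeanList" and filtering on "stationName" leaves only the first station.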

Soon I'll start connecting NoFlo's graphical front end to ElixirFBP. Then you'll be able to create and run these demos using their GUI.

Friday, May 15, 2015

The addition of the add_initial graph function triggered several changes in the type of information being stored in the internal - Erlang - graph. Here are the external and internal implementations of add_initial (recall that the graph module is a GenServer; typically functions are written in pairs):
  def add_initial(graph_id, src, tgt, metadata \\ %{}) do
    GenServer.call(graph_id, {:add_initial, src, tgt, metadata})
  end
  def handle_call({:add_initial, src, tgt, metadata}, _req, fbp_graph) do
    src_data = src.data
    node_id = tgt.node_id
    port = tgt.port
    {node_id, label} = :digraph.vertex(fbp_graph.graph, node_id)
    inports = label.inports
    new_inports = Keyword.put(inports, port, src_data)
    new_label = %{label | :inports => new_inports}
    :digraph.add_vertex(fbp_graph.graph, node_id, new_label)
    {:reply, src_data, fbp_graph}
  end
Notice that the initial data is associated with a target port. When the graph is eventually executed, these values will be sent, as a message, to the target process. The target node will be an Elixir process at this point and will be listening for messages coming in on its input ports.
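The label-updating part of add_initial can be tried on a bare digraph, outside the GenServer. In this sketch (InitialSketch is a hypothetical name), :digraph.add_vertex/3 on an existing vertex replaces its label, which is what lets the callback overwrite the port entry in place.

```elixir
defmodule InitialSketch do
  # Stores an initial value for port in the label of node_id, following the
  # same steps as the add_initial callback but without the GenServer.
  def add_initial(graph, value, node_id, port) do
    {^node_id, label} = :digraph.vertex(graph, node_id)
    new_label = %{label | inports: Keyword.put(label.inports, port, value)}
    # Re-adding an existing vertex replaces its label.
    :digraph.add_vertex(graph, node_id, new_label)
    value
  end
end
```

Note that Keyword.put/3 removes any existing entry for the port before adding the new one, so each port holds at most one initial value.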

If the nodes are Elixir processes, what will they look like? We can borrow some ideas from the implementations of FBP components in other programming languages. Below is the CoffeeScript version of the Math.Add component that I retrieved from one of NoFlo's component libraries:
class Add extends noflo.Component
  constructor: ->
    @augend = null
    @addend = null
    @inPorts =
      augend: new noflo.Port
      addend: new noflo.Port
    @outPorts =
      sum: new noflo.Port

    @inPorts.augend.on 'data', (data) =>
      @augend = data
      do @add unless @addend is null
    @inPorts.addend.on 'data', (data) =>
      @addend = data
      do @add unless @augend is null

  add: ->
    @outPorts.sum.send @augend + @addend
    @outPorts.sum.disconnect()
Something very similar could be written in Elixir:
defmodule Math.Add do

  def inports, do: [{:addend, nil}, {:augend, nil}]
  def outports, do: [{:sum, nil}]

  def loop(augend, addend, sum) do
    receive do
      {:augend, value} when addend != nil ->
        # The addend arrived first; add it to the newly arrived augend
        send sum, addend + value
        loop(nil, nil, sum)
      {:augend, value} ->
        loop(value, addend, sum)
      {:addend, value} when augend != nil ->
        # The augend arrived first; add it to the newly arrived addend
        send sum, value + augend
        loop(nil, nil, sum)
      {:addend, value} ->
        loop(augend, value, sum)
    end
  end
end
This is neither a complete nor a final version of what an Elixir-based component would look like. Earlier, I mentioned that I would like to employ Elixir's macro facilities and create an FBP DSL.
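The receive loop can be exercised by spawning it and sending it operands directly. The sketch below repeats the loop under a hypothetical name, AddSketch, adds a start/1 helper, and tags each result as {:sum, value}; the tag is my own choice so that the receiving process can match on it.

```elixir
defmodule AddSketch do
  # Spawns the component; sum is the pid that should receive each result.
  def start(sum), do: spawn(fn -> loop(nil, nil, sum) end)

  # Remember whichever operand arrives first; when the second one arrives,
  # send the total to sum and reset both operands.
  def loop(augend, addend, sum) do
    receive do
      {:augend, value} when addend != nil ->
        send(sum, {:sum, value + addend})
        loop(nil, nil, sum)

      {:augend, value} ->
        loop(value, addend, sum)

      {:addend, value} when augend != nil ->
        send(sum, {:sum, augend + value})
        loop(nil, nil, sum)

      {:addend, value} ->
        loop(augend, value, sum)
    end
  end
end
```

Starting it with AddSketch.start(self()) and sending {:augend, 24} and then {:addend, 42} leaves {:sum, 66} in the caller's mailbox.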

There is now enough information being stored in the graph so that I will be able to begin trying out some designs for the execution of the graph. I hope to show some of my early attempts in the next post. In the meantime, try out some of the code, if you like.

Wednesday, May 13, 2015

As I gain a deeper understanding of how the FBP graph protocol operates, I uncover areas in my code that are deficient; this is, of course, inevitable. For example, the original design for storing FBP graph nodes couldn't deal with adding an initial value to a node input port. To do this, it was necessary to add placeholders in the graph node for these ports. Recall that Erlang's digraph facilities store nodes (vertices) with an identifier value and a label value. The label value can be used to store extra information about the node, such as FBP ports. Previously, the following code was used to store an FBP node in the graph:
:digraph.add_vertex(fbp_graph.graph, node_id, [component, metadata])
it now looks like this:
  label = %{component: component, inports: inports, outports: outports, metadata: metadata}
  new_node = :digraph.add_vertex(fbp_graph.graph, node_id, label)
Where do the values for inports and outports come from? They come from the component itself. Right now, the component is identified by its module name - a string, for example, "Math.Add". To support the retrieval of a component's ports, a very early design for an Elixir component looks like the following:
defmodule Math.Add do 
  @moduledoc """
  This module describes an FBP Component: Math.Add
  It knows its input and output ports and how to execute a function.
  Ports are described with tuples: {name, initial value}
  """
  def inports, do: [{:addend, nil}, {:augend, nil}]
  def outports, do: [{:sum, nil}]
  def execute do
  end
end
With this in place, the port definitions can be retrieved at run-time by executing the following statements in the add_node function:
    inports = elem(Code.eval_string(component <> ".inports"), 0)
    outports = elem(Code.eval_string(component <> ".outports"), 0)
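An alternative to evaluating a string of code is to turn the component name into a module and call its functions directly. This is a sketch, not ElixirFBP's code: PortLookup is hypothetical, and the Math.Add stub repeats the port definitions above so the snippet runs on its own.

```elixir
defmodule Math.Add do
  # Stub with the same port definitions as the component above.
  def inports, do: [{:addend, nil}, {:augend, nil}]
  def outports, do: [{:sum, nil}]
end

defmodule PortLookup do
  # Turns a component name such as "Math.Add" into the module Math.Add and
  # calls its inports/0 and outports/0 directly, without Code.eval_string.
  def ports(component) when is_binary(component) do
    module = Module.concat([component])
    {module.inports(), module.outports()}
  end
end
```

Module.concat/1 creates a new atom for every distinct name; for names that come from outside the program, Module.safe_concat/1, which raises unless the alias already exists as an atom, may be the safer choice.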
I have a feeling that this design for an Elixir component will not last too long... But, for the time being, it works. The complete implementation of the new add_node callback is:
  def handle_call({:add_node, graph_id, node_id, component, metadata}, _req, fbp_graph) do
    inports = elem(Code.eval_string(component <> ".inports"), 0)
    outports = elem(Code.eval_string(component <> ".outports"), 0)
    label = %{component: component,
              inports: inports, outports: outports,
              metadata: metadata}
    new_node = :digraph.add_vertex(fbp_graph.graph, node_id, label)
    {:reply, new_node, fbp_graph}
  end
Similar changes needed to be made to the add_edge callback:
  def handle_call({:add_edge, graph_id, src, tgt, metadata}, _req, fbp_graph) do
    label = %{src_port: src.port, tgt_port: tgt.port, metadata: metadata}
    new_edge = :digraph.add_edge(
                    fbp_graph.graph,
                    src.node_id,
                    tgt.node_id,
                    label)
    {:reply, new_edge, fbp_graph}
  end
I'll show add_initial, the function that triggered all these changes, in the next post. All the rest of the code and its tests can be found at github.
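Once edges carry these labels, a node's outgoing connections can be read back with :digraph.out_edges/2 and :digraph.edge/2. The sketch below is my own illustration of how a network might look up where to send a node's IPs; EdgeSketch is a hypothetical name and the label keys follow the add_edge callback above.

```elixir
defmodule EdgeSketch do
  # For node_id, returns one {src_port, target_node, tgt_port} tuple per
  # outgoing edge, read from the labels stored by add_edge.
  def connections(graph, node_id) do
    for edge <- :digraph.out_edges(graph, node_id) do
      {^edge, _src, tgt, label} = :digraph.edge(graph, edge)
      {label.src_port, tgt, label.tgt_port}
    end
  end
end
```

:digraph does not guarantee the order of out_edges/2, so callers that care about order should sort the result.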

Monday, May 11, 2015

In this post, I'll continue the development of the ElixirFBP Graph process. I'll show functions to add and remove a node; and add an edge. Again, we are guided by the NoFlo Network Protocol.  You can see the code and tests that I've developed so far in GitHub. I expect that these functions will change as I get a better understanding of how FBP will work in an Elixir environment.

Adding a node is straightforward; the external API and the callback implementations are:
  def add_node(graph_id, node_id, component, metadata \\ %{}) do
    GenServer.call(__MODULE__, {:add_node, graph_id, node_id, component, metadata})
  end
  def handle_call({:add_node, graph_id, node_id, component, metadata}, _requester, fbp_graph) do
    new_node = :digraph.add_vertex(fbp_graph.graph, node_id, [component, metadata])
    {:reply, new_node, fbp_graph}
  end

Nothing overly complex is going on here. Again, note that even though we are changing the graph, we do not need to pass back a new state. This is because the Erlang ETS tables behind the internal graph representation are, in fact, mutable.

Here is the remove_node function:
  def remove_node(graph_id, node_id) do
    GenServer.call(__MODULE__, {:remove_node, graph_id, node_id})
  end
  def handle_call({:remove_node, graph_id, node_id}, _requester, fbp_graph) do
    result = :digraph.del_vertex(fbp_graph.graph, node_id)
    {:reply, result, fbp_graph}
  end

One design aspect that remains unresolved is whether adding a node to a graph should result in the spawning of a process.  There is a subset of the FBP Network protocols that deals with what is called a network. One starts and stops networks - executes the graph. So, it is possible that this is when the process should be spawned.  It's clear that we will model FBP processes - instances of components - with Elixir processes, but it is unclear, at least to me, how and when this will happen. In the next post, I hope to describe and show a solution to this problem.


Thursday, May 7, 2015

Today I code! Not a great deal, but at least it'll be a start.

I decided to begin the implementation of an Elixir FBP by developing the code which will be responsible for maintaining the internal representation of the FBP program. It's a graph whose nodes represent the FBP components and whose edges are the connections between the components over which Information Packets (IP's) pass. I'll use the Erlang package digraph for this representation. 

An FBP graph is a dynamic entity: nodes and edges can be added, changed and removed. I will model its functional interface on NoFlo's FBP Network Protocol, in particular the graph sub-protocol. Later in the implementation, it will be possible to access this graph programmatically, through the Network Protocol, or via a DSL.

The graph also needs to persist.  It is a common practice in Elixir/Erlang to model a persistent entity as a separate process that maintains state and provides functionality by responding to messages. Elixir provides a GenServer behavior, which has its base in Erlang's OTP - a platform to support long-running processes along with many other facilities. 

According to NoFlo's Network Protocol, a graph also has metadata associated with it, such as name, id, description, etc. Thus our first bit of code will be the definition of an Elixir structure to hold this metadata as well as the graph (digraph):


defmodule ElixirFBP.Graph do
  defstruct [
    id: nil,
    name: "",
    library: nil,
    main: false,
    description: "",
    digraph: nil
  ]

When a module uses the GenServer behavior, it receives default implementations of the functions that handle starting and terminating the process and handling messages, both synchronous and asynchronous. Collectively, these functions constitute the "callback" interface. The defaults do very little and are usually overridden. The Elixir FBP Graph module will also present an "external" interface: the functions that will be called by clients. Our first external function will be start_link. This function is responsible for creating the process state and getting the process going:
  
  use GenServer
  ########################################################################
  # The External API
  @doc """
  Starts things off with the creation of the state.
  """
  def start_link(id \\ nil, name \\ "", library \\ nil,
                 main \\ false, description \\ "") do
    fbp_graph = %ElixirFBP.Graph{id: id, name: name, library: library,
          main: main, description: description}
    GenServer.start_link(__MODULE__, fbp_graph, name: __MODULE__)
  end

  # init/1 runs inside the new GenServer process. The digraph is created
  # here because, by default, only its creating process may modify it.
  def init(fbp_graph) do
    {:ok, %{fbp_graph | digraph: :digraph.new()}}
  end

Here, I create an instance of the Graph structure and pass it to GenServer, where it becomes the process's state. The empty Erlang digraph is added to that state in init/1, which GenServer calls from inside the new process; a digraph created in the caller could only be read, not modified, by the GenServer. GenServer will make this state available to all subsequent calls. The process is also registered with the name ElixirFBP.Graph - the value of __MODULE__. The process will stay alive until there is a failure or it is stopped.

Elixir processes, including this GenServer process, do something in response to being sent a message. All messages are sent asynchronously, although a synchronous message can be effected by sending the process id of the caller along with the message and waiting for a reply. In GenServer terminology, sending an asynchronous message is termed a cast and sending a synchronous message is termed a call. To make a GenServer process do something, you create "external" functions that call or cast information to it; the process will respond by calling back to handler functions that you provide. For example, to provide a function to clear a graph, you would code the following pair of functions:

  def clear do
    GenServer.call(__MODULE__, :clear)
  end

  @doc """
  A request to clear the FBP Graph. Clearing is accomplished by
  deleting all the vertices and all the edges.
  """
  def handle_call(:clear, _requester, fbp_graph) do
    g = fbp_graph.digraph
    vs = :digraph.vertices(g)
    es = :digraph.edges(g)
    :digraph.del_edges(g, es)
    :digraph.del_vertices(g, vs)
    {:reply, nil, fbp_graph}
  end

Notice, in the clear function, the :clear atom. It is used in the definition of a handle_call function as a way of distinguishing one GenServer call (or cast) from another. Note that we did not create a new state, a new ElixirFBP.Graph. This is because nothing actually changed in the state. The graph did change - the edges and vertices were deleted - but this element is implemented using Erlang's ETS - a mutable internal storage facility.
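The call/handle_call pattern and the in-place mutation of the digraph can be seen together in a small, self-contained GenServer. This is a simplified sketch with hypothetical names (GraphSketch, node_count/0), not the ElixirFBP.Graph module; the digraph is created in init/1 so that the server process owns it.

```elixir
defmodule GraphSketch do
  use GenServer

  # External API: each function sends a synchronous call to the server.
  def start_link, do: GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
  def add_node(node_id, label), do: GenServer.call(__MODULE__, {:add_node, node_id, label})
  def node_count, do: GenServer.call(__MODULE__, :node_count)
  def clear, do: GenServer.call(__MODULE__, :clear)

  # Callbacks: run inside the server process.
  @impl true
  def init(:ok), do: {:ok, %{digraph: :digraph.new()}}

  @impl true
  def handle_call({:add_node, node_id, label}, _from, state) do
    {:reply, :digraph.add_vertex(state.digraph, node_id, label), state}
  end

  def handle_call(:node_count, _from, state) do
    {:reply, :digraph.no_vertices(state.digraph), state}
  end

  def handle_call(:clear, _from, state) do
    g = state.digraph
    # Deleting a vertex also deletes its edges, so this empties the graph.
    :digraph.del_vertices(g, :digraph.vertices(g))
    # The ETS-backed digraph was changed in place, so the state is unchanged.
    {:reply, :ok, state}
  end
end
```

After GraphSketch.start_link() and two add_node calls, node_count() reports 2; after clear() it reports 0, even though every callback returned the very same state map.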

In the next post, I'll continue the implementation of the ElixirFBP Graph process. 

Tuesday, May 5, 2015

Writing an FBP program means, essentially, describing a graph or network of components, their interconnections, and certain initial conditions. Currently, there are three ways to describe an FBP program:
  1. Programmatically. For example, the following is an excerpt from a Java program that uses the JavaFBP library of Paul Morrison:
    protected void define() {    
        connect(component("Read", ReadFile.class), port("OUT"), 
                component("Write", WriteFile.class), port("IN"));
        initialize("testdata/testdata.txt".replace("/", File.separator), 
                  component("Read"), port("SOURCE"));
        initialize("testdata/output".replace("/", File.separator), 
                  component("Write"), port("DESTINATION"));
      }
    Similar examples exist for other languages.
  2. Using the FBP Network Protocol. This transport-independent protocol is promoted by the NoFlo group. You can see it in action by watching the Websocket traffic between the NoFlo Development Environment and protoflo, a Python implementation of FBP.
  3. Using a DSL. There is a language, supported by NoFlo and Morrison, that can be used to create a textual description of an FBP graph. More about the language can be found here and  here. This is an example ('#' indicates a comment line):
    # Read the content of "somefile.txt" and split it by line
    'somefile.txt' -> SOURCE Read(ReadFile) OUT -> IN Split(SplitStr)
    # Count the lines and display the result
    Split() OUT -> IN Count(Counter) COUNT -> IN Display(Output)
    # The read errors are also displayed
    Read() ERROR -> IN Display()
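To give a feel for how the DSL maps onto graph operations, here is a sketch that parses a single connection line of the form Process(Component) PORT -> PORT Process(Component). The module FbpDslSketch is hypothetical and handles only one connection per line; initial values ('somefile.txt' -> ...) and chained connections are left out.

```elixir
defmodule FbpDslSketch do
  # Parses one connection such as "Read() ERROR -> IN Display()".
  # The component in parentheses may be empty, meaning the process was
  # declared elsewhere. Returns {:ok, map} or :error.
  def parse_connection(line) do
    regex = ~r/^\s*(\w+)\((\w*)\)\s+(\w+)\s*->\s*(\w+)\s+(\w+)\((\w*)\)\s*$/

    case Regex.run(regex, line) do
      [_, src, src_comp, src_port, tgt_port, tgt, tgt_comp] ->
        {:ok, %{src: {src, blank_to_nil(src_comp), src_port},
                tgt: {tgt, blank_to_nil(tgt_comp), tgt_port}}}

      nil ->
        :error
    end
  end

  defp blank_to_nil(""), do: nil
  defp blank_to_nil(component), do: component
end
```

Each parsed {process, component, port} pair corresponds roughly to what a graph needs for add_node (when a component is given) and add_edge.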
    
No matter how it is described, the execution of an FBP program begins by instantiating components as processes. The presence of Initial Information Packets (IIP's), typically parameters or initial conditions and/or Information Packets (IP's) at a component's input ports will trigger its execution. The result of an execution is, typically, the sending of IP's to other, connected, component processes.

This process architecture suggests a way to begin the Elixir implementation: by developing the functions to create and execute a graph or network representation. Erlang has a library for just such a representation: digraph. While Elixir is syntactically different from Erlang, the Elixir programmer still has direct access to all of Erlang's many libraries. Jordan Damov, in one of his posts about Elixir, shows how one can use the digraph library. 

Graphs, then, will become the base for our design. The FBP ideas translate well: the nodes in the graphs will be Elixir processes and communication between the nodes will be performed asynchronously by sending messages to these processes. 

A final note: Morrison describes the differences between the original or "classic" FBP and the NoFlo implementation of FBP. I plan on implementing the classic version.

Sunday, May 3, 2015

This blog will be about flow-based programming (FBP) and, in particular, about an implementation of this programming paradigm using a language whose design closely matches FBP's goals. FBP has its origin in the 1970's and its designer, J. Paul Morrison, has championed it ever since. Recently, NoFlo with its browser-based diagramming tool has sparked renewed interest in FBP.

In the words of Morrison:

"Flow-Based Programming is an approach to developing applications, not in terms of the old von Neumann paradigm, but based on the concept of multiple asynchronous processes communicating by means of streams of data InformationPackets passing over BoundedBuffer connections."

Several languages - Java, C++, C#, Python and, most recently, JavaScript (Node.js) - have been used in support of an FBP style of programming. Given these implementations, it is somewhat ironic that there have been no implementations (AFAIK) of FBP in languages that match the inherent characteristics of FBP even though these languages exist. Notably, there is Erlang and its new incarnation Elixir, both of which have the linguistic facilities for creating processes that communicate via asynchronous message passing. Moreover, these processes are easily and cheaply created and the underlying system - the Erlang virtual machine - can support literally hundreds of thousands of concurrent processes. Importantly, these processes are not operating system threads.

In subsequent posts, I'll talk about the design and the implementation of FBP using the Elixir language. Erlang could have been used as well, but I wish to also explore the use of Elixir's macro facilities.