Thursday, August 18, 2016

GenStage Example No. 3 - Dispatching

In my last post, I showed one way for a stage to handle input from more than one source (stage). In this post, we'll look at the other side of the stage: output. Specifically, how can we send the output of one stage to more than one stage based on some criterion? As an example, we'll build a stage that takes in integer events and sends the odd ones to one stage and the even ones to another.

First, some background: GenStage has the notion of Dispatchers. A dispatcher is the part of a producer stage responsible for handling both the upstream demand and the downstream events passing between stages. In the simple case, you do not have to specify a dispatcher: the default is the so-called DemandDispatcher. From the GenStage documentation:
GenStage.DemandDispatcher dispatches the given batch of events to the consumer with the biggest demand in a FIFO ordering.
GenStage also has a BroadcastDispatcher. From the documentation:
GenStage.BroadcastDispatcher - dispatches all events to all consumers. The demand is only sent upstream once all consumers ask for data.
To use a dispatcher other than the default, you return a dispatcher option from a stage's init/1 function:
defmodule Stage1 do
  use GenStage

  def init(state) do
    {:producer_consumer, state, dispatcher: GenStage.BroadcastDispatcher}
  end
end
When I first began looking at the problem of splitting output events according to some criteria, I looked at these dispatchers and didn't think that they would do what I needed. I also looked at a new dispatcher (in the latest release, 0.5.0) named PartitionDispatcher and decided that it wasn't applicable either. PartitionDispatcher distributes events to a specified number of partitions based on the result of hashing an event. By default, Erlang's phash2/2 is used, but you can supply your own function. A likely scenario for this dispatcher would be where you wanted the processing of output events to be handled - evenly - by a group of worker stages.
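For instance, configuring a producer to spread its events across four worker partitions with the default hash might look like this (a minimal sketch):
  def init(state) do
    # No hash: option given, so the default, Erlang's :erlang.phash2/2, is used
    {:producer, state, dispatcher: {GenStage.PartitionDispatcher, partitions: 4}}
  end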

Thinking that none of these dispatchers fit my needs, I went ahead and designed and built a new type of dispatcher - I called it a DirectedDispatcher. It expects its input events to be labeled; the label is used to determine which stage to direct the event to. In our example, the even events would appear, for example, as the tuple {:evens, 28} and the odd events as {:odds, 37}. The labeling of the events is performed by the splitting stage before the event reaches the dispatcher. The DirectedDispatcher would know, via earlier subscriptions, the pid of the stage associated with each label and use this pid to send the event to the correct stage.

I'm not going to show my DirectedDispatcher - at least not in its present form - because, after finishing it, I had one of those moments of clarity: I could, in fact, use the PartitionDispatcher, at least for this example. Just because the function that the dispatcher is using is called a hash function doesn't mean that it has to use some hashing algorithm. The responsibility of the "hash" function is to accept an event and the number of partitions and return the event and an integer - the integer indicating which partition to send the event to. Things moved along quickly after this realization. Here's the complete code for the Splitter stage, along with its "hash" function:
defmodule Splitter do
  use GenStage

  def init(_) do
    {:producer_consumer, %{},
      dispatcher: {GenStage.PartitionDispatcher,
                    partitions: 2,
                    hash: &split/2}}
  end

  def split(event, no_of_partitions) do
    {event, rem(event, no_of_partitions)}
  end

  def handle_events(events, _from, state) do
    {:noreply, events, state}
  end
end
The split function simply computes a partition value based on the value of the event - an integer.
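For example, with two partitions:
  Splitter.split(28, 2)  #=> {28, 0} - even, goes to partition 0
  Splitter.split(37, 2)  #=> {37, 1} - odd, goes to partition 1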

When a stage subscribes to another stage that uses a PartitionDispatcher, such as Splitter above, it needs to specify what partition the subscribing stage will represent. This is done by specifying an extra parameter to GenStage's subscribe functions. For example:
  {:ok, inport}    = GenStage.from_enumerable(1..10)
  {:ok, splitter}  = GenStage.start_link(Splitter, 0)
  {:ok, evens}     = GenStage.start_link(Ticker, {2_000, :evens})
  {:ok, odds}      = GenStage.start_link(Ticker, {2_000, :odds})

  GenStage.sync_subscribe(evens, to: splitter, partition: 0, max_demand: 1)
  GenStage.sync_subscribe(odds,  to: splitter, partition: 1, max_demand: 1)
  GenStage.sync_subscribe(splitter, to: inport, max_demand: 1)
As in earlier examples, the Ticker stage sends an upstream demand and then waits the specified number of milliseconds (2,000) before requesting another event. These tickers are the partition stages. When they subscribe to the Splitter stage, they indicate which partition number they are to be associated with. The extra initialization parameters - :evens and :odds - are used to distinguish one Ticker from the other.
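For reference, a minimal sketch of what such a Ticker could look like (the actual code is in the GitHub example):
  defmodule Ticker do
    use GenStage

    def init({sleeping_time, label}) do
      {:consumer, {sleeping_time, label}}
    end

    def handle_events(events, _from, {sleeping_time, label} = state) do
      IO.inspect({label, events})   # show which partition received the events
      Process.sleep(sleeping_time)  # wait before letting more demand flow upstream
      {:noreply, [], state}         # a pure consumer never emits events
    end
  end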

I realize that I am probably misusing the purpose of the PartitionDispatcher, although the split function above is, very loosely speaking, a hash function. I am going to revisit the implementation of my DirectedDispatcher. I think it can be modeled after PartitionDispatcher, but it would use a "hash" function that accepts the event and the dispatcher's state to determine which partition to send an event to. I hope to show this code in a future post.
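To give a flavor of the idea, such a "hash" might look something like this (entirely hypothetical; the labels_to_partitions map would come from the dispatcher's state):
  def directed_hash({label, _value} = event, labels_to_partitions) do
    # Look the partition up by the event's label instead of hashing its value
    {event, Map.fetch!(labels_to_partitions, label)}
  end
With a state of %{evens: 0, odds: 1}, the event {:odds, 37} would be routed to partition 1.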

In the meantime, the example outlined above is available on GitHub.

Monday, August 8, 2016

GenStage Example No. 2

In my early experiments with building flows out of Elixir processes, I built flows that had nodes that could take multiple inputs and produce a single output. A very simple example of such a process would be a Sum node that takes in two numbers and produces their sum.

How could this be done using GenStage? The documentation (version 0.4.3) states very clearly that
A consumer may have multiple producers and a producer may have multiple consumers. When a consumer asks for data, each producer is handled separately, with its own demand.
To start, let's define a Constant stage that will produce as many copies of the same value as the demand requires. Below is what a Constant stage could look like:
  defmodule Constant do
    @moduledoc """
    This stage will always produce demand number of constants.
    """
    use GenStage

    def init(constant) do
      {:producer, constant}
    end

    def handle_demand(demand, constant) when demand > 0 do
      events = List.duplicate(constant, demand)
      {:noreply, events, constant}
    end
  end
Note that it is initialized with a constant value, which is stored as its state. When it receives a demand - when a consumer asks for some events - the handle_demand function will make as many copies as have been asked for. There will be two instances of the Constant stage, each initialized with a different integer value.
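Starting the two producers, along with the Sum stage, might look like this (a sketch; the values 2 and 3 are arbitrary):
  {:ok, augend} = GenStage.start_link(Constant, 2)
  {:ok, addend} = GenStage.start_link(Constant, 3)
  {:ok, sum}    = GenStage.start_link(Sum, %{})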

These two Constants will feed a Sum stage. Connecting a stage to multiple inputs is straightforward. For example:
  {:ok, input1} = GenStage.start_link(Stage1,:ok)
  {:ok, input2} = GenStage.start_link(Stage2, :ok)
  {:ok, process} = GenStage.start_link(Process, 0)
  GenStage.sync_subscribe(process, to: input1)
  GenStage.sync_subscribe(process, to: input2)
There is a potential problem with this code, though. In designing the Sum stage I have to account for two distinct sources of data: addends and augends. Moreover, I want to make sure that I have two inputs available before they are summed, and I don't want to take two values from one source and sum them. Given these constraints, we need some way of distinguishing the arriving events; that is, which stage is an event coming from? [Addition is commutative, so it really doesn't matter in what order the inports are used, but it does matter that the inputs are used in pairs - one from each inport.]

It turns out that GenStage creates a unique identifier, termed a tag, for every successful subscription. We need access to that tag. This can be done by defining the handle_subscribe callback for a stage. The GenStage macro supplies default implementations of this callback when none are specified. There are two forms of handle_subscribe: one for consumer subscriptions and one for producer subscriptions. We are interested in the producer subscriptions. Its function signature is:
   def handle_subscribe(:producer, opts, {pid, tag}, state) do
In my example, this callback function will be called twice, once for each sync_subscribe to the Sum stage. The pid and tag values will be different on each call. We are going to distinguish one inport from another by supplying an "inport" option to the sync_subscribe function. So connecting the two Constant stages to the Sum stage will now look like:
  GenStage.sync_subscribe(sum, to: augend, inport: :augend)
  GenStage.sync_subscribe(sum, to: addend, inport: :addend)
Here's the code for the subscription callback, handle_subscribe, that we now have to provide to handle the extraction of the new option value and associating it with the subscription tag:
    def handle_subscribe(:producer, opts, {_pid, tag}, state) do
      inport = Keyword.get(opts, :inport)
      case inport do
        nil ->
          {:stop, "no inport specified", state}
        _ ->
          new_state = Map.put(state, inport, {tag, []})
          {:automatic, new_state}
      end
    end
Notice that I have taken the unique tag value that GenStage has generated and tied it to the name of the inport using a Map. This map becomes the state of the Sum stage. Also, notice that the other value associated with the tag is an empty list. This is where we will accumulate - queue - input events arriving at the Sum stage.
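After both subscriptions have been handled, the state is a map along these lines (the tag values shown are placeholders for the references GenStage generates):
  %{
    augend: {augend_tag, []},
    addend: {addend_tag, []}
  }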
We are all set up now for the Sum stage to handle input events and to be able to tell which inport they are coming in on. The next bit of code is the implementation of the handle_events callback for the Sum stage:
    def handle_events(events, {_pid, tag}, %{:addend => {tag, _}} = state) do
      state = Map.update!(state, :addend,
          fn({tag, addends}) -> {tag, addends ++ events} end)
      {return_events, new_state} = sum_inports(state)
      {:noreply, return_events, new_state}
    end

    def handle_events(events, {_pid, tag}, %{:augend => {tag, _}} = state) do
      state = Map.update!(state, :augend,
            fn({tag, augends}) -> {tag, augends ++ events} end)
      {return_events, new_state} = sum_inports(state)
      {:noreply, return_events, new_state}
    end
There is one handle_events clause per inport, and pattern matching on the saved tag ensures that we are handling the right events. For each inport, the new events are appended to any prior events that have not yet been used. The function sum_inports is then called with the updated state. Here is sum_inports:
    defp sum_inports(state) do
      {augend_tag, augends} = Map.get(state, :augend, {nil, []})
      {addend_tag, addends} = Map.get(state, :addend, {nil, []})
      {addends, augends, results} = do_sum(addends, augends, [])
      state = Map.put(state, :addend, {addend_tag, addends})
      state = Map.put(state, :augend, {augend_tag, augends})
      {results, state}
    end
This function simply extracts the queued up events associated with each inport and relies on the function do_sum to perform the arithmetic and return the, possibly updated, event queues. Based on its argument patterns, do_sum figures out what to add:
    defp do_sum([], augends, results) do
      {[], augends, results}
    end
    defp do_sum(addends, [], results) do
      {addends, [], results}
    end
    defp do_sum([h_addends | t_addends], [h_augends | t_augends], results) do
      do_sum(t_addends, t_augends, results ++ [h_addends + h_augends])
    end
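Tracing the recursion shows how unmatched events stay queued. Three addends paired with two augends leaves one addend waiting for the next batch:
  do_sum([1, 2, 3], [10, 20], [])
  # -> do_sum([2, 3], [20], [11])
  # -> do_sum([3], [], [11, 22])
  # -> {[3], [], [11, 22]}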
The complete code is available on GitHub. I've included instructions for running this flow a couple of different ways. One uses the Constant stages as described above. The other example uses the GenStage function from_enumerable which starts a producer stage from an enumerable or stream.


Wednesday, July 20, 2016

GenStage Example

Way back in December I wrote about a new Elixir project named GenRouter. It was a first attempt at defining a reactive streaming library for Elixir. It has come a long way since then! On July 14th, José Valim posted a blog entry announcing GenStage - the successor to GenRouter.  I encourage you to read the post and watch the accompanying video as well. The "stage" in GenStage refers to the name given to the nodes in a flow; every stage is an Elixir/Erlang process thereby providing the means to parallelize flows. Stages can be of three types: consumers, producer/consumers and producers.  The data that move through a flow are called events - events can be of any type. There is much, much more to GenStage. Take a look at the excellent online documentation as well.
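The stage type is declared in the tuple a stage returns from its init/1 callback; for example:
  {:producer, state}            # emits events only
  {:producer_consumer, state}   # receives events and emits new ones
  {:consumer, state}            # receives events only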

Just over a year ago, I wrote about a graphical tool, named Streamtools, developed by the New York Times R&D group (it does not appear to be in active development as of this date; here's a link to a description of the tool and the example). I took one of their examples and implemented it with my ElixirFBP library. The example accesses the data feed of a bike share program in New York City. Below is an implementation of that example using GenStage (the code and instructions for running this example are available at GitHub):
alias Experimental.GenStage

defmodule GenstageExample do
  defmodule Map do
    use GenStage

    def init(url) do
      {:producer, url}
    end

    def handle_demand(demand, url) when demand > 0 do
      events = List.duplicate(url, demand)
      {:noreply, events, url}
    end
  end

  defmodule GetHTTP do
    use GenStage

    def init(_) do
      {:producer_consumer, :ok}
    end

    def handle_events(events, _from, _state) do
      # here: the nested Map module above shadows Elixir's Map
      events =, &retrieve_as_json/1)
      {:noreply, events, :ok}
    end

    defp retrieve_as_json(url) do
      {:ok, %HTTPoison.Response{body: body}} = HTTPoison.get(url)
      # Decode the JSON body into an Elixir map (assuming the Poison library)
      Poison.decode!(body)
    end
  end

  defmodule Unpack do
    use GenStage

    def init(element) do
      {:producer_consumer, element}
    end

    def handle_events(events, _from, element) do
      events =, & &1[element])
      {:noreply, events, element}
    end
  end

  defmodule Filter do
    use GenStage

    def init({filter, filter_value}) do
      {:producer_consumer, {filter, filter_value}}
    end

    def handle_events(events, _from, {filter, filter_value} = state) do
      events =, fn stations ->
          Enum.filter(stations, &(&1[filter] == filter_value))
        end)
      {:noreply, events, state}
    end
  end
  defmodule Ticker do
    use GenStage

    def init(sleeping_time) do
      {:consumer, sleeping_time}
    end

    def handle_events(events, _from, sleeping_time) do
      IO.inspect(events)            # show the events that arrived (illustrative)
      Process.sleep(sleeping_time)  # wait before letting more demand flow upstream
      {:noreply, [], sleeping_time}
    end
  end

  # Start up each stage with initial values.
  # The feed URL was elided in the original post; this is the Citi Bike
  # station feed that the Streamtools example used (an assumption)
  {:ok, map}     = GenStage.start_link(Map, "")
  {:ok, getHTTP} = GenStage.start_link(GetHTTP, :ok)
  {:ok, unpack}  = GenStage.start_link(Unpack, "stationBeanList")
  {:ok, filter}  = GenStage.start_link(Filter, {"stationName", "W 14 St & The High Line"})
  {:ok, ticker}  = GenStage.start_link(Ticker, 5_000)

  # Join (subscribe) stages and create the data (event) flow
  GenStage.sync_subscribe(ticker, to: filter)
  GenStage.sync_subscribe(filter, to: unpack)
  GenStage.sync_subscribe(unpack, to: getHTTP)
  GenStage.sync_subscribe(getHTTP, to: map, max_demand: 1)
end


There are five stages/nodes in this data flow:
  1. Map - supply a URL, but only if there is demand for it.
  2. GetHTTP - retrieve web data, converting the JSON to an Elixir map
  3. Unpack - extract an element from the map
  4. Filter - find a particular bike station by its name
  5. Ticker - send a demand for a single event every 5 seconds
My original implementation of this data flow operated in push mode. That is, the first node of the flow sent a signal to the second node in order to get things started. GenStage flows operate in a pull, or demand-driven, mode, so the Ticker node is placed at the end of the flow. It notifies the stage that it is connected to that it wants - at most - one event. This demand is passed along by all of the other stages until it arrives at the only pure producer in the flow: Map. All other stages are producer/consumers. They respond to - consume - events by producing new ones.

Wednesday, December 23, 2015


In my last post, I briefly mentioned a new Elixir module named GenRouter. José Valim described its rationale in his Elixir Conf 2015 (US) keynote address. The project is in development and can be seen on GitHub. As with all Elixir core code, it is superbly documented - even before it has been fully implemented - although they do not seem to have settled on a naming convention. For example, the terms source and sink are used for what FBP calls in and out ports.

The name GenRouter was chosen to reflect the idea that it is a generic module similar to the other Elixir generic libraries: GenServer and GenEvent. Here are the opening paragraphs from the current documentation:
A behaviour module for routing events from multiple sources to multiple sinks. 
GenRouter allows developers to receive events from multiple sources and/or send events to multiple sinks. The relationship between sources (the one sending messages) and sinks (the one receiving them) is established when the sink explicitly asks for data. 
Since the same process can send and receive data, the same process can be both a source and sink, which is quite common. Sources that only send data and sinks that only receive data are called “definite source” and “definite sink”, respectively.
In his talk José acknowledged talking with the Akka team. Akka is a member of the Reactive Streams Initiative, which among other things has released a specification on how well-behaved processes handling streams of data are to act, especially with regard to "back pressure". I've mentioned Reactive Streams in earlier posts; its specification has also influenced my design. Basically, there is two-way communication between entities that are sending and/or receiving data streams. Demand requests are sent upstream and the fulfillment of these demand requests is sent downstream. Hence, nothing moves unless there is a demand for something.

The Reactive Streams influence on GenRouter is manifested in a module named GenRouter.Spec. In the module there are four "convenience" functions that one can use to:
  1. subscribe() to an upstream source, asking for some number of data events to be sent. So the Subscriber is asking a Publisher for a Subscription and for a certain maximum number of events (data). The Subscriber must honor the parameters of this Subscription.
  2. ask() for data from an upstream source, specifying how many data events an entity can receive.
  3. route() is used to send data to a downstream entity. The Publisher sends data to a subscribed Subscriber.
  4. unsubscribe() will cancel the source/sink relationship. The Subscriber will attempt to cancel the Subscription that it has with a Publisher.
As of this date GenRouter needs to be "fleshed out".

Monday, October 19, 2015

Subscription Redux

ElixirFBP Subscription

An ElixirFBP Subscription is the "glue" that ties together all of the components in a flow graph. While Components could certainly send messages directly to each other, I found a compelling reason for developing a Subscription as a result of reading about Reactive Streams. After some fits and starts, I have developed a Subscription which seems to be able to run in either a "pull" or "push" mode. This code has a slight smell - I might be trying to do too much with this one module. It may make more sense to have two different Subscriptions - for push and pull mode, or, alternatively, develop a macro that generates a different Subscription depending on the run mode value.

In pull mode a flow typically begins with the last component(s) requesting some number of information packets. This request propagates upstream through the flow graph until eventually reaching component(s) that can provide data. Running in pull mode also allows a component (actually the subscription) to apply "back pressure" to the upstream components. Finally, a component won't respond to a request until it has all the values it needs available at its in ports.

In push mode a flow begins with the arrival of information packets at - usually - the upstream components.  When a component has received all of the data that it requires, it computes a result and sends it to its out port.

If a component is meant to work in both a push and pull mode then its listening loop must account for different sets of messages. For example, here's an Add component:

defmodule Math.Add do
  @moduledoc """
  This module describes an FBP Component: Math.Add
  It can operate in both a push and a pull mode.
  """
  @behaviour ElixirFBP.Behaviour

  def description, do: "Add two integers"
  def inports, do: [addend: :integer, augend: :integer]
  def outports, do: [sum: :integer]

  def loop(inports, outports) do
    %{:augend => augend, :addend => addend} = inports
    receive do
      {:addend, value} when is_number(augend) ->
        send(outports[:sum], {:sum, value + augend})
        inports = %{inports | :addend => nil, :augend => nil}
        loop(inports, outports)
      {:addend, value} ->
        inports = %{inports | :addend => value}
        loop(inports, outports)
      {:augend, value} when is_number(addend) ->
        send(outports[:sum], {:sum, addend + value})
        inports = %{inports | :addend => nil, :augend => nil}
        loop(inports, outports)
      {:augend, value} ->
        inports = %{inports | :augend => value}
        loop(inports, outports)
      :sum when is_number(addend) and is_number(augend) ->
        send(outports[:sum], {:sum, addend + augend})
        inports = %{inports | :addend => nil, :augend => nil}
        loop(inports, outports)
    end
  end
end

The last receive clause is used when the flow is operating in pull mode. This message (:sum) originates in the Subscription that connects this component's out port (:sum) with the in port of another component. Note that the destination of the send to the :sum out port is a Subscription process; these processes are all started when the ElixirFBP.Network.start function is executed.

As I mentioned earlier in this blog, I would like to eventually develop an Elixir macro that would generate all these receive clauses based on a more declarative description of the component's in and out ports and computation.
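To give a flavor of what I have in mind, a declarative description might look something like this (entirely hypothetical syntax; no such macro exists yet):
  defcomponent Math.Add do
    inport  :addend, :integer
    inport  :augend, :integer
    outport :sum,    :integer

    # Fire only when a value is available on every in port
    compute fn %{addend: addend, augend: augend} ->
      [sum: addend + augend]
    end
  end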

A Refactoring

I refactored my ElixirFBP project - removing all of the Runtime support for the Noflo-ui network protocol. I will develop another project that uses ElixirFBP as a service and re-implement support for this network protocol. This essentially means using Cowboy to manage websocket requests and turn them into calls to the ElixirFBP gen servers. The newly refactored project code is available at my GitHub repository.


I watched José Valim's keynote talk at the recently completed Elixir Conf 2015 (US). I learned that we might expect a new gen behaviour named GenRouter to appear in Elixir 1.3. Early code is available on GitHub. At first glance, it would appear that GenRouter might serve very nicely as a base for FBP. I will follow the design and development closely and, at some point, see whether I can move ElixirFBP on top of it.

Friday, August 21, 2015

"Do a Something!"

I have been spending a great deal of time worrying about possible overruns in an ElixirFBP flow. Overruns are possible when the producer of Information Packets (IPs) sends them to a component that cannot process them quickly enough; as a result, all kinds of terrible things can happen. Even though Erlang/Elixir is designed from the ground up to be a message passing system, there is the very real possibility of a process's mailbox growing without bound.

This possibility led me to read (and blog) about Reactive Streams, and I have attempted to implement the design outlined in the Reactive Streams specification. What has bothered me about this approach is that it adds an extra processing step between any two flow components. This is the Subscription that I've described earlier. Adding a step is, of course, going to slow things down - it would be much nicer not to have this extra processing time added on. But it doesn't appear that this is possible; something has to be monitoring the size of a component's mailbox.

As part of my research into solutions to this problem, I spent the last couple of days watching a series of presentations given at the 2013 Erlang conference. The presentations were part of a track entitled: Load Regulation and Back Pressure. Ulf Wiger's 'Jobs' Load Regulation Framework was very interesting. Also, Bryan Fink's Riak Pipe.

What now? Well, I know that there are potential problems, especially if ElixirFBP ever is used in an environment such as real-time monitoring. And, I also know that there are solutions out there. But, I haven't been making much progress in the last few weeks. I think I might be suffering from Analysis Paralysis. As an old boss of mine once told me: "Do a Something!". So I will.

Friday, July 17, 2015

The correct Subscription?

In the last post, I showed a possible implementation for a Subscription - the entity that sits between a Publisher and a Subscriber in a Reactive Stream. It wasn't a very good implementation; it was lacking in many respects, not the least of which is that it couldn't deal with an FBP Component executing in multiple Elixir/Erlang processes. ElixirFBP is meant to exploit the relatively cheap cost of spawning many processes in Erlang/Elixir. I could have used one Subscription per component process, but that would have placed the burden of determining which process to send a message to on the Component. I would like to keep the Components as simple as possible. Of course, there is still the possibility that a Subscription process will become a "choke" point in a flow.

Below is the heart of a new version of the Subscription module: the loop function:
    def loop(%Subscription{
                publisher_pids: publisher_pids,
                publisher_port: publisher_port,
                subscriber_pids: subscriber_pids,
                subscriber_pids_length: subscriber_pids_length,
                subscriber_port: subscriber_port} = subscription,
                subscriber_index) do
      receive do
        {:pull, n} when n > 0 ->
          # A request for data from the subscriber:
          # ask the publisher processes for a data value, cycling through
          # their pids (the first line of this pipeline is a reconstruction)
          publisher_pids
            |> Stream.cycle
            |> Stream.take(n)
            |> Enum.each(fn(publisher_pid) ->
              send(publisher_pid, {publisher_port, self()})
            end)
          loop(subscription, 0)
        {^publisher_port, value} ->
          # Publisher has sent data - pass it on to the next subscriber
          send(elem(subscriber_pids, subscriber_index), {subscriber_port, value})
          loop(subscription, rem(subscriber_index + 1, subscriber_pids_length))
        message ->
          IO.puts("Received unknown message: #{inspect message}")
          loop(subscription, subscriber_index)
      end
    end
Notice that when a pull request is received, all of the processes that are currently executing the Publisher component are sent a request. When the responses to these requests begin arriving back, each response is sent to one of the Subscriber component processes in a round-robin fashion. Unless otherwise specified, the default number of processes spawned per component is one.

There is still a problem with this design: the responses are not necessarily received in the same order as the requests that were sent out. There are many examples where the order of the Information Packets is important. I will address this shortcoming a little later.

How does this all work? At the lowest level, these are the steps required:
  1. Spawn one or more processes for every Component in the FBP Graph.
  2. Spawn a Subscription process for every edge between two Components.
  3. Send pull requests to the Subscription processes.
These steps are illustrated in the code below:
    iip1 = Component.start_with_iips(Trial.InitialInformationPacket, [value: 44])
    iip2 = Component.start_with_iips(Trial.InitialInformationPacket, [value: 33])
    add = Component.start(Trial.Add)
    show = Component.start(Trial.Show)
    sub1_pid = Subscription.start(iip1, :value, add, :addend)
    sub2_pid = Subscription.start(iip2, :value, add, :augend)
    sub3_pid = Subscription.start(add, :sum, show, :in)
    send(sub3_pid, {:pull, 1})
    send(sub2_pid, {:pull, 1})
    send(sub1_pid, {:pull, 1})
An InitialInformationPacket is a component that always responds with a constant value. In the example, the Add component is sent the constant values of 44 and 33.
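A sketch of what such a component's loop might look like (illustrative only; the real implementation is in the ElixirFBP repository):
  defmodule Trial.InitialInformationPacket do
    # Reply to every {:value, from} request with the constant,
    # following the {port, pid} protocol used by the Subscription
    def loop(constant) do
      receive do
        {:value, from} ->
          send(from, {:value, constant})
          loop(constant)
      end
    end
  end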

I now need to introduce Subscriptions to the existing code base and make it all work - as it once did - with the NoFlo Development Environment.