Tuesday, November 15, 2016

GenStage and Bioinformatics

In this post, I will describe my first attempt at using GenStage - in particular, its Flow module - to implement a bioinformatics algorithm. My day job for the last couple of years has been building a full-stack system for storing and searching metagenomic analyses of hydrocarbon resources. That's a mouthful. Basically it means that there is a need to know what types of microorganisms live in, for example, oil deposits. Most of these organisms are bacteria, and some of them can corrode steel pipes and do other nasty things. Metagenomics begins with the genetic sequencing of the thousands of species that are found in these hydrocarbon resources. Because sequencing reads only pieces of an organism's genome - not the whole genome - figuring out which species a piece belongs to becomes a big computational challenge.

One of the reasons I started this blog is that metagenomics is typically performed by sending what is called the raw read of nucleotide bases (the A, T, U, C, and G's) through a pipeline of processes. A popular pipeline is Qiime (pronounced chime). It consists of many Python programs strung together. Initially, I created my own Elixir flow-based framework but, now that GenStage is here, I thought it might be interesting to see how its facilities could be used to perform these types of analyses.

Instead of trying to rewrite Qiime in Elixir - no small task - I determined that I needed to start small and see how I could implement basic bioinformatics algorithms in Elixir and reap the benefits (faster?) of the parallel processing that comes with the use of GenStage. Towards that end, I began taking a great course at Coursera entitled "Finding Hidden Messages in DNA (Bioinformatics I)". Problems in bioinformatics are presented and solutions are outlined, but it is up to you to implement the solutions in the language of your choice. One problem is the so-called "Clump Finding Problem". First a definition: a k-mer is a sequence k nucleotides long - for example, CTGGAT is a 6-mer. Given a sequence - which can be millions of nucleotides long - find those k-mers that appear at least some number of times. But instead of searching the entire sequence at once, you look for these k-mers within a smaller subsequence. Given an input sequence that is 1500 bases long, we might only be interested in finding k-mers within a 500-base-long subsequence. Here's an example from the course:

Input: CGGACTCGACAGATGTGAAGAACGACAATGTGAAGACTCGACACGACAGAGTGAAGAGAAGAGGAAACATTGTAA

Parameters: Find the 5-mers that appear at least 4 times within a 50-base-long subsequence.

Output: CGACA GAAGA

The algorithm needs to slide along the sequence extracting subsequences, then slide along each subsequence extracting the k-mers, keeping a count of each unique k-mer. Sliding is performed one base at a time in both cases. Finally, the result is those k-mers that are found at least the specified number of times.
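Both kinds of sliding map directly onto Stream.chunk with a step of 1, which is what the implementation below relies on. For example, here are the 5-mers of CTGGAT (as charlists, since the sequence is converted to a charlist first):

iex> "CTGGAT" |> String.to_charlist |> Stream.chunk(5, 1) |> Enum.to_list
['CTGGA', 'TGGAT']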

Why is this algorithm needed? It turns out that there are certain sequences of nucleotides that appear in an organism's genome that are significant. For example, in Bacteria, these sequences might indicate where the replication of their DNA should begin and end.

Here's my first attempt at implementing this algorithm using the Flow module of GenStage:
defmodule Bio do
  alias Experimental.Flow

  def clump(seq, k_mer_len, subseq_len, times) do
    seq
    |> String.to_charlist
    |> Stream.chunk(subseq_len, 1)
    |> Flow.from_enumerable
    |> Flow.partition
    |> Flow.map(fn subseq -> Stream.chunk(subseq, k_mer_len, 1) end)
    |> Flow.map(fn k_mers ->
         # Count the occurrences of each unique k-mer in this subsequence
         Enum.reduce(k_mers, %{}, fn w, acc ->
           Map.update(acc, w, 1, & &1 + 1)
         end)
       end)
    |> Flow.flat_map(fn counts ->
         # Drop the k-mers that didn't occur often enough
         Enum.reject(counts, fn {_, n} -> n < times end)
       end)
    |> Flow.map(fn {k_mer, _count} -> k_mer end)
    |> Enum.uniq
  end
end
Although not optimal, the code does work. Looking a little closer, it:
  1. Creates a Flow from the subsequences,
  2. Continues the flow by creating the k-mers, counting their occurrences and storing the counts in a Map,
  3. Removes (rejects) all entries (k-mers) that didn't occur enough times, and
  4. Finally, triggers the flow by calling Enum.uniq, a function that eliminates duplicates (we only care that a k-mer appeared at least some number of times, not exactly how many times).
Notice my use of Stream.chunk/3. This function is implemented for Enums and Streams but not for Flows. Wondering if there needed to be an implementation of the chunk function in Flow, I posted a question on the Elixir mailing list. José Valim, Elixir's creator, graciously answered my question and even posted a better implementation of the clump function! (I'll show his solution below.) Importantly, he pointed out that you have to be careful in the use of Flow, especially when you need to preserve the order of the original sequence. This is because you don't know when any one of the parallel processes will finish and deliver its result. As it turns out, this particular clump finding algorithm doesn't need to preserve the original order of the sequence. Finally, José pointed out that I didn't need to use Flow.partition because the algorithm wasn't reducing over state.

Here's José's implementation:
def clump(seq, k_mer_len, subseq_len, times) do
  seq
  |> String.to_charlist
  |> Stream.chunk(subseq_len, 1)
  |> Flow.from_enumerable
  |> Flow.flat_map(&find_sequences(&1, k_mer_len, times))
  |> Enum.uniq
end
def find_sequences(subseq, k_mer_len, times) do
  subseq
  |> Stream.chunk(k_mer_len, 1)
  |> Enum.reduce(%{}, fn w, acc ->
       Map.update(acc, w, 1, & &1 + 1)
     end)
  |> Enum.reject(fn({_, n}) -> n < times end)
  |> Enum.map(fn({seq, _}) -> seq end)
end
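Running either version against the course example gives the expected clumps as charlists (this assumes the functions are compiled into the Bio module shown above; the order of the results may vary because of the parallelism mentioned earlier):

iex> seq = "CGGACTCGACAGATGTGAAGAACGACAATGTGAAGACTCGACACGACAGAGTGAAGAGAAGAGGAAACATTGTAA"
iex> Bio.clump(seq, 5, 50, 4)
['CGACA', 'GAAGA']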

Thursday, August 18, 2016

GenStage Example No. 3 - Dispatching

In my last post, I showed one way for a stage to handle input from more than one source (stage). In this post, we'll look at the other side of the stage: output. Specifically, how can we send the output of one stage to more than one stage based on some criterion? As an example, we'll build a stage that takes in integer events and sends the odd ones to one stage and the even ones to another stage.

First some background: GenStage has the notion of Dispatchers. A dispatcher is the part of a producer stage that is responsible for handling both the upstream demand and the downstream events passing between stages. In the simple case, you do not have to specify the dispatcher: the default is the so-called DemandDispatcher. From the GenStage documentation:
GenStage.DemandDispatcher dispatches the given batch of events to the consumer with the biggest demand in a FIFO ordering.
GenStage also has a BroadcastDispatcher. From the documentation:
GenStage.BroadcastDispatcher - dispatches all events to all consumers. The demand is only sent upstream once all consumers ask for data.
To use a dispatcher other than the default, you return a dispatcher option from a stage's init() function:
defmodule Stage1 do
    use GenStage
    def init(state) do
        {:producer_consumer, state, dispatcher: GenStage.BroadcastDispatcher}
    end
    ...
When I first began looking at the problem of splitting output events according to some criterion, I looked at these dispatchers and didn't think that they would do what I needed. I also looked at a new dispatcher (in the latest release - 0.5.0) named PartitionDispatcher and decided that it wasn't applicable either. PartitionDispatcher distributes events to a specified number of partitions based on the result of hashing an event. By default, Erlang's phash2/2 is used, but you can supply your own function. A likely scenario for this dispatcher would be one where you wanted the processing of output events to be handled - evenly - by a group of worker stages.

Thinking that none of these dispatchers fit my needs, I went ahead and designed and built a new type of dispatcher - I called it a DirectedDispatcher. It expects its input events to be labelled; the label is used to determine which stage to direct the event to. In our example, the even events would appear, for example, as the tuple {:evens, 28} and the odd events would be {:odds, 37}. The labeling of the events is performed by the splitting stage before they reach the dispatcher. The DirectedDispatcher would know, via earlier subscriptions, the pid of the stage associated with each label and use this pid to send the event to the correct stage.

I'm not going to show my DirectedDispatcher - at least not in its present form - because, after finishing it, I had one of those moments of clarity: I could, in fact, use the PartitionDispatcher, at least for this example. Just because the function that the dispatcher uses is called a hash function doesn't mean that it has to use some hashing algorithm. The responsibility of the "hash" function is to accept an event and the number of partitions, and to return the event and an integer - the integer indicating which partition to send the event to. Things moved along quickly after this realization. Here's the complete code for the Splitter stage, along with its "hash" function:
defmodule Splitter do
  use GenStage

  def init(_) do
    {:producer_consumer, %{},
      dispatcher: {GenStage.PartitionDispatcher,
                    partitions: 2,
                    hash: &split/2}}
  end

  def split(event, no_of_partitions) do
    {event, rem(event, no_of_partitions)}
  end

  def handle_events(events, _from, state) do
    {:noreply, events, state}
  end
end
The split function simply computes a partition number from the value of the event - an integer: even events go to partition 0 and odd events go to partition 1.

When a stage subscribes to another stage that uses a PartitionDispatcher, such as Splitter above, it needs to specify what partition the subscribing stage will represent. This is done by specifying an extra parameter to GenStage's subscribe functions. For example:
  {:ok, inport}    = GenStage.from_enumerable(1..10)
  {:ok, splitter}  = GenStage.start_link(Splitter, 0)
  {:ok, evens}     = GenStage.start_link(Ticker, {2_000, :evens})
  {:ok, odds}      = GenStage.start_link(Ticker, {2_000, :odds})

  GenStage.sync_subscribe(evens, to: splitter, partition: 0, max_demand: 1)
  GenStage.sync_subscribe(odds,  to: splitter, partition: 1, max_demand: 1)
  GenStage.sync_subscribe(splitter, to: inport, max_demand: 1)
As in earlier examples, the Ticker stage sends an upstream demand and then waits the specified number of milliseconds (2,000) before requesting another event. These tickers are the partition stages. When they subscribe to the Splitter stage, they indicate which partition number they are to be associated with. The extra initialization parameters - :evens and :odds - are used to distinguish one Ticker from the other.
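The Ticker module itself isn't shown here; below is a minimal sketch of what it might look like. The {sleeping_time, label} initialization tuple matches the start_link calls above, but the rest is my assumption - the repository version may differ:

defmodule Ticker do
  use GenStage

  def init({sleeping_time, label}) do
    {:consumer, {sleeping_time, label}}
  end

  def handle_events(events, _from, {sleeping_time, label} = state) do
    # Tag the output so we can tell the two tickers apart
    IO.inspect({label, events})
    # Pause before demanding the next event from upstream
    Process.sleep(sleeping_time)
    {:noreply, [], state}
  end
end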

I realize that I am probably misusing the purpose of the PartitionDispatcher, although the split function above is, very loosely speaking, a hash function. I am going to revisit the implementation of my DirectedDispatcher. I think it can be modeled after PartitionDispatcher, but it would use a "hash" function that accepted the event and the dispatcher's state to determine which partition to send an event to. I hope to show this code in a future post.

In the meantime, the example outlined above is available on GitHub.


Monday, August 8, 2016

GenStage Example No. 2

In my early experiments with building flows out of Elixir processes, I built flows whose nodes could take multiple inputs and produce a single output. A very simple example of such a process would be a Sum node that takes in two numbers and produces their sum.

How could this be done using GenStage? The documentation (version 0.4.3) states very clearly that
A consumer may have multiple producers and a producer may have multiple consumers. When a consumer asks for data, each producer is handled separately, with its own demand.
To start, let's define a Constant stage that will produce as many copies of the same value as the demand requires. Below is what a Constant stage could look like:
  defmodule Constant do
    @moduledoc """
    This stage will always produce demand number of constants.
    """
    use GenStage
    def init(constant) do
      {:producer, constant}
    end
    def handle_demand(demand, constant) when demand > 0 do
      events = List.duplicate(constant, demand)
      {:noreply, events, constant}
    end
  end
Note that it is initialized with a constant value which is stored as its state. When it receives demand - when a consumer asks for some events - the handle_demand function will make as many copies as have been asked for. There will be two instances of the Constant stage, each initialized with a different integer value.

These two Constants will feed a Sum stage. Connecting a stage to multiple inputs is straightforward. For example:
  {:ok, input1} = GenStage.start_link(Stage1, :ok)
  {:ok, input2} = GenStage.start_link(Stage2, :ok)
  {:ok, process} = GenStage.start_link(Process, 0)
  GenStage.sync_subscribe(process, to: input1)
  GenStage.sync_subscribe(process, to: input2)
There is a potential problem with this code, though. In designing the Sum stage I have to account for two distinct sources of data: addends and augends. Moreover, I want to make sure that I have two inputs available before they are summed. And I don't want to take two values from one source and sum them. Given these constraints, we need some way of distinguishing the arriving events, that is, of knowing what stage an event is coming from. [Addition is commutative, so it really doesn't matter in what order the inports are used, but it does matter that the inputs are used in pairs - one from each inport.]

It turns out that GenStage creates a unique identifier, termed a tag, for every successful subscription. We need access to that tag. This can be done by defining the handle_subscribe function for a stage. The GenStage macro supplies default implementations of this function when none are specified. There are two forms of handle_subscribe: one for consumer subscriptions and one for producer subscriptions. We are interested in the producer subscriptions. Its function signature is:
   def handle_subscribe(:producer, opts, {pid, tag}, state) do
In my example, this callback function will be called twice, once for each sync_subscribe to the Sum stage. The pid and tag values will be different on each call. We are going to distinguish one inport from another by supplying an "inport" option to the sync_subscribe function. So connecting the two Constant stages to the Sum stage will now look like:
  GenStage.sync_subscribe(sum, to: augend, inport: :augend)
  GenStage.sync_subscribe(sum, to: addend, inport: :addend)
Here's the code for the subscription callback, handle_subscribe, that we now have to provide to handle the extraction of the new option value and associating it with the subscription tag:
    def handle_subscribe(:producer, opts, {_pid, tag}, state) do
      inport = Keyword.get(opts, :inport)
      case inport do
        nil ->
          {:stop, "no inport specified", state}
        _ ->
          new_state = Map.put(state, inport, {tag, []})
          {:automatic, new_state}
      end
    end
Notice that I have taken the unique tag value that GenStage has generated and tied it to the name of the inport using a Map. This map becomes the state of the Sum stage. Also, notice that the other value associated with the tag is an empty list. This is where we will accumulate - queue - input events arriving at the Sum stage.
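For this to work, the Sum stage has to start out with an empty map as its state. Its init isn't shown here, but a plausible version (my assumption) would be:

  def init(_) do
    # Start with an empty map; handle_subscribe adds one
    # {tag, queued_events} entry per inport.
    {:producer_consumer, %{}}
  end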
We are all set up now for the Sum stage to handle input events and to be able to tell which inport they are coming in on. The next bit of code is the implementation of the handle_events callback for the Sum stage:
    def handle_events(events, {_pid, tag}, %{:addend => {tag, _}} = state) do
      state = Map.update!(state, :addend,
          fn({tag, addends}) -> {tag, addends ++ events} end)
      {return_events, new_state} = sum_inports(state)
      {:noreply, return_events, new_state}
    end
    def handle_events(events, {_pid, tag}, %{:augend => {tag, _}} = state) do
      state = Map.update!(state, :augend,
            fn({tag, augends}) -> {tag, augends ++ events} end)
      {return_events, new_state} = sum_inports(state)
      {:noreply, return_events, new_state}
    end
There is one handle_events clause per inport, and pattern matching on the saved tag ensures that we are handling the right events. For each inport, the new events are appended to any prior events that have not yet been used. The function sum_inports is then called with the updated state. Here is sum_inports:
    defp sum_inports(state) do
      {augend_tag, augends} = Map.get(state, :augend, {nil, []})
      {addend_tag, addends} = Map.get(state, :addend, {nil, []})
      {addends, augends, results} = do_sum(addends, augends, [])
      state = Map.put(state, :addend, {addend_tag, addends})
      state = Map.put(state, :augend, {augend_tag, augends})
      {results, state}
    end
This function simply extracts the queued-up events associated with each inport and relies on the function do_sum to perform the arithmetic and return the (possibly updated) event queues. Based on its argument patterns, do_sum figures out what to add:
    defp do_sum([], augends, results) do
      {[], augends, results}
    end
    defp do_sum(addends, [], results) do
      {addends, [], results}
    end
    defp do_sum([h_addends|t_addends], [h_augends|t_augends], results) do
      do_sum(t_addends, t_augends, results ++ [h_addends + h_augends])
    end
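Putting it all together, the wiring might look something like this. This is only a sketch: the Printer consumer (a stage that just prints what it receives) is my invention here, and the complete version is in the repository linked below:

  {:ok, augend}  = GenStage.start_link(Constant, 2)
  {:ok, addend}  = GenStage.start_link(Constant, 3)
  {:ok, sum}     = GenStage.start_link(Sum, :ok)
  {:ok, printer} = GenStage.start_link(Printer, :ok)

  GenStage.sync_subscribe(printer, to: sum, max_demand: 1)
  GenStage.sync_subscribe(sum, to: augend, inport: :augend, max_demand: 1)
  GenStage.sync_subscribe(sum, to: addend, inport: :addend, max_demand: 1)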
The complete code is available on GitHub. I've included instructions for running this flow a couple of different ways. One uses the Constant stages as described above. The other uses the GenStage function from_enumerable, which starts a producer stage from an enumerable or stream, as sketched below.
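For instance, the two Constant producers above could be replaced with enumerable-backed producers like this (again a sketch - the repository's version may differ):

  {:ok, augend} = GenStage.from_enumerable([1, 2, 3])
  {:ok, addend} = GenStage.from_enumerable([10, 20, 30])
  GenStage.sync_subscribe(sum, to: augend, inport: :augend, max_demand: 1)
  GenStage.sync_subscribe(sum, to: addend, inport: :addend, max_demand: 1)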

Enjoy!

Wednesday, July 20, 2016

GenStage Example

Way back in December I wrote about a new Elixir project named GenRouter. It was a first attempt at defining a reactive streaming library for Elixir. It has come a long way since then! On July 14th, José Valim posted a blog entry announcing GenStage - the successor to GenRouter. I encourage you to read the post and watch the accompanying video as well. The "stage" in GenStage refers to the name given to the nodes in a flow; every stage is an Elixir/Erlang process, thereby providing the means to parallelize flows. Stages can be of three types: consumers, producer/consumers, and producers. The data that move through a flow are called events - events can be of any type. There is much, much more to GenStage. Take a look at the excellent online documentation as well.

Just over a year ago, I wrote about a graphical tool, named Streamtools, developed by the New York Times R&D group (it does not appear to be in active development as of this date; here's a link to a description of the tool and the example). I took one of their examples and implemented it with my ElixirFBP library. The example accesses the data feed of a bike share program in New York City. Below is an implementation of that example using GenStage (the code and instructions for running this example are available at GitHub):
alias Experimental.GenStage

defmodule GenstageExample do
  defmodule Map do
    use GenStage
    def init(url) do
      {:producer, url}
    end
    def handle_demand(demand, url) when demand > 0 do
      events = List.duplicate(url, demand)
      {:noreply, events, url}
    end
  end

  defmodule GetHTTP do
    use GenStage
    def init(_) do
      {:producer_consumer, :ok}
    end
    def handle_events(events, _from, _state) do
      events = Enum.map(events, &retrieve_as_json/1)
      {:noreply, events, :ok}
    end
    defp retrieve_as_json(url) do
      {:ok, %HTTPoison.Response{body: body}} = HTTPoison.get(url)
      Poison.Parser.parse!(body)
    end
  end

  defmodule Unpack do
    use GenStage
    def init(element) do
      {:producer_consumer, element}
    end
    def handle_events(events, _from, element) do
      events = Enum.map(events, & &1[element])
      {:noreply, events, element}
    end
  end

  defmodule Filter do
    use GenStage
    def init({filter, filter_value}) do
      {:producer_consumer, {filter, filter_value}}
    end
    def handle_events(events, _from, {filter, filter_value} = state) do
      events = Enum.map(events,
                           fn stations ->
                             Enum.filter(stations, & &1[filter] == filter_value)
                           end)
      {:noreply, events, state}
    end
  end
  defmodule Ticker do
    use GenStage
    def init(sleeping_time) do
      {:consumer, sleeping_time}
    end
    def handle_events(events, _from, sleeping_time) do
      IO.inspect(events)
      Process.sleep(sleeping_time)
      {:noreply, [], sleeping_time}
    end
  end

  # Start up each stage with initial values.
  {:ok, map}     = GenStage.start_link(Map, 
                                   "http://feeds.citibikenyc.com/stations/stations.json")
  {:ok, getHTTP} = GenStage.start_link(GetHTTP, :ok)
  {:ok, unpack}  = GenStage.start_link(Unpack, "stationBeanList")
  {:ok, filter}  = GenStage.start_link(Filter, {"stationName", "W 14 St & The High Line"})
  {:ok, ticker}  = GenStage.start_link(Ticker, 5_000)

  # Join (subscribe) stages and create the data (event) flow
  GenStage.sync_subscribe(ticker, to: filter)
  GenStage.sync_subscribe(filter, to: unpack)
  GenStage.sync_subscribe(unpack, to: getHTTP)
  GenStage.sync_subscribe(getHTTP, to: map, max_demand: 1)

  Process.sleep(:infinity)
end

There are five stages/nodes in this data flow:
  1. Map - supply a url, but only if there is demand for it.
  2. GetHTTP - retrieve web data, converting the JSON to an Elixir map.
  3. Unpack - extract an element from the map.
  4. Filter - find a particular bike station by its name.
  5. Ticker - send a single demand (for one event) every 5 seconds.
My original implementation of this data flow operated in push mode. That is, the first node of the flow sent a signal to the second node in order to get things started. GenStage flows operate in a pull or demand-driven mode, so the Ticker node is placed at the end of the flow. It notifies the stage that it is connected to that it wants - at most - one event. This demand is passed through all of the other stages until it arrives at the only pure producer in the flow: Map. All other stages are producer/consumers. They respond to - consume - events by producing new ones.
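To run the example you will need the HTTP and JSON libraries it uses. Here is a minimal sketch of the mix.exs dependencies - the version numbers are my assumptions for the time of writing:

  defp deps do
    [{:gen_stage, "~> 0.4"},
     {:httpoison, "~> 0.9"},
     {:poison, "~> 2.0"}]
  end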

Wednesday, December 23, 2015

GenRouter

In my last post, I briefly mentioned a new Elixir module named GenRouter. José Valim described its rationale in his Elixir Conf 2015 (US) keynote address. The project is in development and can be seen on GitHub. As with all Elixir core code, it is superbly documented - even before it has been fully implemented - although they do not seem to have settled on a naming convention. For example, the terms source and sink are used for what FBP calls in and out ports.

The name GenRouter was chosen to reflect the idea that it is a generic module similar to the other Elixir generic libraries: GenServer and GenEvent. Here are the opening paragraphs from the current documentation:
A behaviour module for routing events from multiple sources to multiple sinks. 
GenRouter allows developers to receive events from multiple sources and/or send events to multiple sinks. The relationship between sources (the one sending messages) and sinks (the one receiving them) is established when the sink explicitly asks for data. 
Since the same process can send and receive data, the same process can be both a source and sink, which is quite common. Sources that only send data and sinks that only receive data are called “definite source” and “definite sink”, respectively.
In his talk José acknowledged talking with the Akka team. Akka is a member of the Reactive Streams Initiative, which among other things has released a specification on how well-behaved processes handling streams of data are to act, especially with regard to "back pressure". I've mentioned Reactive Streams in earlier posts; its specification has also influenced my design. Basically, there is two-way communication between entities that are sending and/or receiving data streams. Demand requests are sent upstream and the fulfillment of these demand requests is sent downstream. Hence, nothing moves unless there is a demand for something.

The Reactive Streams influence on GenRouter is manifested in a module named GenRouter.Spec. In the module there are four "convenience" functions that one can use to:
  1. subscribe() to an upstream source, asking for some number of data events to be sent. So the Subscriber is asking a Publisher for a Subscription and for a certain maximum number of events (data). The Subscriber must honor the parameters of this Subscription.
  2. ask() for data from an upstream source, specifying how many data events an entity can receive.
  3. route() is used to send data to a downstream entity. The Publisher sends data to its subscribed Subscribers.
  4. unsubscribe() will cancel the source/sink relationship. The Subscriber will attempt to cancel the Subscription that it has with a Publisher.
As of this date GenRouter needs to be "fleshed out".

Monday, October 19, 2015

Subscription Redux

ElixirFBP Subscription

An ElixirFBP Subscription is the "glue" that ties together all of the components in a flow graph. While Components could certainly send messages directly to each other, I found a compelling reason for developing a Subscription as a result of reading about Reactive Streams. After some fits and starts, I have developed a Subscription which seems to be able to run in either a "pull" or a "push" mode. This code has a slight smell - I might be trying to do too much with this one module. It may make more sense to have two different Subscriptions - one for push mode and one for pull mode - or, alternatively, to develop a macro that generates a different Subscription depending on the run mode value.

In pull mode a flow typically begins with the last component(s) requesting some number of information packets. This request propagates upstream through the flow graph until eventually reaching component(s) that can provide data. Running in pull mode also allows a component (actually the subscription) to apply "back pressure" to the upstream components. Finally, a component won't respond to a request until it has all the values it needs available at its in ports.

In push mode a flow begins with the arrival of information packets at - usually - the upstream components.  When a component has received all of the data that it requires, it computes a result and sends it to its out port.

If a component is meant to work in both a push and a pull mode, then its listening loop must account for different sets of messages. For example, here's an Add component:

defmodule Math.Add do
  @moduledoc """
  This module describes an FBP Component: Math.Add
  It can operate in both a push and a pull mode
  """
  @behaviour ElixirFBP.Behaviour

  def description, do: "Add two integers"
  def inports, do: [addend: :integer, augend: :integer]
  def outports, do: [sum: :integer]

  def loop(inports, outports) do
    %{:augend => augend, :addend => addend} = inports
    receive do
      {:addend, value} when is_number(augend) ->
        send(outports[:sum], {:sum, value + augend})
        inports = %{inports | :addend => nil, :augend => nil}
        loop(inports, outports)
      {:addend, value} ->
        inports = %{inports | :addend => value}
        loop(inports, outports)
      {:augend, value} when is_number(addend) ->
        send(outports[:sum], {:sum, addend + value})
        inports = %{inports | :addend => nil, :augend => nil}
        loop(inports, outports)
      {:augend, value} ->
        inports = %{inports | :augend => value}
        loop(inports, outports)
      :sum when is_number(addend) and is_number(augend) ->
        send(outports[:sum], {:sum, addend + augend})
        inports = %{inports | :addend => nil, :augend => nil}
        loop(inports, outports)
    end
  end
end

The last receive clause is used when the flow is operating in pull mode. This message (:sum) originates in the Subscription that connects this component's out port (:sum) with the in port of another component. Note that the destination of the send to the :sum out port is a Subscription process; these processes are all started when an ElixirFBP.Network.start function is executed.

As I mentioned earlier in this blog, I would like to eventually develop an Elixir macro that would generate all these receive clauses based on a more declarative description of the component's in and out ports and computation.
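Purely as a sketch of what I have in mind - none of this exists yet, and defcomponent is a hypothetical macro - such a declarative description might look like:

defcomponent Math.Add,
  description: "Add two integers",
  inports:  [addend: :integer, augend: :integer],
  outports: [sum: :integer] do
  # The macro would generate the push- and pull-mode receive
  # clauses; the body only states the computation.
  sum = addend + augend
end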

A Refactoring

I refactored my ElixirFBP project - removing all of the Runtime support for the Noflo-ui network protocol. I will develop another project that uses ElixirFBP as a service and re-implement support for this network protocol. This essentially means using Cowboy to manage websocket requests and turn them into calls to the ElixirFBP gen servers. The newly refactored project code is available at my GitHub repository.

GenRouter!

I watched José Valim's keynote talk at the recently completed Elixir Conf 2015 (US). I learned that we might expect a new gen behaviour named GenRouter to appear in Elixir 1.3. Early code is available on GitHub. At first glance, it would appear that GenRouter might serve very nicely as a base for FBP. I will follow the design and development closely and, at some point, see whether I can move ElixirFBP on top of it.

Friday, August 21, 2015

"Do a Something!"

I have been spending a great deal of time worrying about possible overruns in an ElixirFBP flow. Overruns are possible when the producer of Information Packets (IPs) sends them to a component that cannot process them quickly enough and, as a result, all kinds of terrible things can happen. Even though Erlang/Elixir is designed from the ground up to be a message passing system, there is the very real possibility of a process's mailbox growing without bound.

This possibility led me to read (and blog) about Reactive Streams, and I have attempted to implement the design outlined in the Reactive Streams specification. What has bothered me about this approach is that it adds an extra processing step between any two flow components. This is the Subscription that I've described earlier. Adding a step is, of course, going to slow things down - it would be much nicer not to have this extra processing time added on. But it doesn't appear that this is possible; something has to be monitoring the size of a component's mailbox.

As part of my research into solutions to this problem, I spent the last couple of days watching a series of presentations given at the 2013 Erlang conference. The presentations were part of a track entitled: Load Regulation and Back Pressure. Ulf Wiger's 'Jobs' load regulation framework was very interesting, as was Bryan Fink's Riak Pipe.

What now? Well, I know that there are potential problems, especially if ElixirFBP ever is used in an environment such as real-time monitoring. And, I also know that there are solutions out there. But, I haven't been making much progress in the last few weeks. I think I might be suffering from Analysis Paralysis. As an old boss of mine once told me: "Do a Something!". So I will.