Tuesday, November 15, 2016

GenStage and Bioinformatics

In this post, I will describe my first attempt at using GenStage - in partiuclar, its Flow module - to implement a bioinformatics algorithm. My day job for the last couple of years has been the building of a full-stack system for the storage and searching of the metagenomic analyses (metagenomics) of hydrocarbon resources. That's a mouthful. Basically it means that there is a need to know what types of microorganisms live in oil deposits, for example. Primarily bacterial species, some of these organisms can corrode steel pipes and do other nasty things. Metagenomics begins with the genetic sequencing of thousands of species that are found in these hydrocarbon resources. Because the sequencing results in only pieces of an organism's genome being read - not the whole genome, it becomes a big computational challenge to figure out which species a piece belongs to.

One of the reasons I started this blog was because metagenomics is typically performed by sending what is called the raw read of nucleotide bases (the A, T, U, C, and G's) through a pipeline of processes. A popular pipeline is Qiime (pronounced chime). It consists of many Python programs that are strung together. Initially, I created my own Elixir flow-based framework but, now that GenStage is here, I thought it might be interesting to see how its facilities could be used to perform these types of analyses.

Instead of trying to rewrite Qiime in Elixir - no small task - I determined that I needed to start small and see how I could implement basic bioinformatic algorithms in Elixir and reap the benefits (faster?) of the parallel processing that comes with the use of GenStage. Towards that end, I began taking a great course at Coursera entitled "Finding Hidden Messages in DNA (Bioinformatics I)". Problems in bioinformatic are presented and solutions are outlined but it is up to you to implement the solutions in the language of your choice. One problem is the so-called "Clump Finding Problem". First a definition: a k-mer is a sequence k nucleotides long - for example, CTGGAT is a 6-mer. Given a sequence - this can be millions of nucleotides long - find those k-mers that appear at least some number of times in this sequence. But instead of using the entire sequence, you look for these k-mers in a smaller subsequence. Given an input sequence that is 1500 bases long, we might only be interested in finding k-mers in a 500 base-long subsequence.  Here's an example from the course:


Parameters: Find the 5-mers that appear at least 4 times in a 50-base long subsequence.


The algorithm needs to slide along the sequence extracting the subsequence then slide along the subsequence extracting the k-mers keeping count of the unique k-mers. Sliding is performed one base at a time in both cases. Finally, the result will be those k-mers that are found at least a specified number of times.

Why is this algorithm needed? It turns out that there are certain sequences of nucleotides that appear in an organism's genome that are significant. For example, in Bacteria, these sequences might indicate where the replication of their DNA should begin and end.

Here's my first attempt at implementing this algorithm using the Flow module of GenStage:
defmodule Bio do
alias Experimental.Flow

def clump(seq, k_mer_len, subseq_len, times) do
  |> seq
  |> String.to_charlist
  |> Stream.chunk(subseq_len, 1)
  |> Flow.from_enumerable
  |> Flow.partition
  |> Flow.map(fn e -> Stream.chunk(e, k_mer_len, 1) end)
  |> Flow.map(
        fn e ->
          Enum.reduce(e, %{},
            fn w, acc ->
              Map.update(acc, w, 1, & &1 + 1)
  |> Flow.flat_map(
        fn e ->
          Enum.reject(e, fn({_, n}) -> n < times end)
  |> Flow.map(fn({seq, _}) -> seq end)
  |> Enum.uniq
Although not optimal, the code does work. Looking a little closer, it:
  1. Creates a Flow from the subsequences,
  2. Continues the flow by creating the k-mers, counting their occurrences and storing this count in a Map, and
  3. Removes (rejects) all entries (k-mers) that didn't occur enough times.
  4. Finally, triggers the flow by calling Enum.uniq, a function that eliminates duplicates (we only care that the k-mer appeared at least some number of times, not how many times.) 
Notice my use of Stream.chunk/4. This function is implemented for Enum's and Stream's but not for Flows. Wondering if there needed to be an implementation of the chunk function in Flow, I posted a question on Elixir's mail list. José Valim, Elixir's creator, graciously answered my question and even posted a better implementation of the clump function! (I'll show his solution below). Importantly, he pointed out that you have to be careful in the use of Flow especially when you need to preserve the order of the original sequence. This is because you don't know when any one of the parallel processes will finish and deliver its result. As it turns out, this particular clump finding algorithm doesn't need to preserver the original order of the sequence. Finally, José pointed out that I didn't need to use Flow.partition because the algorithm wasn't reducing over state.

Here's José's implementation:
def clump(seq, k_mer_len, subseq_len, times) do
  |> String.to_charlist
  |> Stream.chunk(subseq_len, 1)
  |> Flow.from_enumerable
  |> Flow.flat_map(&find_sequences(&1, k_mer_len, times))
  |> Enum.uniq
def find_sequences(subseq, k_mer_len, times) do
  |> Stream.chunk(k_mer_len, 1)
  |> Enum.reduce(%{}, fn w, acc ->
       Map.update(acc, w, 1, & &1 + 1)
  |> Enum.reject(fn({_, n}) -> n < times end)
  |> Enum.map(fn({seq, _}) -> seq end)

Thursday, August 18, 2016

GenStage Example No. 3 - Dispatching

In my last post, I showed one way for a stage to handle input from more than one source (stage). In this post, we'll look at the other side of the stage: output. Specifically, how can we send the output of one stage to more than one stage based on some type of criteria. As an example, we'll build a stage that can take in integer events and send the odd ones to one stage and the even ones to another stage.

First some background: GenStage has the notion of Dispatchers. These are processes that are responsible for handling both the upstream demand and the downstream events passing between stages. In the simple case, you do not have to specify the dispatcher: the default is the so-called DemandDispatcher. From the GenStage documentation:
GenStage.DemandDispatcher dispatches the given batch of events to the consumer with the biggest demand in a FIFO ordering.
GenStage also has a BroadcastDispatcher. From the documentation:
GenStage.BroadcastDispatcher - dispatches all events to all consumers. The demand is only sent upstream once all consumers ask for data.
To use a dispatcher other than the default, you return a dispatcher option from a stage's init() function:
defmodule Stage1 do
    use GenStage
    def init(state) do
        {:producer_consumer, state, dispatcher: GenStage.BroadcastDispatcher}
When I first began looking at the problem of splitting output events according to some criteria, I looked at these dispatchers and didn't think that they would do what I needed. I also looked at a new dispatcher (in the latest release - 0.5.0) named PartitionDispatcher and decided that it wasn't applicable, as well. PartitionDispatcher distributes events to a specified number of partitions based on the result of hashing an event. By default, Erlang's phash2/2 is used but you can supply your own function. A likely scenario for this dispatcher would be where you wanted the processing of output events to be handled - evenly - by a group of worker stages.

Thinking that none of these dispatchers fit my needs,  I went ahead and designed and built a new type of dispatcher - I called it a DirectedDisptacher. It expects its input events to be labelled; the label is used to determine which stage to direct the event to. In our example, the even events would appear, for example, as the tuple {:evens, 28} and the odd events would be {:odds, 37}. The labeling of the events is performed by the splitting stage before it reaches the dispatcher. The DirectedDispatcher would know, via earlier subscriptions, the pid of the stage associated with each label and use this pid to send the event to the correct stage.

I'm not going to show my DirectedDispatcher - at least not in its present form - because, after finishing it, I had one of those moments of clarity: I could, in fact, use the PartitionDispatcher, at least for this example. Just because the function that the dispatcher is using is called a hash function doesn't mean that it has to use some hashing algorithm. The responsibility of the "hash" function is to accept an event and the number of partitions and return the event and an integer - the integer indicating which partition to send the event to. Things moved along quickly after this realization. Here's the complete code for the Splitter stage, along with its "hash" function:
defmodule Splitter do
  use GenStage

  def init(_) do
    {:producer_consumer, %{},
      dispatcher: {GenStage.PartitionDispatcher,
                    partitions: 2,
                    hash: &split/2}}

  def split(event, no_of_partitions ) do
    {event, rem(event, no_of_partitions)}
  def handle_events(events, _from, state) do
    {:noreply, events, state}
The split function simply computes a partition value based on the value of the event - an integer.

When a stage subscribes to another stage that uses a PartitionDispatcher, such as Splitter above, it needs to specify what partition the subscribing stage will represent. This is done by specifying an extra parameter to GenStage's subscribe functions. For example:
  {:ok, inport}    = GenStage.from_enumerable(1..10)
  {:ok, splitter}  = GenStage.start_link(Splitter, 0)
  {:ok, evens}     = GenStage.start_link(Ticker, {2_000, :evens})
  {:ok, odds}      = GenStage.start_link(Ticker, {2_000, :odds})

  GenStage.sync_subscribe(evens, to: splitter, partition: 0, max_demand: 1)
  GenStage.sync_subscribe(odds,  to: splitter, partition: 1, max_demand: 1)
  GenStage.sync_subscribe(splitter, to: inport, max_demand: 1)
As in earlier examples, the Ticker stage sends an upstream demand and then waits the specified number of milliseconds (2,000) before requesting another event. These tickers are the partition stages. When they subscribe to the Split stage, they indicate what partition number they are to be associated with. The extra initialization parameters - :evens and :odds - are used to distinguish one Ticker from the other.

I realize that am I probably misusing the purpose of the PartitionDispatcher, although the split function above is, very loosely speaking, a hash function.  I am going to revisit the implementation of my DirectedDispatcher. I think it can be modeled after PartitionDispatcher but it would use a "hash" function that accepted the event, and the dispatcher's state to determine which partition to send an event to. I hope to show this code in a future post.

In the meantime, the example outlined above is available on GitHub.

Monday, August 8, 2016

GenStage Example No. 2

In my early experiments with building flows out of Elixir processes, I built flows that had nodes that could take multiple inputs and produce a single output. A very simple example of such a process would be a Sum node that takes in two numbers and produces its sum.

How could this be done using GenStage? The documentation (version 0.4.3) states very clearly that
A consumer may have multiple producers and a producer may have multiple consumers. When a consumer asks for data, each producer is handled separately, with its own demand.
To start, let's define a Constant stage that will produce as many copies of the same value as the demand requires. Below, is what a Constant stage could look like:
  defmodule Constant do
    @moduledoc """
    This stage will always produce demand number of constants.
    use GenStage
    def init(constant) do
      {:producer, constant}
    def handle_demand(demand, constant) when demand > 0 do
      events = List.duplicate(constant, demand)
      {:noreply, events, constant}
Note that it is initialized with a constant value which is stored as its state.  When it receives a demand - when a consumer asks for some events - the function handle_demand will make as many copies as have been asked for. There will be two copies of  Constant stage, each initialized with a different value (integer).

These two Constants will feed a Sum stage. Connecting a stage to multiple inputs is straightforward. For example:
  {:ok, input1} = GenStage.start_link(Stage1,:ok)
  {:ok, input2} = GenStage.start_link(Stage2, :ok)
  {:ok, process} = GenStage.start_link(Process, 0)
  GenStage.sync_subscribe(process, to: input1)
  GenStage.sync_subscribe(process, to: input2)
There is a potential problem with this code, though. In designing the Sum stage I have to account for two distinct sources of data: addends and augends.  Moreover, I want to make sure that I have two inputs available before they are summed. And, I don't want to take two values from one source and sum them. Given these constraints, we need some way of distinguishing the arriving events, that is, from what stage is an event coming?  [Arithmetic is commutative so that it really doesn't matter in what order the inports are used, but it does matter that the inputs are used in pairs - one from each inport.]

It turns out that GenStage creates a unique identifier, termed a tag, for every successful subscription. We need to access to that tag. This can be done by defining the handle_subscribe function for a stage. The GenStage macro supplies default implementations for this function when none are specified. There are two forms of handle_subscribe: one for consumer subscriptions and one for producer subscriptions. We are interested in the producer subscriptions. It's function signature is:
   def handle_subscribe(:producer, opts, {pid, tag}, state) do
In my example, this callback function will be called twice, once for each sync_subscribe to the Sum stage. The pid and tag values will be different on each call. We are going to distinguish one inport from another by supplying an "inport" option to the to the sync_subscribe function. So connecting the two Constant stages to the Sum stage will now look like:
  GenStage.sync_subscribe(sum, to: augend, inport: :augend)
  GenStage.sync_subscribe(sum, to: addend, inport: :addend)
Here's the code for the subscription callback, handle_subscribe, that we now have to provide to handle the extraction of the new option value and associating it with the subscription tag:
    def handle_subscribe(:producer, opts, {_pid, tag}, state) do
      inport = Keyword.get(opts, :inport)
      case inport do
        nil ->
          {:stop, "no inport specified", state}
        _ ->
          new_state = Map.put(state, inport, {tag, []})
          {:automatic, new_state}
Notice that I have taken the unique tag value that GenStage has generated and tied it to the name of the inport using a Map. This map becomes the state of the Sum stage. Also, notice that the other value associated with the tag is an empty list. This is where we will accumulate - queue - input events arriving at the Sum stage.
We are all set up now for the Sum stage to handle input events and to be able to tell which inport they are coming in on. The next bit of code is the implementation of the handle_events callback for the Sum stage:
    def handle_events(events, {_pid, tag}, %{:addend => {tag, _}} = state) do
      state = Map.update!(state, :addend,
          fn({tag, addends}) -> {tag, addends ++ events} end)
      {return_events, new_state} = sum_inports(state)
      {:noreply, return_events, new_state}
    def handle_events(events, {_pid, tag}, %{:augend => {tag, _}} = state) do
      state = Map.update!(state, :augend,
            fn({tag, augends}) -> {tag, augends ++ events} end)
      {return_events, new_state} = sum_inports(state)
      {:noreply, return_events, new_state}
There is one handle_event function per inport and pattern matching on the saved tag will ensure that we are handling the right events. For each inport, the new events are appended to any prior events that have not been used. The function sum_inports is then called with the updated state. Here is sum_inports:
    defp sum_inports(state) do
      {augend_tag, augends} = Map.get(state, :augend, {nil, []})
      {addend_tag, addends} = Map.get(state, :addend, {nil, []})
      {addends, augends, results} = do_sum(addends, augends, [])
      state = Map.put(state, :addend, {addend_tag, addends})
      state = Map.put(state, :augend, {augend_tag, augends})
      {results, state}
This function simply extracts the queued up events associated with each inport and relies on the function do_sum to perform the arithmetic and return the, possibly updated, event queues. Based on its argument patterns, do_sum figures out what to add:
    defp do_sum([], augends, results) do
      {[], augends, results}
    defp do_sum(addends, [], results) do
      {addends, [], results}
    defp do_sum([h_addends|t_addends], [h_augends|t_augends], results) do
      do_sum(t_addends, t_augends, results ++ [h_addends + h_augends])
The complete code is available on GitHub. I've included instructions for running this flow a couple of different ways. One uses the Constant stages as described above. The other example uses the GenStage function from_enumerable which starts a producer stage from an enumerable or stream.


Wednesday, July 20, 2016

GenStage Example

Way back in December I wrote about a new Elixir project named GenRouter. It was a first attempt at defining a reactive streaming library for Elixir. It has come a long way since then! On July 14th, José Valim posted a blog entry announcing GenStage - the succesor to GenRouter.  I encourage you to read the post and watch the accompanying video as well. The "stage" in GenStage refers to the name given to the nodes in a flow; every stage is an Elixir/Erlang process thereby providing the means to parallelize flows. Stages can be of three types: consumers, producer/consumers and producers.  The data that move through a flow are called events - events can be of any type. There is much, much more to GenStage. Take a look at the excellent online documentation as well.

Just over a year ago, I wrote  about a graphical tool, named Streamtools, developed by the New York Times R&D group (it does not appear to be in active development as of this date; here's a link to description of the tool and the example.). I took one of their examples and implemented it with my ElixirFBP library. The example accesses the data feed of a bike share program in New York City. Below is an implementation of that example using GenStage (the code and instructions for running this example are available at GitHub):
alias Experimental.GenStage

defmodule GenstageExample do
  defmodule Map do
    use GenStage
    def init(url) do
      {:producer, url}
    def handle_demand(demand, url) when demand > 0 do
      events = List.duplicate(url, demand)
      {:noreply, events, url}

  defmodule GetHTTP do
    use GenStage
    def init(_) do
      {:producer_consumer, :ok}
    def handle_events(events, _from, _state) do
      events = Enum.map(events, & retrieve_as_json &1)
      {:noreply, events, :ok}
    defp retrieve_as_json(url) do
      {:ok, %HTTPoison.Response{body: body}} = HTTPoison.get(url)

  defmodule Unpack do
    use GenStage
    def init(element) do
      {:producer_consumer, element}
    def handle_events(events, _from, element) do
      events = Enum.map(events, & &1[element])
      {:noreply, events, element}

  defmodule Filter do
    use GenStage
    def init({filter, filter_value}) do
      {:producer_consumer, {filter, filter_value}}
    def handle_events(events, _from, {filter, filter_value} = state) do
      events = Enum.map(events,
                           fn stations ->
                             Enum.filter(stations, & &1[filter] == filter_value)
      {:noreply, events, state}
  defmodule Ticker do
    use GenStage
    def init(sleeping_time) do
      {:consumer, sleeping_time}
    def handle_events(events, _from, sleeping_time) do
      {:noreply, [], sleeping_time}

  # Start up each stage with initial values.
  {:ok, map}     = GenStage.start_link(Map, 
  {:ok, getHTTP} = GenStage.start_link(GetHTTP, :ok)
  {:ok, unpack}  = GenStage.start_link(Unpack, "stationBeanList")
  {:ok, filter}  = GenStage.start_link(Filter, {"stationName", "W 14 St & The High Line"})
  {:ok, ticker}  = GenStage.start_link(Ticker, 5_000)

  # Join (subscribe) stages and create the data (event) flow
  GenStage.sync_subscribe(ticker, to: filter)
  GenStage.sync_subscribe(filter, to: unpack)
  GenStage.sync_subscribe(unpack, to: getHTTP)
  GenStage.sync_subscribe(getHTTP, to: map, max_demand: 1)


There are five stages/nodes in this data flow:
  1. Map - supply a url but only if there is demand for it.
  2. GetHTTP - retrieve web data, converting the json to an Elixir KeyMap
  3. Unpack - extract an element from the keymap
  4. Filter - find a particular bike station by its name
  5. Ticker - Send a single event (demand) every 5 seconds
My original implementation of this data flow operated in push mode. That is, the first node of the flow sent a signal to the second node in order to get things started. GenStage flows operate in a pull or demand driven mode so this Ticker node is placed at the end of the flow. It notifies the stage that it is connected to that it wants - at most - one event. This demand is passed to all of the other stages until it arrives at the only pure producer in the flow: Map. All other stages are producer/consumers. They respond to - consume - events by producing new ones.