Tuesday, November 15, 2016

GenStage and Bioinformatics

In this post, I will describe my first attempt at using GenStage - in partiuclar, its Flow module - to implement a bioinformatics algorithm. My day job for the last couple of years has been the building of a full-stack system for the storage and searching of the metagenomic analyses (metagenomics) of hydrocarbon resources. That's a mouthful. Basically it means that there is a need to know what types of microorganisms live in oil deposits, for example. Primarily bacterial species, some of these organisms can corrode steel pipes and do other nasty things. Metagenomics begins with the genetic sequencing of thousands of species that are found in these hydrocarbon resources. Because the sequencing results in only pieces of an organism's genome being read - not the whole genome, it becomes a big computational challenge to figure out which species a piece belongs to.

One of the reasons I started this blog was because metagenomics is typically performed by sending what is called the raw read of nucleotide bases (the A, T, U, C, and G's) through a pipeline of processes. A popular pipeline is Qiime (pronounced chime). It consists of many Python programs that are strung together. Initially, I created my own Elixir flow-based framework but, now that GenStage is here, I thought it might be interesting to see how its facilities could be used to perform these types of analyses.

Instead of trying to rewrite Qiime in Elixir - no small task - I determined that I needed to start small and see how I could implement basic bioinformatic algorithms in Elixir and reap the benefits (faster?) of the parallel processing that comes with the use of GenStage. Towards that end, I began taking a great course at Coursera entitled "Finding Hidden Messages in DNA (Bioinformatics I)". Problems in bioinformatic are presented and solutions are outlined but it is up to you to implement the solutions in the language of your choice. One problem is the so-called "Clump Finding Problem". First a definition: a k-mer is a sequence k nucleotides long - for example, CTGGAT is a 6-mer. Given a sequence - this can be millions of nucleotides long - find those k-mers that appear at least some number of times in this sequence. But instead of using the entire sequence, you look for these k-mers in a smaller subsequence. Given an input sequence that is 1500 bases long, we might only be interested in finding k-mers in a 500 base-long subsequence.  Here's an example from the course:

Input: CGGACTCGACAGATGTGAAGAACGACAATGTGAAGACTCGACACGACAGAGTGAAGAGAAGAGGAAACATTGTAA

Parameters: Find the 5-mers that appear at least 4 times in a 50-base long subsequence.

Output: CGACA GAAGA

The algorithm needs to slide along the sequence extracting the subsequence then slide along the subsequence extracting the k-mers keeping count of the unique k-mers. Sliding is performed one base at a time in both cases. Finally, the result will be those k-mers that are found at least a specified number of times.

Why is this algorithm needed? It turns out that there are certain sequences of nucleotides that appear in an organism's genome that are significant. For example, in Bacteria, these sequences might indicate where the replication of their DNA should begin and end.

Here's my first attempt at implementing this algorithm using the Flow module of GenStage:
defmodule Bio do
alias Experimental.Flow

def clump(seq, k_mer_len, subseq_len, times) do
  |> seq
  |> String.to_charlist
  |> Stream.chunk(subseq_len, 1)
  |> Flow.from_enumerable
  |> Flow.partition
  |> Flow.map(fn e -> Stream.chunk(e, k_mer_len, 1) end)
  |> Flow.map(
        fn e ->
          Enum.reduce(e, %{},
            fn w, acc ->
              Map.update(acc, w, 1, & &1 + 1)
            end)
        end)
  |> Flow.flat_map(
        fn e ->
          Enum.reject(e, fn({_, n}) -> n < times end)
        end)
  |> Flow.map(fn({seq, _}) -> seq end)
  |> Enum.uniq
end
end
Although not optimal, the code does work. Looking a little closer, it:
  1. Creates a Flow from the subsequences,
  2. Continues the flow by creating the k-mers, counting their occurrences and storing this count in a Map, and
  3. Removes (rejects) all entries (k-mers) that didn't occur enough times.
  4. Finally, triggers the flow by calling Enum.uniq, a function that eliminates duplicates (we only care that the k-mer appeared at least some number of times, not how many times.) 
Notice my use of Stream.chunk/4. This function is implemented for Enum's and Stream's but not for Flows. Wondering if there needed to be an implementation of the chunk function in Flow, I posted a question on Elixir's mail list. José Valim, Elixir's creator, graciously answered my question and even posted a better implementation of the clump function! (I'll show his solution below). Importantly, he pointed out that you have to be careful in the use of Flow especially when you need to preserve the order of the original sequence. This is because you don't know when any one of the parallel processes will finish and deliver its result. As it turns out, this particular clump finding algorithm doesn't need to preserver the original order of the sequence. Finally, José pointed out that I didn't need to use Flow.partition because the algorithm wasn't reducing over state.

Here's José's implementation:
def clump(seq, k_mer_len, subseq_len, times) do
  seq
  |> String.to_charlist
  |> Stream.chunk(subseq_len, 1)
  |> Flow.from_enumerable
  |> Flow.flat_map(&find_sequences(&1, k_mer_len, times))
  |> Enum.uniq
end
def find_sequences(subseq, k_mer_len, times) do
  subseq
  |> Stream.chunk(k_mer_len, 1)
  |> Enum.reduce(%{}, fn w, acc ->
       Map.update(acc, w, 1, & &1 + 1)
     end)
  |> Enum.reject(fn({_, n}) -> n < times end)
  |> Enum.map(fn({seq, _}) -> seq end)
end