Thursday, August 18, 2016

GenStage Example No. 3 - Dispatching

In my last post, I showed one way for a stage to handle input from more than one source (stage). In this post, we'll look at the other side of the stage: output. Specifically, how can we send the output of one stage to more than one downstream stage based on some criterion? As an example, we'll build a stage that takes in integer events and sends the odd ones to one stage and the even ones to another.

First, some background: GenStage has the notion of Dispatchers. These are the modules responsible for handling both the upstream demand and the downstream events passing between stages. In the simple case, you do not have to specify the dispatcher: the default is the so-called DemandDispatcher. From the GenStage documentation:
GenStage.DemandDispatcher dispatches the given batch of events to the consumer with the biggest demand in a FIFO ordering.
GenStage also has a BroadcastDispatcher. From the documentation:
GenStage.BroadcastDispatcher - dispatches all events to all consumers. The demand is only sent upstream once all consumers ask for data.
To use a dispatcher other than the default, you return a dispatcher option from a stage's init() function:
defmodule Stage1 do
    use GenStage
    def init(state) do
        {:producer_consumer, state, dispatcher: GenStage.BroadcastDispatcher}
    end
    ...
When I first began looking at the problem of splitting output events according to some criterion, I looked at these dispatchers and didn't think that they would do what I needed. I also looked at a new dispatcher (in the latest release - 0.5.0) named PartitionDispatcher and decided that it wasn't applicable either. PartitionDispatcher distributes events to a specified number of partitions based on the result of hashing an event. By default, Erlang's phash2/2 is used, but you can supply your own function. A likely scenario for this dispatcher would be one where you wanted the processing of output events to be handled evenly by a group of worker stages.
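For that worker-pool scenario, no custom function is needed. The sketch below (the names dispatcher_option and partition are mine, for illustration) shows the dispatcher tuple a stage's init function would return for four workers, and how the default hash maps an arbitrary event onto a partition:

```elixir
# Sketch: the dispatcher configuration a stage's init would return
# to spread events across 4 worker partitions. With no :hash option,
# PartitionDispatcher falls back to Erlang's phash2.
dispatcher_option = {GenStage.PartitionDispatcher, partitions: 4}

# phash2, given a term and a range, maps any term onto 0..range-1,
# so every event lands in one of the four partitions:
partition = :erlang.phash2({:some, "event"}, 4)
```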

Thinking that none of these dispatchers fit my needs, I went ahead and designed and built a new type of dispatcher - I called it a DirectedDispatcher. It expects its input events to be labelled; the label is used to determine which stage to direct the event to. In our example, the even events would appear, for example, as the tuple {:evens, 28} and the odd events as {:odds, 37}. The labeling of the events is performed by the splitting stage before an event reaches the dispatcher. The DirectedDispatcher would know, via earlier subscriptions, the pid of the stage associated with each label and would use this pid to send the event to the correct stage.

I'm not going to show my DirectedDispatcher - at least not in its present form - because, after finishing it, I had one of those moments of clarity: I could, in fact, use the PartitionDispatcher, at least for this example. Just because the function that the dispatcher is using is called a hash function doesn't mean that it has to use some hashing algorithm. The responsibility of the "hash" function is to accept an event and the number of partitions and return the event and an integer - the integer indicating which partition to send the event to. Things moved along quickly after this realization. Here's the complete code for the Splitter stage, along with its "hash" function:
defmodule Splitter do
  use GenStage

  def init(_) do
    {:producer_consumer, %{},
      dispatcher: {GenStage.PartitionDispatcher,
                    partitions: 2,
                    hash: &split/2}}
  end

  def split(event, no_of_partitions) do
    {event, rem(event, no_of_partitions)}
  end

  def handle_events(events, _from, state) do
    {:noreply, events, state}
  end
end
The split function simply computes a partition value based on the value of the event - an integer.
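For instance, with two partitions the even integers land in partition 0 and the odd ones in partition 1. A standalone copy of the function shows this:

```elixir
# A standalone copy of Splitter.split/2: the "hash" simply returns
# the event along with event mod partitions as the partition index.
split = fn event, no_of_partitions ->
  {event, rem(event, no_of_partitions)}
end

split.(28, 2)  # {28, 0} - even, goes to partition 0
split.(37, 2)  # {37, 1} - odd, goes to partition 1
```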

When a stage subscribes to another stage that uses a PartitionDispatcher, such as Splitter above, it needs to specify what partition the subscribing stage will represent. This is done by specifying an extra parameter to GenStage's subscribe functions. For example:
  {:ok, inport}    = GenStage.from_enumerable(1..10)
  {:ok, splitter}  = GenStage.start_link(Splitter, 0)
  {:ok, evens}     = GenStage.start_link(Ticker, {2_000, :evens})
  {:ok, odds}      = GenStage.start_link(Ticker, {2_000, :odds})

  GenStage.sync_subscribe(evens, to: splitter, partition: 0, max_demand: 1)
  GenStage.sync_subscribe(odds,  to: splitter, partition: 1, max_demand: 1)
  GenStage.sync_subscribe(splitter, to: inport, max_demand: 1)
As in earlier examples, the Ticker stage sends an upstream demand and then waits the specified number of milliseconds (2,000) before requesting another event. These tickers are the partition stages. When they subscribe to the Splitter stage, they indicate which partition number they are to be associated with. The extra initialization parameters - :evens and :odds - are used to distinguish one Ticker from the other.
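For reference, a Ticker along these lines might look like the sketch below. This is a guess at its shape based on the description above (receive a small batch, print it, sleep, repeat), not necessarily the exact code from the earlier posts; the {interval, label} tuple matches the start_link arguments used above. (A real stage would also have `use GenStage` at the top; it is left out only so the sketch compiles without the gen_stage dependency.)

```elixir
defmodule Ticker do
  # state is {interval_in_ms, label}, e.g. {2_000, :evens}
  def init(state) do
    {:consumer, state}
  end

  def handle_events(events, _from, {interval, label} = state) do
    IO.inspect({label, events})
    Process.sleep(interval)  # pause before more demand flows upstream
    {:noreply, [], state}    # a consumer never emits events
  end
end
```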

I realize that I am probably misusing the purpose of the PartitionDispatcher, although the split function above is, very loosely speaking, a hash function. I am going to revisit the implementation of my DirectedDispatcher. I think it can be modeled after PartitionDispatcher, but it would use a "hash" function that accepts the event and the dispatcher's state to determine which partition to send an event to. I hope to show this code in a future post.

In the meantime, the example outlined above is available on GitHub.


Monday, August 8, 2016

GenStage Example No. 2

In my early experiments with building flows out of Elixir processes, I built flows that had nodes that could take multiple inputs and produce a single output. A very simple example of such a process would be a Sum node that takes in two numbers and produces their sum.

How could this be done using GenStage? The documentation (version 0.4.3) states very clearly that
A consumer may have multiple producers and a producer may have multiple consumers. When a consumer asks for data, each producer is handled separately, with its own demand.
To start, let's define a Constant stage that will produce as many copies of the same value as the demand requires. Below is what a Constant stage could look like:
  defmodule Constant do
    @moduledoc """
    This stage will always produce demand number of constants.
    """
    use GenStage
    def init(constant) do
      {:producer, constant}
    end
    def handle_demand(demand, constant) when demand > 0 do
      events = List.duplicate(constant, demand)
      {:noreply, events, constant}
    end
  end
Note that it is initialized with a constant value which is stored as its state. When it receives a demand - when a consumer asks for some events - the function handle_demand will make as many copies as have been asked for. There will be two copies of the Constant stage, each initialized with a different value (integer).
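Because handle_demand is a pure function, its behavior is easy to check in isolation. The ConstantDemo module below is a hypothetical standalone copy of that clause (with `use GenStage` dropped so it runs on its own):

```elixir
defmodule ConstantDemo do
  # Same body as Constant.handle_demand/2: duplicate the constant
  # as many times as the demand asks for.
  def handle_demand(demand, constant) when demand > 0 do
    events = List.duplicate(constant, demand)
    {:noreply, events, constant}
  end
end

ConstantDemo.handle_demand(3, 7)
# => {:noreply, [7, 7, 7], 7}
```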

These two Constants will feed a Sum stage. Connecting a stage to multiple inputs is straightforward. For example:
  {:ok, input1} = GenStage.start_link(Stage1, :ok)
  {:ok, input2} = GenStage.start_link(Stage2, :ok)
  {:ok, sum}    = GenStage.start_link(Sum, 0)
  GenStage.sync_subscribe(sum, to: input1)
  GenStage.sync_subscribe(sum, to: input2)
There is a potential problem with this code, though. In designing the Sum stage I have to account for two distinct sources of data: addends and augends. Moreover, I want to make sure that I have two inputs available before they are summed. And I don't want to take two values from one source and sum them. Given these constraints, we need some way of distinguishing the arriving events, that is, of telling which stage an event is coming from. [Addition is commutative, so it really doesn't matter in what order the inports are used, but it does matter that the inputs are used in pairs - one from each inport.]

It turns out that GenStage creates a unique identifier, termed a tag, for every successful subscription. We need access to that tag. This can be done by defining the handle_subscribe function for a stage. The GenStage macro supplies default implementations for this function when none are specified. There are two forms of handle_subscribe: one for consumer subscriptions and one for producer subscriptions. We are interested in the producer subscriptions. Its function signature is:
   def handle_subscribe(:producer, opts, {pid, tag}, state) do
In my example, this callback function will be called twice, once for each sync_subscribe to the Sum stage. The pid and tag values will be different on each call. We are going to distinguish one inport from another by supplying an "inport" option to the sync_subscribe function. So connecting the two Constant stages to the Sum stage will now look like:
  GenStage.sync_subscribe(sum, to: augend, inport: :augend)
  GenStage.sync_subscribe(sum, to: addend, inport: :addend)
Here's the code for the subscription callback, handle_subscribe, that we now have to provide to extract the new option value and associate it with the subscription tag:
    def handle_subscribe(:producer, opts, {_pid, tag}, state) do
      inport = Keyword.get(opts, :inport)
      case inport do
        nil ->
          {:stop, "no inport specified", state}
        _ ->
          new_state = Map.put(state, inport, {tag, []})
          {:automatic, new_state}
      end
    end
Notice that I have taken the unique tag value that GenStage has generated and tied it to the name of the inport using a Map. This map becomes the state of the Sum stage. Also, notice that the other value associated with the tag is an empty list. This is where we will accumulate - queue - input events arriving at the Sum stage.
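With both subscriptions made, the Sum stage's state is a two-entry map. Its shape, sketched with placeholder tags (real tags are opaque references generated by GenStage; make_ref() stands in for them here), is:

```elixir
# Hypothetical shape of the Sum stage's state after both
# subscriptions: each inport name maps to its subscription tag and
# an initially empty queue of pending events.
augend_tag = make_ref()
addend_tag = make_ref()

state = %{
  augend: {augend_tag, []},
  addend: {addend_tag, []}
}
```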
We are all set up now for the Sum stage to handle input events and to be able to tell which inport they are coming in on. The next bit of code is the implementation of the handle_events callback for the Sum stage:
    def handle_events(events, {_pid, tag}, %{:addend => {tag, _}} = state) do
      state = Map.update!(state, :addend,
          fn({tag, addends}) -> {tag, addends ++ events} end)
      {return_events, new_state} = sum_inports(state)
      {:noreply, return_events, new_state}
    end
    def handle_events(events, {_pid, tag}, %{:augend => {tag, _}} = state) do
      state = Map.update!(state, :augend,
            fn({tag, augends}) -> {tag, augends ++ events} end)
      {return_events, new_state} = sum_inports(state)
      {:noreply, return_events, new_state}
    end
There is one handle_events function clause per inport, and pattern matching on the saved tag ensures that we are handling the right events. For each inport, the new events are appended to any prior events that have not been used. The function sum_inports is then called with the updated state. Here is sum_inports:
    defp sum_inports(state) do
      {augend_tag, augends} = Map.get(state, :augend, {nil, []})
      {addend_tag, addends} = Map.get(state, :addend, {nil, []})
      {addends, augends, results} = do_sum(addends, augends, [])
      state = Map.put(state, :addend, {addend_tag, addends})
      state = Map.put(state, :augend, {augend_tag, augends})
      {results, state}
    end
This function simply extracts the queued up events associated with each inport and relies on the function do_sum to perform the arithmetic and return the, possibly updated, event queues. Based on its argument patterns, do_sum figures out what to add:
    defp do_sum([], augends, results) do
      {[], augends, results}
    end
    defp do_sum(addends, [], results) do
      {addends, [], results}
    end
    defp do_sum([h_addends|t_addends], [h_augends|t_augends], results) do
      do_sum(t_addends, t_augends, results ++ [h_addends + h_augends])
    end
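Since do_sum is pure, its pairing behavior is easy to demonstrate in isolation. SumDemo below is a standalone copy of the three clauses (made public so they can be called directly):

```elixir
defmodule SumDemo do
  # Pair off addends and augends; whichever side runs out first
  # stops the recursion, and the surplus stays queued.
  def do_sum([], augends, results), do: {[], augends, results}
  def do_sum(addends, [], results), do: {addends, [], results}

  def do_sum([h_addends | t_addends], [h_augends | t_augends], results) do
    do_sum(t_addends, t_augends, results ++ [h_addends + h_augends])
  end
end

SumDemo.do_sum([1, 2, 3], [10, 20], [])
# => {[3], [], [11, 22]} - the unmatched 3 stays queued as an addend
```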
The complete code is available on GitHub. I've included instructions for running this flow a couple of different ways. One uses the Constant stages as described above. The other example uses the GenStage function from_enumerable which starts a producer stage from an enumerable or stream.

Enjoy!