Monday, August 8, 2016

GenStage Example No. 2

In my early experiments with building flows out of Elixir processes, I built flows whose nodes could take multiple inputs and produce a single output. A very simple example of such a process would be a Sum node that takes in two numbers and produces their sum.

How could this be done using GenStage? The documentation (version 0.4.3) states very clearly that:
"A consumer may have multiple producers and a producer may have multiple consumers. When a consumer asks for data, each producer is handled separately, with its own demand."
To start, let's define a Constant stage that will produce as many copies of the same value as the demand requires. Below is what a Constant stage could look like:
  defmodule Constant do
    @moduledoc """
    This stage will always produce demand number of constants.
    """
    use GenStage
    def init(constant) do
      {:producer, constant}
    end
    def handle_demand(demand, constant) when demand > 0 do
      events = List.duplicate(constant, demand)
      {:noreply, events, constant}
    end
  end
Note that it is initialized with a constant value, which is stored as its state. When it receives demand (that is, when a consumer asks for some events), the function handle_demand makes as many copies of the constant as were asked for. There will be two instances of the Constant stage, each initialized with a different integer value.
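
To make the relationship between demand and events concrete, here is a minimal sketch of what handle_demand computes, written as plain Elixir without starting a stage. The constant 5 and the demand 3 are made-up values for illustration only.

```elixir
# What Constant.handle_demand/2 would build for one request.
# Both numbers below are hypothetical, chosen only for illustration.
constant = 5
demand = 3

# One copy of the constant per requested event.
events = List.duplicate(constant, demand)
IO.inspect(events)
```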

These two Constants will feed a Sum stage. Connecting a stage to multiple inputs is straightforward. For example:
  {:ok, input1} = GenStage.start_link(Stage1, :ok)
  {:ok, input2} = GenStage.start_link(Stage2, :ok)
  {:ok, process} = GenStage.start_link(Process, 0)
  GenStage.sync_subscribe(process, to: input1)
  GenStage.sync_subscribe(process, to: input2)
There is a potential problem with this code, though. In designing the Sum stage I have to account for two distinct sources of data: addends and augends. Moreover, I want to make sure that I have one input from each source before they are summed, and I don't want to take two values from one source and sum them. Given these constraints, we need some way of distinguishing the arriving events: which stage is an event coming from? [Addition is commutative, so it doesn't matter in what order the inports are used, but it does matter that the inputs are used in pairs, one from each inport.]

It turns out that GenStage creates a unique identifier, termed a tag, for every successful subscription. We need access to that tag. This can be done by defining the handle_subscribe function for a stage. The GenStage macro supplies a default implementation of this function when none is specified. There are two forms of handle_subscribe: one for consumer subscriptions and one for producer subscriptions. We are interested in the producer subscriptions. Its function signature is:
   def handle_subscribe(:producer, opts, {pid, tag}, state) do
In my example, this callback function will be called twice, once for each sync_subscribe to the Sum stage. The pid and tag values will be different on each call. We are going to distinguish one inport from another by supplying an "inport" option to the sync_subscribe function. So connecting the two Constant stages to the Sum stage will now look like:
  GenStage.sync_subscribe(sum, to: augend, inport: :augend)
  GenStage.sync_subscribe(sum, to: addend, inport: :addend)
Here's the code for the subscription callback, handle_subscribe, that we now have to provide to handle the extraction of the new option value and associating it with the subscription tag:
    def handle_subscribe(:producer, opts, {_pid, tag}, state) do
      inport = Keyword.get(opts, :inport)
      case inport do
        nil ->
          {:stop, "no inport specified", state}
        _ ->
          new_state = Map.put(state, inport, {tag, []})
          {:automatic, new_state}
      end
    end
Notice that I have taken the unique tag value that GenStage has generated and tied it to the name of the inport using a Map. This map becomes the state of the Sum stage. Also, notice that the other value associated with the tag is an empty list. This is where we will queue the input events arriving at the Sum stage.
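
As a rough illustration of the state shape this produces, the sketch below simulates the two handle_subscribe calls with plain map operations. It assumes the state starts as an empty map, and it uses make_ref/0 as a stand-in for the tags GenStage generates; real tags come from the subscription itself.

```elixir
# Stand-ins for the subscription tags (an assumption for illustration;
# in a running flow GenStage supplies these values).
augend_tag = make_ref()
addend_tag = make_ref()

# The same Map.put that handle_subscribe performs, once per inport.
state =
  %{}
  |> Map.put(:augend, {augend_tag, []})
  |> Map.put(:addend, {addend_tag, []})

# Each inport name now maps to {tag, queue_of_pending_events}.
IO.inspect(state)
```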
We are all set up now for the Sum stage to handle input events and to be able to tell which inport they are coming in on. The next bit of code is the implementation of the handle_events callback for the Sum stage:
    def handle_events(events, {_pid, tag}, %{:addend => {tag, _}} = state) do
      state = Map.update!(state, :addend,
          fn({tag, addends}) -> {tag, addends ++ events} end)
      {return_events, new_state} = sum_inports(state)
      {:noreply, return_events, new_state}
    end
    def handle_events(events, {_pid, tag}, %{:augend => {tag, _}} = state) do
      state = Map.update!(state, :augend,
            fn({tag, augends}) -> {tag, augends ++ events} end)
      {return_events, new_state} = sum_inports(state)
      {:noreply, return_events, new_state}
    end
There is one handle_events clause per inport, and pattern matching on the saved tag ensures that each clause handles only the events from its own inport. For each inport, the new events are appended to any prior events that have not been used. The function sum_inports is then called with the updated state. Here is sum_inports:
    defp sum_inports(state) do
      {augend_tag, augends} = Map.get(state, :augend, {nil, []})
      {addend_tag, addends} = Map.get(state, :addend, {nil, []})
      {addends, augends, results} = do_sum(addends, augends, [])
      state = Map.put(state, :addend, {addend_tag, addends})
      state = Map.put(state, :augend, {augend_tag, augends})
      {results, state}
    end
This function simply extracts the queued-up events associated with each inport and relies on the function do_sum to perform the arithmetic and return the (possibly updated) event queues. Based on its argument patterns, do_sum figures out what to add:
    defp do_sum([], augends, results) do
      {[], augends, results}
    end
    defp do_sum(addends, [], results) do
      {addends, [], results}
    end
    defp do_sum([h_addends|t_addends], [h_augends|t_augends], results) do
      do_sum(t_addends, t_augends, results ++ [h_addends + h_augends])
    end
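
To see the queueing and pairing behave end to end without running GenStage, here is a small stand-alone sketch. SumSketch, its function names, and the atoms :tag_a and :tag_b are all hypothetical, invented for this illustration. It also pairs values with Enum.zip rather than the recursive do_sum above, so treat it as an approximation of the idea and not as the code from the repository.

```elixir
defmodule SumSketch do
  # Hypothetical stand-in for the Sum stage's state handling.

  # Mirrors handle_subscribe: remember the tag and an empty queue per inport.
  def subscribe(state, inport, tag), do: Map.put(state, inport, {tag, []})

  # Mirrors handle_events: find the inport that owns this tag, queue the
  # new events, then emit one sum per complete addend/augend pair.
  def events(state, tag, events) do
    {inport, {^tag, queue}} =
      Enum.find(state, fn {_inport, {t, _queue}} -> t == tag end)

    state
    |> Map.put(inport, {tag, queue ++ events})
    |> pair_up()
  end

  defp pair_up(%{addend: {t1, addends}, augend: {t2, augends}} = state) do
    # Only as many sums as the shorter queue allows.
    n = min(length(addends), length(augends))
    sums = Enum.zip(addends, augends) |> Enum.map(fn {a, b} -> a + b end)

    # Whatever could not be paired stays queued for the next call.
    state = %{
      state
      | addend: {t1, Enum.drop(addends, n)},
        augend: {t2, Enum.drop(augends, n)}
    }

    {sums, state}
  end
end

state =
  %{}
  |> SumSketch.subscribe(:augend, :tag_a)
  |> SumSketch.subscribe(:addend, :tag_b)

# Three augends arrive first; nothing can be summed yet.
{sums1, state} = SumSketch.events(state, :tag_a, [1, 2, 3])

# Two addends arrive; two pairs are summed and one augend stays queued.
{sums2, state} = SumSketch.events(state, :tag_b, [10, 20])

IO.inspect({sums1, sums2, state})
```

The point of the design is visible in the second call: events from one inport are never summed with each other, and any surplus waits in its queue until the other inport catches up.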
The complete code is available on GitHub. I've included instructions for running this flow a couple of different ways. One uses the Constant stages as described above. The other example uses the GenStage function from_enumerable which starts a producer stage from an enumerable or stream.

