Thursday, August 18, 2016

GenStage Example No. 3 - Dispatching

In my last post, I showed one way for a stage to handle input from more than one source (stage). In this post, we'll look at the other side of the stage: output. Specifically, how can we send the output of one stage to more than one stage based on some criterion? As an example, we'll build a stage that takes in integer events and sends the odd ones to one stage and the even ones to another.

First, some background: GenStage has the notion of dispatchers. A dispatcher is a module, run inside a producer stage, that is responsible for handling both the upstream demand and the downstream events passing between stages. In the simple case, you do not have to specify a dispatcher: the default is the so-called DemandDispatcher. From the GenStage documentation:
GenStage.DemandDispatcher dispatches the given batch of events to the consumer with the biggest demand in a FIFO ordering.
GenStage also has a BroadcastDispatcher. From the documentation:
GenStage.BroadcastDispatcher - dispatches all events to all consumers. The demand is only sent upstream once all consumers ask for data.
To use a dispatcher other than the default, you return a :dispatcher option from the stage's init/1 callback:
defmodule Stage1 do
    use GenStage
    def init(state) do
        {:producer_consumer, state, dispatcher: GenStage.BroadcastDispatcher}
    end
    # handle_events/3 and the other callbacks go here
end
When I first began looking at the problem of splitting output events according to some criterion, I looked at these dispatchers and didn't think they would do what I needed. I also looked at a new dispatcher in the latest release (0.5.0), PartitionDispatcher, and decided that it wasn't applicable either. PartitionDispatcher distributes events to a specified number of partitions based on the result of hashing each event. By default, Erlang's phash2/2 is used, but you can supply your own function. A likely scenario for this dispatcher is one where you want the processing of output events to be spread evenly across a group of worker stages.
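To make the default behavior concrete, here is a minimal sketch, in plain Elixir with no GenStage dependency, of a phash2-based "hash" function. The module name DefaultHashSketch is hypothetical and this is not GenStage's own source; it assumes the 0.5.0-era contract in which the hash function receives the event and the number of partitions and returns {event, partition}. :erlang.phash2/2 always returns an integer from 0 up to, but not including, its second argument, so every event lands in a valid partition.

```elixir
defmodule DefaultHashSketch do
  # Hypothetical stand-in for a phash2-based partition hash (not GenStage's
  # own code). Returns the event unchanged plus a partition number in
  # 0..partitions-1.
  def hash(event, partitions) do
    {event, :erlang.phash2(event, partitions)}
  end
end

# Print which of two partitions each event would be sent to.
for event <- 1..10 do
  {^event, partition} = DefaultHashSketch.hash(event, 2)
  IO.puts("#{event} -> partition #{partition}")
end
```

The partition an event gets here is effectively arbitrary, which suits spreading load across workers but not routing by a criterion such as even versus odd.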

Thinking that none of these dispatchers fit my needs, I went ahead and designed and built a new type of dispatcher, which I called a DirectedDispatcher. It expects its input events to be labeled; the label determines which stage the event is directed to. In our example, an even event would appear as, say, the tuple {:evens, 28} and an odd event as {:odds, 37}. The splitting stage labels the events before they reach the dispatcher. Through earlier subscriptions, the DirectedDispatcher knows the pid of the stage associated with each label and uses that pid to send the event to the correct stage.
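The labeling step itself is simple. Here is a minimal plain-Elixir sketch of how a splitting stage might tag each integer before handing it to such a dispatcher; the Labeler module and its label/1 function are hypothetical names, not part of GenStage or of the DirectedDispatcher described here.

```elixir
defmodule Labeler do
  # Hypothetical helper: tag each integer with the label of the stage it
  # should be directed to, e.g. 28 -> {:evens, 28}, 37 -> {:odds, 37}.
  def label(n) when is_integer(n) and rem(n, 2) == 0, do: {:evens, n}
  def label(n) when is_integer(n), do: {:odds, n}
end

Enum.map([28, 37], &Labeler.label/1)
# returns [{:evens, 28}, {:odds, 37}]
```

Inside a producer_consumer, handle_events/3 could then return Enum.map(events, &Labeler.label/1) as its outgoing events.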

I'm not going to show my DirectedDispatcher, at least not in its present form, because after finishing it I had one of those moments of clarity: I could, in fact, use the PartitionDispatcher, at least for this example. Just because the function the dispatcher uses is called a hash function doesn't mean it has to use a hashing algorithm. The "hash" function's job is to accept an event and the number of partitions and return a tuple of the event and an integer indicating which partition to send the event to. Things moved along quickly after this realization. Here's the complete code for the Splitter stage, along with its "hash" function:
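defmodule Splitter do
  use GenStage

  def init(_) do
    {:producer_consumer, %{},
      dispatcher: {GenStage.PartitionDispatcher,
                    partitions: 2,
                    hash: &split/2}}
  end

  # Returns the event unchanged, plus the partition it belongs to:
  # 0 for even integers, 1 for odd ones.
  def split(event, no_of_partitions) do
    {event, rem(event, no_of_partitions)}
  end

  # Pass events through untouched; the dispatcher does the routing.
  def handle_events(events, _from, state) do
    {:noreply, events, state}
  end
end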
The split function computes the partition from the event's value: with two partitions, rem(event, 2) is 0 for even integers and 1 for odd ones.
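A quick way to check this routing without starting any stages is to call the same logic directly. The sketch below copies split/2 into a hypothetical SplitDemo module, so it runs without the gen_stage dependency, and groups the events 1..10 by the partition they would be sent to.

```elixir
defmodule SplitDemo do
  # Same logic as Splitter.split/2: return the event and its partition.
  def split(event, no_of_partitions), do: {event, rem(event, no_of_partitions)}
end

# Compute each event's partition, then group the event values by partition.
1..10
|> Enum.map(&SplitDemo.split(&1, 2))
|> Enum.group_by(fn {_event, partition} -> partition end, fn {event, _} -> event end)
|> IO.inspect()
# prints %{0 => [2, 4, 6, 8, 10], 1 => [1, 3, 5, 7, 9]}
```

Partition 0 receives the evens and partition 1 the odds, matching the subscriptions shown below. One caveat: rem/2 keeps the sign of its first argument, so a negative odd integer would produce partition -1; if negative events are possible, Integer.mod/2 always returns a value in 0..no_of_partitions-1.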

When a stage subscribes to another stage that uses a PartitionDispatcher, such as Splitter above, it must specify which partition it will represent. This is done by passing an extra :partition option to GenStage's subscribe functions. For example:
  {:ok, inport}    = GenStage.from_enumerable(1..10)
  {:ok, splitter}  = GenStage.start_link(Splitter, 0)
  {:ok, evens}     = GenStage.start_link(Ticker, {2_000, :evens})
  {:ok, odds}      = GenStage.start_link(Ticker, {2_000, :odds})

  GenStage.sync_subscribe(evens, to: splitter, partition: 0, max_demand: 1)
  GenStage.sync_subscribe(odds,  to: splitter, partition: 1, max_demand: 1)
  GenStage.sync_subscribe(splitter, to: inport, max_demand: 1)
As in earlier examples, the Ticker stage sends demand upstream and then waits the specified number of milliseconds (2,000) before requesting another event. These tickers are the partition stages: when they subscribe to the Splitter stage, they indicate which partition number they are associated with. The extra initialization parameters, :evens and :odds, are used to distinguish one Ticker from the other.

I realize that I am probably misusing the purpose of the PartitionDispatcher, although the split function above is, very loosely speaking, a hash function. I am going to revisit the implementation of my DirectedDispatcher. I think it can be modeled after PartitionDispatcher, but with a "hash" function that accepts the event and the dispatcher's state to determine which partition to send the event to. I hope to show this code in a future post.
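As a rough illustration of that idea, and not the code planned for a future post, here is a minimal plain-Elixir sketch of a state-aware "hash". It takes a labeled event such as {:evens, 28} plus a map from labels to partition numbers, which stands in for the dispatcher's state, and returns the unwrapped event with its partition. DirectedHashSketch and route/2 are hypothetical names.

```elixir
defmodule DirectedHashSketch do
  # Hypothetical: look up the partition for the event's label in a map that
  # plays the role of the dispatcher's state. Raises KeyError for an
  # unknown label rather than silently misrouting the event.
  def route({label, value}, label_to_partition) do
    {value, Map.fetch!(label_to_partition, label)}
  end
end

routes = %{evens: 0, odds: 1}
Enum.map([{:evens, 28}, {:odds, 37}], &DirectedHashSketch.route(&1, routes))
# returns [{28, 0}, {37, 1}]
```

Raising on an unknown label is one design choice; a real dispatcher might instead drop or buffer such events.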

In the meantime, the example outlined above is available on GitHub.


  1. Another great article, thank you for writing about GenStage!

    I would just like to add that I don't think you are misusing the partition dispatcher. I would say that converting a label to a number is a form of hashing, and that's a totally valid use case for partitioning. Finally, I added the PartitionDispatcher while working on Flow, so those interested can also check it out if they are looking for other examples and use cases for partitioning:

  2. Peter, yet again, brilliant article. Love the way you described your thought process when switching from DirectedDispatcher to PartitionDispatcher. Please keep up the great work.

    I have a couple of questions if you have a minute:
    - Do you maybe have any plans to cover "Experimental.Flow.Window" that was added in gen_stage 0.5 in future posts?

    - I'm very curious what the idiomatic approach is for making a producer respond to time-based events, e.g. checking storage or a third party every X seconds/minutes. Right now, what I'm doing is: if there's demand but no supply, schedule a check after a certain time period, which then changes the producer's state so that data flows to consumers. But I'm not sure if that's the "right" way to accomplish this in Elixir.

    Thank you!

    1. Thank you, Alex.

      I hope to study Flow.Window relatively soon.

      Reactive streams are demand-driven. As you may have noticed, in my examples I usually have a "ticker" that periodically sends demand upstream; if there are events available, they are sent downstream. This implementation of the ticker always sends demand, but I can imagine a case where you poll an external source and only then have the stage send demand upstream.

      Whether that's idiomatic or not, I'm not sure. It's still early in GenStage's development, and its idioms are still being established. Maybe you could also look at other reactive stream examples, such as those that use Akka, for techniques.

  3. Thanks for this example, Peter. For anyone trying this with the latest version of GenStage as of the end of October (0.8.0), you'll need to make one change due to a refactor José made (the hash function given in init now takes only one argument):

    defmodule Splitter do
      use GenStage
      def init(_) do
        {:producer_consumer, %{},
          dispatcher: {GenStage.PartitionDispatcher,
                        partitions: 2,
                        hash: &split(&1, 2)}}
      end

      # split/2 and handle_events/3 stay the same
    end