Wednesday, July 20, 2016

GenStage Example

Way back in December I wrote about a new Elixir project named GenRouter. It was a first attempt at defining a reactive streaming library for Elixir, and it has come a long way since then! On July 14th, José Valim posted a blog entry announcing GenStage, the successor to GenRouter. I encourage you to read the post and watch the accompanying video as well. The "stage" in GenStage refers to the name given to the nodes in a flow; every stage is an Elixir/Erlang process, which makes it possible to run the parts of a flow in parallel. Stages can be of three types: producers, producer/consumers and consumers. The data that move through a flow are called events; events can be of any type. There is much, much more to GenStage, so take a look at its excellent online documentation as well.
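GenStage stages are processes that exchange events, and consumers pull events from producers by signaling demand. As a rough illustration of that idea without the gen_stage dependency, here is a minimal sketch using only plain Elixir processes. It is not GenStage's API: the `DemandSketch` module and its `{:ask, n, from}` and `{:events, pid, events}` messages are hypothetical names chosen for this example.

```elixir
defmodule DemandSketch do
  # Hypothetical producer "stage": a process whose state is a counter and
  # which emits events only when it receives demand.
  def start_producer(initial) do
    spawn(fn -> producer_loop(initial) end)
  end

  defp producer_loop(counter) do
    receive do
      # Demand arrives as {:ask, n, from}; reply with exactly n new events.
      {:ask, n, from} when n > 0 ->
        events = Enum.to_list(counter..(counter + n - 1))
        send(from, {:events, self(), events})
        producer_loop(counter + n)
    end
  end

  # Consumer side: send demand for n events and wait for the reply.
  def ask(producer, n) do
    send(producer, {:ask, n, self()})

    receive do
      {:events, ^producer, events} -> events
    after
      1_000 -> raise "producer did not answer"
    end
  end
end

producer = DemandSketch.start_producer(0)
IO.inspect(DemandSketch.ask(producer, 3))  # prints [0, 1, 2]
IO.inspect(DemandSketch.ask(producer, 2))  # prints [3, 4]
```

Each call to `ask/2` is one round of demand: the producer process does no work until it is asked, and then it sends back exactly as many events as were requested.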

Just over a year ago, I wrote about a graphical tool named Streamtools, developed by the New York Times R&D group (it does not appear to be in active development as of this writing; here's a link to a description of the tool and the example). I took one of their examples and implemented it with my ElixirFBP library. The example accesses the data feed of a bike share program in New York City. Below is an implementation of that example using GenStage (the code and instructions for running this example are available on GitHub):
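# Early gen_stage releases (2016) ship the module as Experimental.GenStage;
# later releases call it plain GenStage.
alias Experimental.GenStage

defmodule GenstageExample do
  # Map: the only pure producer. Its state is the feed URL, and it emits one
  # copy of that URL for each unit of demand it receives.
  # Note: inside GenstageExample, `Map` now refers to this module, not to
  # Elixir's built-in Map module.
  defmodule Map do
    use GenStage

    def init(url) do
      {:producer, url}
    end

    def handle_demand(demand, url) when demand > 0 do
      events = List.duplicate(url, demand)
      {:noreply, events, url}
    end
  end

  # GetHTTP: fetches each URL and decodes the JSON body into an Elixir map
  # (with string keys).
  defmodule GetHTTP do
    use GenStage

    def init(_) do
      {:producer_consumer, :ok}
    end

    def handle_events(events, _from, _state) do
      events = Enum.map(events, &retrieve_as_json/1)
      {:noreply, events, :ok}
    end

    # A failed request does not match {:ok, ...}; the MatchError crashes the stage.
    defp retrieve_as_json(url) do
      {:ok, %HTTPoison.Response{body: body}} = HTTPoison.get(url)
      Poison.Parser.parse!(body)
    end
  end

  # Unpack: replaces each decoded map with the value stored under `element`.
  defmodule Unpack do
    use GenStage

    def init(element) do
      {:producer_consumer, element}
    end

    def handle_events(events, _from, element) do
      events = Enum.map(events, & &1[element])
      {:noreply, events, element}
    end
  end

  # Filter: each event is a list of stations; keep only the stations whose
  # `filter` field equals `filter_value`.
  defmodule Filter do
    use GenStage

    def init({filter, filter_value}) do
      {:producer_consumer, {filter, filter_value}}
    end

    def handle_events(events, _from, {filter, filter_value} = state) do
      events =
        Enum.map(events, fn stations ->
          Enum.filter(stations, &(&1[filter] == filter_value))
        end)

      {:noreply, events, state}
    end
  end

  # Ticker: the consumer at the end of the flow. It prints what it receives,
  # then sleeps; it asks for more events only after handle_events returns.
  defmodule Ticker do
    use GenStage

    def init(sleeping_time) do
      {:consumer, sleeping_time}
    end

    def handle_events(events, _from, sleeping_time) do
      IO.inspect(events)
      Process.sleep(sleeping_time)
      # Consumers never emit events, so the event list is always [].
      {:noreply, [], sleeping_time}
    end
  end

  # Everything below runs while the module body is evaluated, i.e. when this
  # file is compiled or loaded.
  # Start up each stage with its initial state.
  {:ok, map}      = GenStage.start_link(Map,
                                        "http://feeds.citibikenyc.com/stations/stations.json")
  {:ok, get_http} = GenStage.start_link(GetHTTP, :ok)
  {:ok, unpack}   = GenStage.start_link(Unpack, "stationBeanList")
  {:ok, filter}   = GenStage.start_link(Filter, {"stationName", "W 14 St & The High Line"})
  {:ok, ticker}   = GenStage.start_link(Ticker, 5_000)

  # Join (subscribe) the stages, consumer first, to create the data (event) flow.
  # max_demand: 1 means each subscriber asks its producer for at most one event at a time.
  GenStage.sync_subscribe(ticker, to: filter, max_demand: 1)
  GenStage.sync_subscribe(filter, to: unpack, max_demand: 1)
  GenStage.sync_subscribe(unpack, to: get_http, max_demand: 1)
  GenStage.sync_subscribe(get_http, to: map, max_demand: 1)

  # Keep this process (linked to every stage) alive so the flow keeps running.
  Process.sleep(:infinity)
end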

There are five stages/nodes in this data flow:
  1. Map - supplies a URL, but only when there is demand for it.
  2. GetHTTP - retrieves the web data and converts the JSON into an Elixir map.
  3. Unpack - extracts one element from that map.
  4. Filter - finds a particular bike station by its name.
  5. Ticker - asks for a single event (demand) every 5 seconds.
My original implementation of this data flow operated in push mode: the first node of the flow sent a signal to the second node in order to get things started. GenStage flows operate in a pull (demand-driven) mode, so the Ticker node is placed at the end of the flow. It notifies the stage it is connected to that it wants at most one event. This demand is passed upstream through the other stages until it arrives at the only pure producer in the flow: Map. The stages in between (GetHTTP, Unpack and Filter) are producer/consumers: they respond to (consume) events by producing new ones.
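Elixir's lazy `Stream` module offers a single-process analogy for this pull model: nothing upstream runs until the last step asks for values. The sketch below mirrors the stages of the flow, but it is only an analogy, not GenStage. `PullSketch` and `fake_fetch/1` are hypothetical names; `fake_fetch/1` stands in for the HTTP request and returns a hand-written map shaped like the `stationBeanList` feed, and the station list is made up (only the station name is taken from the example).

```elixir
defmodule PullSketch do
  @url "http://feeds.citibikenyc.com/stations/stations.json"

  # Hypothetical stand-in for GetHTTP: returns "decoded JSON" (a map with
  # string keys) without touching the network. The data is invented.
  def fake_fetch(_url) do
    %{
      "stationBeanList" => [
        %{"stationName" => "W 14 St & The High Line"},
        %{"stationName" => "Some Other Station"}
      ]
    }
  end

  def pipeline(name) do
    # Map: an endless supply of URLs, produced only on demand.
    Stream.repeatedly(fn -> @url end)
    # GetHTTP: "fetch" and decode each URL.
    |> Stream.map(&fake_fetch/1)
    # Unpack: pull out the list of stations.
    |> Stream.map(& &1["stationBeanList"])
    # Filter: keep only the station with the given name.
    |> Stream.map(fn stations ->
      Enum.filter(stations, &(&1["stationName"] == name))
    end)
  end
end

# Enum.take/2 is the "Ticker": it demands two events, so the upstream steps
# run exactly twice. The result is two one-element lists of stations.
PullSketch.pipeline("W 14 St & The High Line")
|> Enum.take(2)
|> IO.inspect()
```

Because `Stream.repeatedly/1` is infinite, the call can only return because evaluation is lazy: `Enum.take/2` plays the role of the Ticker's demand, and each upstream step runs only as often as that demand requires. Unlike GenStage, everything here runs in one process, so there is no parallelism between the steps.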

6 comments:

  1. Thanks so much for the example :). I'm curious, what are your thoughts on Go Channels. In many ways it seems like they solve the same problem but are flexible. Maybe I'm wrong though. I kind of wish there was an easier way to assemble these pipelines.

  2. Hello Gage,

    I don't know Go well enough to comment on its use in pipelines. @smlmp has written about using Go on his blog and here.

    Peter

    1. Whoops! The links in my last message did not copy. Samuel's blogs: http://bionics.it/ and http://saml.rilspace.org/

  3. A very good example about using genstage. Thank you very much Peter.

  4. Fantastic example.
    I have a quick question: How do you make "Tick" trigger periodically and query the producer, say, every minute?

    Thank you,
    Alex

  5. Thank you, Alex.
    Once the Ticker is started and subscribed to another stage, it waits for events to arrive at its handle_events function. After doing something with the event or events (I just print them out), it calls Process.sleep, which pauses the Ticker process for that many milliseconds. When processing resumes, it returns from handle_events. The Ticker stage does not ask for any new events until it returns from this function. Note that the Ticker is initialized with the amount of time to sleep; this becomes the stage's state.
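A common alternative to sleeping inside `handle_events` is to schedule a message with `Process.send_after/3` and ask for one event each time it arrives; in GenStage itself this is usually paired with manual demand (`GenStage.ask/2`). The sketch below shows only the timer idiom, using plain processes rather than GenStage. `TickSketch` and its messages are hypothetical, the producer simply hands out increasing integers, and the interval is 100 ms so the example finishes quickly (60_000 would give one event per minute).

```elixir
defmodule TickSketch do
  # Hypothetical producer: answers each {:ask, from} with the next integer.
  def start_producer do
    spawn(fn -> producer_loop(0) end)
  end

  defp producer_loop(n) do
    receive do
      {:ask, from} ->
        send(from, {:event, n})
        producer_loop(n + 1)
    end
  end

  # Ticker: every `interval` ms, ask the producer for one event and forward
  # it to `report_to`; after `ticks` rounds, send :done and stop.
  def start_ticker(producer, interval, ticks, report_to) do
    spawn(fn ->
      schedule_tick(interval)
      ticker_loop(producer, interval, ticks, report_to)
    end)
  end

  defp ticker_loop(_producer, _interval, 0, report_to) do
    send(report_to, :done)
  end

  defp ticker_loop(producer, interval, ticks, report_to) do
    receive do
      :tick ->
        send(producer, {:ask, self()})

        receive do
          {:event, event} -> send(report_to, {:tick_event, event})
        end

        # Only schedule another tick if more rounds remain.
        if ticks > 1, do: schedule_tick(interval)
        ticker_loop(producer, interval, ticks - 1, report_to)
    end
  end

  # Unlike Process.sleep/1, a timer leaves the process free to handle other
  # messages while it waits for the next tick.
  defp schedule_tick(interval), do: Process.send_after(self(), :tick, interval)
end

producer = TickSketch.start_producer()
TickSketch.start_ticker(producer, 100, 3, self())

for _ <- 1..3 do
  receive do
    {:tick_event, event} -> IO.inspect(event, label: "tick")
  end
end

receive do
  :done -> IO.puts("ticker finished")
end
```

This prints `tick: 0`, `tick: 1` and `tick: 2` roughly 100 ms apart, then `ticker finished`. The design difference from the sleeping Ticker is that the waiting happens in the timer, not in the process itself.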
