28 Days - pg2 basics - Use process groups for orchestration across a cluster

One of my first major Elixir projects really cared about optimization up-front, due to high throughput. This led to my mentality of “no database”; I would try to always keep data in local heap rather than going to a database. I quickly encountered the biggest challenge with this: how to keep the processes that hold data in sync with each other. I looked at using pg2 for this task, and have been very happy with the outcome.

An example repo is up at https://github.com/sb8244/pg2_demo.

pg2

pg2 has nothing to do with postgres, which is one of the most common thoughts when people see the module name. It creates process groups, so that is where the name comes from.

At the most basic explanation, pg2 allows for a group to be created and then for processes to connect to the group. This leads to a mapping of name -> pid list. The pid list consists of all known processes, whether they be local or remote. When a pg2 group is created, that group becomes visible to all connected nodes in the system. A pg2 group can be created multiple times without error, which means that each node can call create without error.

In practice

I’ll preface this by saying that my particular problem could be solved a number of ways, this is just the way I approached it. Also, pg2 can be used many different ways. If you see any better approach for either the problem or solution, please let me know!

Let’s walk through each section of a module found in the demonstration repo:

defmodule MyWorker.Synchronization do
  use GenServer

  # The topic could simply be __MODULE__, but I like having the human name in it as well
  @topic {:human_name, __MODULE__}

  def start_link do
    GenServer.start_link(__MODULE__, [])
  end

  def topic, do: @topic

pg2 works by grouping processes, so we need a process to group. I satisfy this by creating a process specifically for synchronization purposes. It would be possible to link this to the main process (MyWorker above), but there would be less performance with the serial nature of a process.

The topic is just a tuple or atom, (erlang typespec is “any”), and I generally like to have some human readability in the topic.

  def init([]) do
    :ok = :pg2.create(@topic)
    :ok = :pg2.join(@topic, self())
    {:ok, []}
  end

When the Synchronization GenServer starts, it is going to create the pg2 topic and then join the topic itself. This is possible due to the property pointed out earlier that :pg2.create can be called multiple times successfully.

  def update(some_param) do
    :pg2.get_members(@topic)
    |> Kernel.--(:pg2.get_local_members(@topic))
    |> Enum.each(fn(pid) ->
      send pid, {:broadcast, @topic, {:update_from_db, some_param}}
    end)
  end

This is the public API of the Synchronization module. The process group’s members are retrieved, which is every pid (local and remote) added to the group. In our case, it is only Synchronization module pids. Local processes are removed from this list for performance reasons. In my use case, the data on the local node is already correct; the local node does not need updated.

Each pid is then enumerated and is sent a broadcast message which can actually be any atom or tuple that we like. This can be useful for passing around parameters such as the changed data or tenant information.

  def handle_info({:broadcast, @topic, {:update_from_db, some_param}}, state) do
    MySupervisor.for_param(some_param)
      |> MyWorker.load_from_db()
    {:noreply, state}
  end
end

Finally, the message we passed around is handled. In this short demo, I’m not worrying about sending the changed messages over the wire, instead I am just loading it from the database. This can be desirable for simplicity’s sake if the data isn’t changing often.

A note on distributing data

Keeping multiple copies of the same data in memory, and up to date, across an entire cluster is a pretty hard problem, without going into details of CAP theorem. In this pg2 solution, the data is eventually consistent. This means that some servers may give an incorrect answer over no answer, because they don’t have the most recent data yet.

More stringent handling of the send could be coded if needed, although considering distributed data from the beginning is worth it.


Thanks for reading the 16th post in my 28 days of Elixir. Keep up through the month of February to see if I can stand subjecting myself to 28 days of straight writing. I am looking for new topics to write about, so please reach out if there’s anything you really want to see!

View other posts tagged: engineering elixir 28 days of elixir