Projects Blog About Contact

Prototyping Vivaldi, a simple distributed algorithm in Elixir

July 23, 2017

When learning Elixir using the Programming Elixir book, I came across an excellent Papers We Love talk by Armon Dadgar of Hashicorp on the Vivaldi algorithm. I thought Elixir would be a great fit to implement the algorithm.


What is Vivaldi?

Vivaldi was developed by Frank Dabek and Russ Cox at MIT. It is a decentralized algorithm which predicts round trip times between nodes in a cluster by assigning co-ordinates to each node.

Why is it needed?

From the paper:

Large-scale Internet applications can benefit from an ability to predict round-trip times to other hosts without having to contact them first. Explicit measurements are often unattractive because the cost of measurement can outweigh the benefits of exploiting proximity information. Vivaldi is a simple, light-weight algorithm that assigns synthetic coordinates to hosts such that the distance between the coordinates of two hosts accurately predicts the communication latency between the hosts.

A modified version of Vivaldi is used by Hashicorp's Serf, which in-turn is used in Consul, a system used for service discovery and configuration. Vivaldi is also used in Vuze's BitTorrent client.


How does it work?

The paper and the talk are very accessible, and I can't do a better job! But, if you don't have the time, here's a quick simplified explanation:

Vivaldi's goal is to assign Euclidean coordinates to each node in a cluster so that these coordinates accurately(within a 10% error) predict the round-trip-time(RTT) between the nodes.

Each node starts at the origin. When a node A makes a request to another node B, B responds with the payload and also the latest version of its coordinates. A uses the difference between the real RTT, and the RTT predicted by A and B's coordinates to calculate the next version of its coordinates. When another node C communicates with A, A now responds with its updated coordinates.

This process repeats itself throughout the cluster, and all the nodes converge to coordinates which not only predict RTTs to nodes they've communicated with, but also the RTTs to the remaining nodes! In my simulations, I found that in clusters without triangle-inequality violations, each node converged after communicating once with 10 - 15% of its peers.

The authors found that Euclidean coordinates alone aren't good enough to model the Internet, so they added a height factor, to accommodate nodes that have high latencies to reach the Internet backbone.


Intuition

We'll build our intuition on how the co-ordinates are updated in 3 steps.

1. Forces in Action

Consider the above cluster with four nodes. The leftmost node, A, is new, whereas the others nodes, B, C and D are stable, i.e. their coordinates have already been assigned and the distance between each other is also equal to the latency between each other.

Assume that the latency from A to all the other nodes is equal, and that its desired position is at the centre of the triangle formed among the other nodes.

Now each of B, C and D exerts a force on A towards itself. The magnitude of each force is proportional to the difference between the real latency and the latency predicted by the co-ordinates. A calculates the resultant force — the vector-sum of all these forces — and takes a small step in the direction of the resultant force. This process repeats, and A moves towards the centre of the triangle.


2. Centralized Algorithm

In the previous case, we only had to worry about one new node. In this case, all nodes are new and start at the origin. In the first iteration, each node takes a step in a random direction and continues to follow the same principle as explained in the previous case and the nodes converge after a few iterations.


3. Distributed Algorithm

In the previous two cases, each node communicated with all the other nodes before taking a step in the right direction(You can think of it as batched gradient descent). In this case, each node takes a step after communicating with a single node (as in stochastic gradient descent) and the algorithm continues to work!

In real-world scenarios, we often find all nodes don't join the cluster at the same point in time. Hence, in the above visualization, nodes are added to the cluster in 6 stages.

You can see that nodes move more aggresively when they're new and the error rates are high. Once a node is stable enough, it isn't affected much by the presence of newer nodes.

(The height factor is not visualized since it is very small compared to the Euclidean coordinates)


Implementing Vivaldi in Elixir

You can find the code on GitHub. I'll highlight how certain features of Elixir and Erlang/OTP helped prototype the algorithm quickly.

Process Discovery

There was no impedance-mismatch between local mode and distributed mode. In local mode, all the peers ran on my development machine. And in distributed node, the peers ran in different machines. I didn't have to write any low level networking code, or serialization/deserialization code.

Once I got the algorithm working on my machine, I spun up a bunch of nodes on AWS, Digital Ocean and Google Cloud Platform to test the algorithm and spent very little time debugging networking issues.

Each peer runs the following processes.

1. Ping Client          :node-<id>-ping-client
2. Ping Server          :node-<id>-ping-server
3. Coordinate           :node-<id>-coordinate
4. Coordinate Stash     :node-<id>-coordinate-stash
5. Connections          :node-<id>-connections
6. Coordinator          :node-<id>-coordinator
7. Supervisor           :node-<id>-supervisor

PingClient - Periodically pings a random peer. When it receives a response, it forwards the peer's coordinates and the latency information to the Coordinate process.

Coordinate - Updates the peer's coordinates using Vivaldi, and stores the values in the CoordinateStash process.

PingServer - Responds to a ping with the coordinates stored in CoordinateStash.

CoordinateStash - Stores the latest version of the peer's coordinate.

Connections - Helps connect to another peer's PingServer (more on this later).

Coordinator - Helps configure the peer during simulations.

Supervisor - Supervises all the above processes. So, for example, if the PingClient process goes down because another peer's PingServer does not respond within a timeout, the supervisor just restarts the PingClient, and I don't have to handle this error manually in the PingClient.

I assigned unique names to each process, where each process's name is prefixed by the node_id. The processes communicate with each other using these names instead of explicitly using the pid. In development mode, all the peers run on a single node, whereas in the distributed mode, each peer runs on a different node.

Only PingServer needs to be visible across the network. So I made each PingServer's pid globally visible using :global.register_name

defmodule PingServer do
  use GenServer

  def init([{node_id, session_id}]) do
    node_id
    |> get_name()
    |> :global.register_name(self)
    {:ok, {node_id, session_id}}
  end

  def get_server_pid(node_id) do
    node_id
    |> get_name()
    |> :global.whereis_name()
  end

  defp get_name(node_id) do
    :"#{node_id}-ping-server"
  end

end

The Connections module helps discover PingServer pids in distributed mode, It first connects to the peer, and then uses :global.whereis_name, which relies on the underlying Erlang/OTP system to return the PingServer pid.

defmodule Connections do
  use GenServer

  def handle_call({:get_peer_ping_server_pid, peer_id}, _, config) do
    case config[:local_mode?] do
    true ->
      get_local_ping_server_pid(peer_id, config)
    false ->
      get_remote_ping_server_pid(peer_id, config)
    end
  end

  def get_local_ping_server_pid(peer_id, config) do
    PingServer.get_server_pid(peer_id)
  end

  def get_remote_ping_server_pid(peer_id, config) do
    peer_name = config[:peer_names][peer_id]
    if peer_name in Node.list do
      PingServer.get_server_pid(peer_id)
    else
      case Node.connect(peer_name) do
        true ->
          :timer.sleep(500)
          PingServer.get_server_pid(peer_id)
        _ ->
          Logger.error("Can not connect to #{peer_name}")
      end
    end
  end
end

If you're a careful reader, you might have noticed how I slipped in a :timer.sleep after Node.connect. I added it as a quick hack since :global.whereis_name returned :undefined without the timeout. Sarat(@sarat) explained to me that the issue occurs because the OTP does takes a small amount of time to propagate global names across the cluster.


Running Simulations

The default values recommend by Vivaldi and Serf are excellent. But you can tune these values based on your cluster. So I used an additional Controller node which sends commands to configure peers.

Elixir's pipes came in really handy in modelling setup operations.

defmodule Controller

    def run(peers, base_config) do
      peers
      |> connect()
      |> check_status(expected_status=:not_started)
      |> generate_peer_configs(base_config)
      |> send_command(:configure)
      |> check_status(expected_status=:just_started)
      |> send_command(:get_ready)
      |> check_status(expected_status=:ready)
      |> send_command(:begin_pings)
    end

end

Here the controller first connects to each peer, then checks if each peer is up, and then generates configuration information which contains the node_ids and IP addresses, and once again checks if the configuration is successful and so on.

(The Controller code on GitHub is not yet as simple, since this was the last part of the project, and I just wanted to get the system working at this point!)

Pattern Matching and State Machines

Each peer runs a Coordinator process which receives commands from the Controller. I modelled the Coordinator as a simple state machine. If you've ever done any UI programming, you'll have written code to prevent users from clicking a button continuously and performing their action more than once. Elixir (and Erlang's) pattern matching provides this feature for free.

defmodule Coordinator do

  def handle_call({:configure, config}, _, {:not_started, node_id}) do
    # application logic
    {:reply, :ok, {:just_started, [x, y, error]}}
  end

  def handle_call({:begin_pings}, _, {:just_started, {node_id}) do
    # application logic
    {:reply, :ok, {:pinging, [x, y, error]}}
  end

  def handle_call(:get_status, _, {status, {node_id}}) do
    {:reply, {:ok, status}, {status, {node_id}}}
  end

end

The Coordinator accepts the :configure command only when it is in the :not_started state, and then moves to the :just_started state. Similarly, it only accepts the :begin_pings command when it is in the :just_started state, and then moves to the :pinging state.


Tooling

Elixir has excellent tooling, considering it's a young language and ecosystem.

I faced issues in setting names and cookies to nodes dynamically. So I asked a StackOverflow question and quickly received an answer. I still had to resort to a couple of hacks to get the system working. But this was expected considering this was my first Elixir project, and mainly because I'm not familiar with Erlang's ecosystem.

Overall, I understood the appeal of Erlang/OTP in building distributed, soft real-time systems. This was also the first time I've used a functional programming language, and Elixir eased the learning curve. Praveen(@praveen) explained how we don't have to use complex concepts in order to be productive with Elixir, and it resonated with me as well. I'm now looking forward to using Elixir in my upcoming projects!