Skip to content

Commit

Permalink
Add an optional :offload send mode (#10)
Browse files Browse the repository at this point in the history
This optional mode offloads the send/2 calls to the remote Manifold nodes
to one of the local `Manifold.Sender` processes.

Add a new test for the :offload send mode.
  • Loading branch information
slam committed Sep 7, 2022
1 parent 8fbb527 commit 7db6d24
Show file tree
Hide file tree
Showing 7 changed files with 287 additions and 21 deletions.
22 changes: 22 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@ were finally able to keep up with their message queues.

![Packets Out Reduction](priv/packets.png)

There is an optional and experimental `:offload` send mode which offloads on the send side the `send/2` calls to the receiving
nodes to a pool of `Manifold.Sender` processes. To maintain the linearizability guaranteed by `send/2`, the same calling process
always offloads the work to the same `Manifold.Sender` process. The size of the `Manifold.Sender` pool is configurable. This send
mode is optional because its benefits are workload dependent. For some workloads, it might degrade overall performance. Use with
caution.

Caution: To maintain the linearizability guaranteed by `send/2`, do not mix calls to Manifold with and without offloading. Mixed
use of the two different send modes to the same set of receiving nodes would break the linearizability guarantee.

## Usage

Add it to `mix.exs`
Expand All @@ -42,6 +51,19 @@ Manifold.send(self(), :hello)
Manifold.send([self(), self()], :hello)
```

To use the experimental `:offload` send mode, make sure the `Manifold.Sender` pool size is appropriate for the
workload:

```elixir
config :manifold, senders: <size>
```

Then:

```elixir
Manifold.send(self(), :hello, send_mode: :offload)
```

### Configuration
Manifold takes a single configuration option, which sets the module it dispatches to actually call send. The default
is GenServer. To set this variable, add the following to your `config.exs`:
Expand Down
104 changes: 87 additions & 17 deletions lib/manifold.ex
Original file line number Diff line number Diff line change
@@ -1,42 +1,80 @@
defmodule Manifold do
use Application

alias Manifold.{Partitioner, Utils}
alias Manifold.Partitioner
alias Manifold.Sender
alias Manifold.Utils

@max_partitioners 32
@partitioners min(Application.get_env(:manifold, :partitioners, 1), @max_partitioners)
@workers_per_partitioner Application.get_env(:manifold, :workers_per_partitioner, System.schedulers_online)

@max_senders 128
@senders min(Application.get_env(:manifold, :senders, System.schedulers_online), @max_senders)

## OTP

def start(_type, _args) do
import Supervisor.Spec, warn: false

children = for partitioner_id <- 0..(@partitioners - 1) do
Partitioner.child_spec(@workers_per_partitioner, [name: partitioner_for(partitioner_id)])
end
partitioners =
for partitioner_id <- 0..(@partitioners - 1) do
Partitioner.child_spec(@workers_per_partitioner, name: partitioner_for(partitioner_id))
end

Supervisor.start_link children,
senders =
for sender_id <- 0..(@senders - 1) do
Sender.child_spec(name: sender_for(sender_id))
end

Supervisor.start_link(partitioners ++ senders,
strategy: :one_for_one,
max_restarts: 10,
name: __MODULE__.Supervisor
)
end

## Client

@spec send([pid | nil] | pid | nil, term) :: :ok
def send([pid], message), do: __MODULE__.send(pid, message)
def send(pids, message) when is_list(pids) do
partitioner_name = current_partitioner()
grouped_by = Utils.group_by(pids, fn
nil -> nil
pid -> node(pid)
end)
for {node, pids} <- grouped_by, node != nil, do: Partitioner.send({partitioner_name, node}, pids, message)
:ok
@spec send([pid | nil] | pid | nil, term, send_mode: :offload) :: :ok
def send(pid, message, options \\ [])
def send([pid], message, options), do: __MODULE__.send(pid, message, options)

def send(pids, message, options) when is_list(pids) do
case options[:send_mode] do
:offload ->
Sender.send(current_sender(), current_partitioner(), pids, message)

nil ->
partitioner_name = current_partitioner()

grouped_by =
Utils.group_by(pids, fn
nil -> nil
pid -> node(pid)
end)

for {node, pids} <- grouped_by,
node != nil,
do: Partitioner.send({partitioner_name, node}, pids, message)

:ok
end
end

def send(pid, message, options) when is_pid(pid) do
case options[:send_mode] do
:offload ->
# To maintain linearizability guaranteed by send/2, we have to send
# it to the sender process, even for a single receiving pid.
Sender.send(current_sender(), current_partitioner(), [pid], message)

nil ->
Partitioner.send({current_partitioner(), node(pid)}, [pid], message)
end
end
def send(pid, message) when is_pid(pid), do: Partitioner.send({current_partitioner(), node(pid)}, [pid], message)
def send(nil, _message), do: :ok

def send(nil, _message, _options), do: :ok

def set_partitioner_key(key) do
partitioner = key
Expand Down Expand Up @@ -70,4 +108,36 @@ defmodule Manifold do
unquote(:"Manifold.Partitioner_#{partitioner_id}")
end
end

def set_sender_key(key) do
sender =
key
|> Utils.hash()
|> rem(@senders)
|> sender_for()

Process.put(:manifold_sender, sender)
end

def current_sender() do
case Process.get(:manifold_sender) do
nil ->
sender_for(self())

sender ->
sender
end
end

def sender_for(pid) when is_pid(pid) do
pid
|> Utils.partition_for(@senders)
|> sender_for
end

for sender_id <- 0..(@max_senders - 1) do
def sender_for(unquote(sender_id)) do
unquote(:"Manifold.Sender_#{sender_id}")
end
end
end
65 changes: 65 additions & 0 deletions lib/manifold/sender.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
defmodule Manifold.Sender do
use GenServer

alias Manifold.Utils

@gen_module Application.get_env(:manifold, :gen_module, GenServer)

## Client

@spec child_spec(Keyword.t()) :: tuple
def child_spec(opts \\ []) do
import Supervisor.Spec, warn: false
supervisor(__MODULE__, [:ok, opts], id: Keyword.get(opts, :name, __MODULE__))
end

@spec start_link(:ok, Keyword.t()) :: GenServer.on_start()
def start_link(:ok, opts \\ []) do
GenServer.start_link(__MODULE__, :ok, opts)
end

@spec send(pid, atom, [pid], term) :: :ok
def send(pid, partitioner, pids, message) do
@gen_module.cast(pid, {:send, partitioner, pids, message})
end

## Server Callbacks

def init(:ok) do
# Set optimal process flags
Process.flag(:message_queue_data, :off_heap)
schedule_next_hibernate()
{:ok, nil}
end

def handle_cast({:send, partitioner, pids, message}, nil) do
grouped_by =
Utils.group_by(pids, fn
nil -> nil
pid -> node(pid)
end)

for {node, pids} <- grouped_by, node != nil do
Manifold.Partitioner.send({partitioner, node}, pids, message)
end

{:noreply, nil}
end

def handle_cast(_message, nil) do
{:noreply, nil}
end

def handle_info(:hibernate, nil) do
schedule_next_hibernate()
{:noreply, nil, :hibernate}
end

def handle_info(_message, nil) do
{:noreply, nil}
end

defp schedule_next_hibernate() do
Process.send_after(self(), :hibernate, Utils.next_hibernate_delay())
end
end
11 changes: 8 additions & 3 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@ defmodule Manifold.Mixfile do
def project do
[
app: :manifold,
version: "1.4.0",
version: "1.5.0",
elixir: "~> 1.5",
build_embedded: Mix.env == :prod,
start_permanent: Mix.env == :prod,
deps: deps(),
package: package()
package: package(),
# https://github.com/whitfin/local-cluster#setup
aliases: [
test: "test --no-start"
]
]
end

Expand All @@ -22,7 +26,8 @@ defmodule Manifold.Mixfile do

defp deps do
[
{:benchfella, "~> 0.3.0", only: :test}
{:benchfella, "~> 0.3.0", only: :test},
{:local_cluster, "~> 1.2", only: [:test]}
]
end

Expand Down
5 changes: 5 additions & 0 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
%{
"benchfella": {:hex, :benchfella, "0.3.5", "b2122c234117b3f91ed7b43b6e915e19e1ab216971154acd0a80ce0e9b8c05f5", [:mix], [], "hexpm", "23f27cbc482cbac03fc8926441eb60a5e111759c17642bac005c3225f5eb809d"},
"global_flags": {:hex, :global_flags, "1.0.0", "ee6b864979a1fb38d1fbc67838565644baf632212bce864adca21042df036433", [:rebar3], [], "hexpm", "85d944cecd0f8f96b20ce70b5b16ebccedfcd25e744376b131e89ce61ba93176"},
"local_cluster": {:hex, :local_cluster, "1.2.1", "8eab3b8a387680f0872eacfb1a8bd5a91cb1d4d61256eec6a655b07ac7030c73", [:mix], [{:global_flags, "~> 1.0", [hex: :global_flags, repo: "hexpm", optional: false]}], "hexpm", "aae80c9bc92c911cb0be085fdeea2a9f5b88f81b6bec2ff1fec244bb0acc232c"},
}
97 changes: 96 additions & 1 deletion test/manifold_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ defmodule ManifoldTest do
end
Manifold.send(pids, message)
for pid <- pids do
assert_receive {^pid, ^message}
assert_receive {^pid, ^message}, 1000
end
end

Expand Down Expand Up @@ -79,4 +79,99 @@ defmodule ManifoldTest do
assert_receive ^message
assert_receive ^message
end

test "many pids using :offload" do
[receiver] =
LocalCluster.start_nodes(:manifold, 1,
files: [
__ENV__.file
]
)

me = self()
message = {:hello, me}

pids =
for _ <- 0..10000 do
Node.spawn_link(receiver, fn ->
receive do
{:hello, sender} -> send(sender, {self(), {:hello, sender}})
end
end)
end

Manifold.send(pids, message, send_mode: :offload)

for pid <- pids do
assert_receive {^pid, ^message}, 1000
end
end

defmacro assert_next_receive(pattern, timeout \\ 100) do
quote do
receive do
message ->
assert unquote(pattern) = message
after
unquote(timeout) ->
raise "timeout"
end
end
end

test "send/2 linearization guarantees with :offload" do
[receiver] =
LocalCluster.start_nodes(:manifold, 1,
files: [
__ENV__.file
]
)

# Set up several receiving pids, but only the first pid echos
# the message back to the sender...
pids =
for n <- 0..2 do
loop =
if n == 0 do
fn f ->
receive do
{:hello, sender, n} ->
send(sender, {self(), {:hello, sender, n}})
f.(f)
end
end
else
fn f ->
receive do
{:hello, _sender, _n} ->
f.(f)
end
end
end

Node.spawn_link(receiver, fn -> loop.(loop) end)
end

me = self()
[pid | _] = pids

# Fire off a bunch of messages, with some sent only to the
# first receiving pid, while others sent to all pids.
for n <- 0..1000 do
message = {:hello, me, n}

if rem(n, 2) == 0 do
Manifold.send(pid, message, send_mode: :offload)
else
Manifold.send(pids, message, send_mode: :offload)
end
end

# Expect the messages to be echoed back from the first
# receiving pid in order.
for n <- 0..1000 do
message = {:hello, me, n}
assert_next_receive({^pid, ^message}, 1000)
end
end
end
4 changes: 4 additions & 0 deletions test/test_helper.exs
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
# https://github.com/whitfin/local-cluster#setup
:ok = LocalCluster.start()
Application.ensure_all_started(:manifold)

ExUnit.start()

0 comments on commit 7db6d24

Please sign in to comment.