Skip to content

Commit

Permalink
Add option to compress data when copying tables between Mnesia nodes
Browse files Browse the repository at this point in the history
Optionally using data compression for copying Mnesia tables allows
the system to be tuned to eliminate network bottlenecks from
deployments where enough CPU is available to use zlib.

With this patch, running erl -mnesia send_compressed <level>
will enable compression for sending tables between nodes. The
compression level can be any integer in [0, 9], with 0 (the
default) meaning no compression (exactly the previous behaviour)
and 9 being the highest compression level.

To set any non-zero compression level at the sender, both nodes
must have the updated Mnesia modules (the receiver will work
according to the sender's configuration).
  • Loading branch information
igorrs authored and bjorng committed Feb 2, 2010
1 parent 36c0041 commit 3757d8f
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 5 deletions.
11 changes: 11 additions & 0 deletions lib/mnesia/doc/src/mnesia.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3044,6 +3044,17 @@ raise(Name, Amount) ->
The default value is <c>2</c>.
</p>
</item>
<item>
<p><c>-mnesia send_compressed Level</c> specifies the level of
compression to be used when copying a table from the local node to
another one. The default level is 0.
</p>
<p><c>Level</c> must be an integer in the interval [0, 9], with 0
representing no compression and 9 representing maximum compression.
Before setting it to a non-zero value, make sure the remote nodes
understand this configuration.
</p>
</item>
<item>
<p><c>-mnesia schema_location Loc</c> controls where
Mnesia will look for its schema. The parameter
Expand Down
2 changes: 2 additions & 0 deletions lib/mnesia/src/mnesia.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2200,6 +2200,7 @@ system_info2(transaction_log_writes) -> mnesia_dumper:get_log_writes();
system_info2(core_dir) -> mnesia_monitor:get_env(core_dir);
system_info2(no_table_loaders) -> mnesia_monitor:get_env(no_table_loaders);
system_info2(dc_dump_limit) -> mnesia_monitor:get_env(dc_dump_limit);
system_info2(send_compressed) -> mnesia_monitor:get_env(send_compressed);

system_info2(Item) -> exit({badarg, Item}).

Expand Down Expand Up @@ -2244,6 +2245,7 @@ system_info_items(yes) ->
core_dir,
no_table_loaders,
dc_dump_limit,
send_compressed,
version
];
system_info_items(no) ->
Expand Down
35 changes: 33 additions & 2 deletions lib/mnesia/src/mnesia_loader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,9 @@ make_table_fun(Pid, TabRec) ->

get_data(Pid, TabRec) ->
receive
{Pid, {more_z, CompressedRecs}} when is_binary(CompressedRecs) ->
Pid ! {TabRec, more},
{zlib_uncompress(CompressedRecs), make_table_fun(Pid,TabRec)};
{Pid, {more, Recs}} ->
Pid ! {TabRec, more},
{Recs, make_table_fun(Pid,TabRec)};
Expand Down Expand Up @@ -769,6 +772,27 @@ dets_bchunk(Tab, Chunk) -> %% Arrg
Else -> Else
end.

zlib_compress(Data, Level) ->
BinData = term_to_binary(Data),
Z = zlib:open(),
zlib:deflateInit(Z, Level),
Bs = zlib:deflate(Z, BinData, finish),
zlib:deflateEnd(Z),
zlib:close(Z),
list_to_binary(Bs).

zlib_uncompress(Data) when is_binary(Data) ->
binary_to_term(zlib:uncompress(Data)).

compression_level() ->
NoCompression = 0,
case ?catch_val(send_compressed) of
{'EXIT', _} ->
mnesia_lib:set(send_compressed, NoCompression),
NoCompression;
Val -> Val
end.

send_packet(N, Pid, _Chunk, '$end_of_table', OldNode) ->
case OldNode of
true -> ignore; %% Old nodes can't handle the new no_more
Expand All @@ -779,8 +803,15 @@ send_packet(N, Pid, Chunk, {[], Cont}, OldNode) ->
send_packet(N, Pid, Chunk, Chunk(Cont), OldNode);
send_packet(N, Pid, Chunk, {Recs, Cont}, OldNode) when N < ?MAX_NOPACKETS ->
case OldNode of
true -> Pid ! {self(), {more, [Recs]}}; %% Old need's wrapping list
false -> Pid ! {self(), {more, Recs}}
true ->
Pid ! {self(), {more, [Recs]}}; %% Old need's wrapping list
false ->
case compression_level() of
0 ->
Pid ! {self(), {more, Recs}};
Level ->
Pid ! {self(), {more_z, zlib_compress(Recs, Level)}}
end
end,
send_packet(N+1, Pid, Chunk, Chunk(Cont), OldNode);
send_packet(_N, _Pid, _Chunk, DataState, _OldNode) ->
Expand Down
10 changes: 7 additions & 3 deletions lib/mnesia/src/mnesia_monitor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,8 @@ env() ->
core_dir,
pid_sort_order,
no_table_loaders,
dc_dump_limit
dc_dump_limit,
send_compressed
].

default_env(access_module) ->
Expand Down Expand Up @@ -717,7 +718,9 @@ default_env(pid_sort_order) ->
default_env(no_table_loaders) ->
2;
default_env(dc_dump_limit) ->
4.
4;
default_env(send_compressed) ->
0.

check_type(Env, Val) ->
case catch do_check_type(Env, Val) of
Expand Down Expand Up @@ -763,7 +766,8 @@ do_check_type(pid_sort_order, standard) -> standard;
do_check_type(pid_sort_order, "standard") -> standard;
do_check_type(pid_sort_order, _) -> false;
do_check_type(no_table_loaders, N) when is_integer(N), N > 0 -> N;
do_check_type(dc_dump_limit,N) when is_number(N), N > 0 -> N.
do_check_type(dc_dump_limit,N) when is_number(N), N > 0 -> N;
do_check_type(send_compressed, L) when is_integer(L), L >= 0, L =< 9 -> L.

bool(true) -> true;
bool(false) -> false.
Expand Down

0 comments on commit 3757d8f

Please sign in to comment.