Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replaces all usages of StreamIdGenerator with MultiWriterIdGenerator #17229

Merged
merged 9 commits into from
May 30, 2024
Next Next commit
Port device lists to use MultiWriterIdGenerator
  • Loading branch information
erikjohnston committed May 29, 2024
commit ba37c5a276e80ac344d3de8fb74aacd38100c71b
54 changes: 30 additions & 24 deletions synapse/storage/databases/main/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,7 @@
from synapse.storage.databases.main.end_to_end_keys import EndToEndKeyWorkerStore
from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
from synapse.storage.types import Cursor
from synapse.storage.util.id_generators import (
AbstractStreamIdGenerator,
StreamIdGenerator,
)
from synapse.storage.util.id_generators import MultiWriterIdGenerator
from synapse.types import (
JsonDict,
JsonMapping,
Expand Down Expand Up @@ -99,19 +96,21 @@ def __init__(

# In the worker store this is an ID tracker which we overwrite in the non-worker
# class below that is used on the main process.
self._device_list_id_gen = StreamIdGenerator(
db_conn,
hs.get_replication_notifier(),
"device_lists_stream",
"stream_id",
extra_tables=[
("user_signature_stream", "stream_id"),
("device_lists_outbound_pokes", "stream_id"),
("device_lists_changes_in_room", "stream_id"),
("device_lists_remote_pending", "stream_id"),
("device_lists_changes_converted_stream_position", "stream_id"),
self._device_list_id_gen = MultiWriterIdGenerator(
db_conn=db_conn,
db=database,
notifier=hs.get_replication_notifier(),
stream_name="device_lists_stream",
instance_name=self._instance_name,
tables=[
("device_lists_stream", "instance_name", "stream_id"),
("user_signature_stream", "instance_name", "stream_id"),
("device_lists_outbound_pokes", "instance_name", "stream_id"),
("device_lists_changes_in_room", "instance_name", "stream_id"),
("device_lists_remote_pending", "instance_name", "stream_id"),
],
is_writer=hs.config.worker.worker_app is None,
sequence_name="device_lists_sequence",
writers=["master"],
)

device_list_max = self._device_list_id_gen.get_current_token()
Expand Down Expand Up @@ -762,6 +761,7 @@ def _add_user_signature_change_txn(
"stream_id": stream_id,
"from_user_id": from_user_id,
"user_ids": json_encoder.encode(user_ids),
"instance_name": self._instance_name,
},
)

Expand Down Expand Up @@ -1582,6 +1582,8 @@ def __init__(
):
super().__init__(database, db_conn, hs)

self._instance_name = hs.get_instance_name()

self.db_pool.updates.register_background_index_update(
"device_lists_stream_idx",
index_name="device_lists_stream_user_id",
Expand Down Expand Up @@ -1694,6 +1696,7 @@ def _txn(txn: LoggingTransaction) -> int:
"device_lists_outbound_pokes",
{
"stream_id": stream_id,
"instance_name": self._instance_name,
"destination": destination,
"user_id": user_id,
"device_id": device_id,
Expand Down Expand Up @@ -1730,10 +1733,6 @@ def _txn(txn: LoggingTransaction) -> int:


class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
# Because we have write access, this will be a StreamIdGenerator
# (see DeviceWorkerStore.__init__)
_device_list_id_gen: AbstractStreamIdGenerator

def __init__(
self,
database: DatabasePool,
Expand Down Expand Up @@ -2092,9 +2091,9 @@ def _add_device_change_to_stream_txn(
self.db_pool.simple_insert_many_txn(
txn,
table="device_lists_stream",
keys=("stream_id", "user_id", "device_id"),
keys=("instance_name", "stream_id", "user_id", "device_id"),
values=[
(stream_id, user_id, device_id)
(self._instance_name, stream_id, user_id, device_id)
for stream_id, device_id in zip(stream_ids, device_ids)
],
)
Expand Down Expand Up @@ -2124,6 +2123,7 @@ def _add_device_outbound_poke_to_stream_txn(
values = [
(
destination,
self._instance_name,
next(stream_id_iterator),
user_id,
device_id,
Expand All @@ -2139,6 +2139,7 @@ def _add_device_outbound_poke_to_stream_txn(
table="device_lists_outbound_pokes",
keys=(
"destination",
"instance_name",
"stream_id",
"user_id",
"device_id",
Expand All @@ -2157,7 +2158,7 @@ def _add_device_outbound_poke_to_stream_txn(
device_id,
{
stream_id: destination
for (destination, stream_id, _, _, _, _, _) in values
for (destination, _, stream_id, _, _, _, _, _) in values
},
)

Expand Down Expand Up @@ -2210,6 +2211,7 @@ def _add_device_outbound_room_poke_txn(
"device_id",
"room_id",
"stream_id",
"instance_name",
"converted_to_destinations",
"opentracing_context",
),
Expand All @@ -2219,6 +2221,7 @@ def _add_device_outbound_room_poke_txn(
device_id,
room_id,
stream_id,
self._instance_name,
# We only need to calculate outbound pokes for local users
not self.hs.is_mine_id(user_id),
encoded_context,
Expand Down Expand Up @@ -2338,7 +2341,10 @@ async def add_remote_device_list_to_pending(
"user_id": user_id,
"device_id": device_id,
},
values={"stream_id": stream_id},
values={
"stream_id": stream_id,
"instance_name": self._instance_name,
},
desc="add_remote_device_list_to_pending",
)

Expand Down
18 changes: 18 additions & 0 deletions synapse/storage/schema/main/delta/85/02_add_instance_names.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2024 New Vector, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.

ALTER TABLE device_lists_stream ADD COLUMN instance_name TEXT;
ALTER TABLE user_signature_stream ADD COLUMN instance_name TEXT;
ALTER TABLE device_lists_outbound_pokes ADD COLUMN instance_name TEXT;
ALTER TABLE device_lists_changes_in_room ADD COLUMN instance_name TEXT;
ALTER TABLE device_lists_remote_pending ADD COLUMN instance_name TEXT;
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2024 New Vector, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.

CREATE SEQUENCE IF NOT EXISTS device_lists_sequence;

-- We need to take the max across all the device lists tables as they share the
-- ID generator
SELECT setval('device_lists_sequence', (
SELECT GREATEST(
(SELECT COALESCE(MAX(stream_id), 1) FROM device_lists_stream),
(SELECT COALESCE(MAX(stream_id), 1) FROM user_signature_stream),
(SELECT COALESCE(MAX(stream_id), 1) FROM device_lists_outbound_pokes),
(SELECT COALESCE(MAX(stream_id), 1) FROM device_lists_changes_in_room),
(SELECT COALESCE(MAX(stream_id), 1) FROM device_lists_remote_pending),
(SELECT COALESCE(MAX(stream_id), 1) FROM device_lists_changes_converted_stream_position)
)
));