Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge pull request #6487 from matrix-org/erikj/pass_in_db
Browse files Browse the repository at this point in the history
Pass in Database object to data stores.
  • Loading branch information
erikjohnston authored Dec 9, 2019
2 parents e1544b0 + 65b37f6 commit 30e9adf
Show file tree
Hide file tree
Showing 56 changed files with 242 additions and 244 deletions.
2 changes: 1 addition & 1 deletion .buildkite/postgres-config.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Configuration file used for testing the 'synapse_port_db' script.
# Tells the script to connect to the postgresql database that will be available in the
# CI's Docker setup at the point where this file is considered.
server_name: "test"
server_name: "localhost:8800"

signing_key_path: "/src/.buildkite/test.signing.key"

Expand Down
2 changes: 1 addition & 1 deletion .buildkite/sqlite-config.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Configuration file used for testing the 'synapse_port_db' script.
# Tells the 'update_database' script to connect to the test SQLite database to upgrade its
# schema and run background updates on it.
server_name: "test"
server_name: "localhost:8800"

signing_key_path: "/src/.buildkite/test.signing.key"

Expand Down
1 change: 1 addition & 0 deletions changelog.d/6487.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Pass in `Database` object to data stores.
36 changes: 2 additions & 34 deletions scripts/synapse_port_db
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ from synapse.storage.data_stores.main.stats import StatsStore
from synapse.storage.data_stores.main.user_directory import (
UserDirectoryBackgroundUpdateStore,
)
from synapse.storage.database import Database
from synapse.storage.engines import create_engine
from synapse.storage.prepare_database import prepare_database
from synapse.util import Clock
Expand Down Expand Up @@ -139,39 +140,6 @@ class Store(
UserDirectoryBackgroundUpdateStore,
StatsStore,
):
def __init__(self, db_conn, hs):
super().__init__(db_conn, hs)
self.db_pool = hs.get_db_pool()

@defer.inlineCallbacks
def runInteraction(self, desc, func, *args, **kwargs):
def r(conn):
try:
i = 0
N = 5
while True:
try:
txn = conn.cursor()
return func(
LoggingTransaction(txn, desc, self.database_engine, [], []),
*args,
**kwargs
)
except self.database_engine.module.DatabaseError as e:
if self.database_engine.is_deadlock(e):
logger.warning("[TXN DEADLOCK] {%s} %d/%d", desc, i, N)
if i < N:
i += 1
conn.rollback()
continue
raise
except Exception as e:
logger.debug("[TXN FAIL] {%s} %s", desc, e)
raise

with PreserveLoggingContext():
return (yield self.db_pool.runWithConnection(r))

def execute(self, f, *args, **kwargs):
return self.db.runInteraction(f.__name__, f, *args, **kwargs)

Expand Down Expand Up @@ -512,7 +480,7 @@ class Porter(object):

hs = MockHomeserver(self.hs_config, engine, conn, db_pool)

store = Store(conn, hs)
store = Store(Database(hs), conn, hs)

yield store.db.runInteraction(
"%s_engine.check_database" % config["name"], engine.check_database,
Expand Down
5 changes: 3 additions & 2 deletions synapse/app/federation_sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.replication.tcp.streams._base import ReceiptsStream
from synapse.server import HomeServer
from synapse.storage.database import Database
from synapse.storage.engines import create_engine
from synapse.types import ReadReceipt
from synapse.util.async_helpers import Linearizer
Expand All @@ -59,8 +60,8 @@ class FederationSenderSlaveStore(
SlavedDeviceStore,
SlavedPresenceStore,
):
def __init__(self, db_conn, hs):
super(FederationSenderSlaveStore, self).__init__(db_conn, hs)
def __init__(self, database: Database, db_conn, hs):
super(FederationSenderSlaveStore, self).__init__(database, db_conn, hs)

# We pull out the current federation stream position now so that we
# always have a known value for the federation position in memory so
Expand Down
35 changes: 6 additions & 29 deletions synapse/app/homeserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@
from synapse.rest.media.v0.content_repository import ContentRepoResource
from synapse.rest.well_known import WellKnownResource
from synapse.server import HomeServer
from synapse.storage import DataStore, are_all_users_on_domain
from synapse.storage import DataStore
from synapse.storage.engines import IncorrectDatabaseSetup, create_engine
from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database
from synapse.storage.prepare_database import UpgradeDatabaseException
from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole
Expand Down Expand Up @@ -294,22 +294,6 @@ def start_listening(self, listeners):
else:
logger.warning("Unrecognized listener type: %s", listener["type"])

def run_startup_checks(self, db_conn, database_engine):
all_users_native = are_all_users_on_domain(
db_conn.cursor(), database_engine, self.hostname
)
if not all_users_native:
quit_with_error(
"Found users in database not native to %s!\n"
"You cannot changed a synapse server_name after it's been configured"
% (self.hostname,)
)

try:
database_engine.check_database(db_conn.cursor())
except IncorrectDatabaseSetup as e:
quit_with_error(str(e))


# Gauges to expose monthly active user control metrics
current_mau_gauge = Gauge("synapse_admin_mau:current", "Current MAU")
Expand Down Expand Up @@ -357,16 +341,12 @@ def setup(config_options):

synapse.config.logger.setup_logging(hs, config, use_worker_options=False)

logger.info("Preparing database: %s...", config.database_config["name"])
logger.info("Setting up server")

try:
with hs.get_db_conn(run_new_connection=False) as db_conn:
prepare_database(db_conn, database_engine, config=config)
database_engine.on_new_connection(db_conn)

hs.run_startup_checks(db_conn, database_engine)

db_conn.commit()
hs.setup()
except IncorrectDatabaseSetup as e:
quit_with_error(str(e))
except UpgradeDatabaseException:
sys.stderr.write(
"\nFailed to upgrade database.\n"
Expand All @@ -375,9 +355,6 @@ def setup(config_options):
)
sys.exit(1)

logger.info("Database prepared in %s.", config.database_config["name"])

hs.setup()
hs.setup_master()

@defer.inlineCallbacks
Expand Down
5 changes: 3 additions & 2 deletions synapse/app/user_dir.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from synapse.rest.client.v2_alpha import user_directory
from synapse.server import HomeServer
from synapse.storage.data_stores.main.user_directory import UserDirectoryStore
from synapse.storage.database import Database
from synapse.storage.engines import create_engine
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.httpresourcetree import create_resource_tree
Expand All @@ -60,8 +61,8 @@ class UserDirectorySlaveStore(
UserDirectoryStore,
BaseSlavedStore,
):
def __init__(self, db_conn, hs):
super(UserDirectorySlaveStore, self).__init__(db_conn, hs)
def __init__(self, database: Database, db_conn, hs):
super(UserDirectorySlaveStore, self).__init__(database, db_conn, hs)

events_max = self._stream_id_gen.get_current_token()
curr_state_delta_prefill, min_curr_state_delta_id = self.db.get_cache_dict(
Expand Down
5 changes: 3 additions & 2 deletions synapse/replication/slave/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

from synapse.storage._base import SQLBaseStore
from synapse.storage.data_stores.main.cache import CURRENT_STATE_CACHE_NAME
from synapse.storage.database import Database
from synapse.storage.engines import PostgresEngine

from ._slaved_id_tracker import SlavedIdTracker
Expand All @@ -35,8 +36,8 @@ def __func__(inp):


class BaseSlavedStore(SQLBaseStore):
def __init__(self, db_conn, hs):
super(BaseSlavedStore, self).__init__(db_conn, hs)
def __init__(self, database: Database, db_conn, hs):
super(BaseSlavedStore, self).__init__(database, db_conn, hs)
if isinstance(self.database_engine, PostgresEngine):
self._cache_id_gen = SlavedIdTracker(
db_conn, "cache_invalidation_stream", "stream_id"
Expand Down
5 changes: 3 additions & 2 deletions synapse/replication/slave/storage/account_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
from synapse.storage.data_stores.main.account_data import AccountDataWorkerStore
from synapse.storage.data_stores.main.tags import TagsWorkerStore
from synapse.storage.database import Database


class SlavedAccountDataStore(TagsWorkerStore, AccountDataWorkerStore, BaseSlavedStore):
def __init__(self, db_conn, hs):
def __init__(self, database: Database, db_conn, hs):
self._account_data_id_gen = SlavedIdTracker(
db_conn, "account_data_max_stream_id", "stream_id"
)

super(SlavedAccountDataStore, self).__init__(db_conn, hs)
super(SlavedAccountDataStore, self).__init__(database, db_conn, hs)

def get_max_account_data_stream_id(self):
return self._account_data_id_gen.get_current_token()
Expand Down
5 changes: 3 additions & 2 deletions synapse/replication/slave/storage/client_ips.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,16 @@
# limitations under the License.

from synapse.storage.data_stores.main.client_ips import LAST_SEEN_GRANULARITY
from synapse.storage.database import Database
from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.caches.descriptors import Cache

from ._base import BaseSlavedStore


class SlavedClientIpStore(BaseSlavedStore):
def __init__(self, db_conn, hs):
super(SlavedClientIpStore, self).__init__(db_conn, hs)
def __init__(self, database: Database, db_conn, hs):
super(SlavedClientIpStore, self).__init__(database, db_conn, hs)

self.client_ip_last_seen = Cache(
name="client_ip_last_seen", keylen=4, max_entries=50000 * CACHE_SIZE_FACTOR
Expand Down
5 changes: 3 additions & 2 deletions synapse/replication/slave/storage/deviceinbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
from synapse.storage.data_stores.main.deviceinbox import DeviceInboxWorkerStore
from synapse.storage.database import Database
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.caches.stream_change_cache import StreamChangeCache


class SlavedDeviceInboxStore(DeviceInboxWorkerStore, BaseSlavedStore):
def __init__(self, db_conn, hs):
super(SlavedDeviceInboxStore, self).__init__(db_conn, hs)
def __init__(self, database: Database, db_conn, hs):
super(SlavedDeviceInboxStore, self).__init__(database, db_conn, hs)
self._device_inbox_id_gen = SlavedIdTracker(
db_conn, "device_max_stream_id", "stream_id"
)
Expand Down
5 changes: 3 additions & 2 deletions synapse/replication/slave/storage/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
from synapse.replication.tcp.streams._base import DeviceListsStream, UserSignatureStream
from synapse.storage.data_stores.main.devices import DeviceWorkerStore
from synapse.storage.data_stores.main.end_to_end_keys import EndToEndKeyWorkerStore
from synapse.storage.database import Database
from synapse.util.caches.stream_change_cache import StreamChangeCache


class SlavedDeviceStore(EndToEndKeyWorkerStore, DeviceWorkerStore, BaseSlavedStore):
def __init__(self, db_conn, hs):
super(SlavedDeviceStore, self).__init__(db_conn, hs)
def __init__(self, database: Database, db_conn, hs):
super(SlavedDeviceStore, self).__init__(database, db_conn, hs)

self.hs = hs

Expand Down
5 changes: 3 additions & 2 deletions synapse/replication/slave/storage/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from synapse.storage.data_stores.main.state import StateGroupWorkerStore
from synapse.storage.data_stores.main.stream import StreamWorkerStore
from synapse.storage.data_stores.main.user_erasure_store import UserErasureWorkerStore
from synapse.storage.database import Database

from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker
Expand Down Expand Up @@ -59,13 +60,13 @@ class SlavedEventStore(
RelationsWorkerStore,
BaseSlavedStore,
):
def __init__(self, db_conn, hs):
def __init__(self, database: Database, db_conn, hs):
self._stream_id_gen = SlavedIdTracker(db_conn, "events", "stream_ordering")
self._backfill_id_gen = SlavedIdTracker(
db_conn, "events", "stream_ordering", step=-1
)

super(SlavedEventStore, self).__init__(db_conn, hs)
super(SlavedEventStore, self).__init__(database, db_conn, hs)

# Cached functions can't be accessed through a class instance so we need
# to reach inside the __dict__ to extract them.
Expand Down
5 changes: 3 additions & 2 deletions synapse/replication/slave/storage/filtering.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@
# limitations under the License.

from synapse.storage.data_stores.main.filtering import FilteringStore
from synapse.storage.database import Database

from ._base import BaseSlavedStore


class SlavedFilteringStore(BaseSlavedStore):
def __init__(self, db_conn, hs):
super(SlavedFilteringStore, self).__init__(db_conn, hs)
def __init__(self, database: Database, db_conn, hs):
super(SlavedFilteringStore, self).__init__(database, db_conn, hs)

# Filters are immutable so this cache doesn't need to be expired
get_user_filter = FilteringStore.__dict__["get_user_filter"]
5 changes: 3 additions & 2 deletions synapse/replication/slave/storage/groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,16 @@
# limitations under the License.

from synapse.storage import DataStore
from synapse.storage.database import Database
from synapse.util.caches.stream_change_cache import StreamChangeCache

from ._base import BaseSlavedStore, __func__
from ._slaved_id_tracker import SlavedIdTracker


class SlavedGroupServerStore(BaseSlavedStore):
def __init__(self, db_conn, hs):
super(SlavedGroupServerStore, self).__init__(db_conn, hs)
def __init__(self, database: Database, db_conn, hs):
super(SlavedGroupServerStore, self).__init__(database, db_conn, hs)

self.hs = hs

Expand Down
5 changes: 3 additions & 2 deletions synapse/replication/slave/storage/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,16 @@

from synapse.storage import DataStore
from synapse.storage.data_stores.main.presence import PresenceStore
from synapse.storage.database import Database
from synapse.util.caches.stream_change_cache import StreamChangeCache

from ._base import BaseSlavedStore, __func__
from ._slaved_id_tracker import SlavedIdTracker


class SlavedPresenceStore(BaseSlavedStore):
def __init__(self, db_conn, hs):
super(SlavedPresenceStore, self).__init__(db_conn, hs)
def __init__(self, database: Database, db_conn, hs):
super(SlavedPresenceStore, self).__init__(database, db_conn, hs)
self._presence_id_gen = SlavedIdTracker(db_conn, "presence_stream", "stream_id")

self._presence_on_startup = self._get_active_presence(db_conn)
Expand Down
5 changes: 3 additions & 2 deletions synapse/replication/slave/storage/push_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,18 @@
# limitations under the License.

from synapse.storage.data_stores.main.push_rule import PushRulesWorkerStore
from synapse.storage.database import Database

from ._slaved_id_tracker import SlavedIdTracker
from .events import SlavedEventStore


class SlavedPushRuleStore(SlavedEventStore, PushRulesWorkerStore):
def __init__(self, db_conn, hs):
def __init__(self, database: Database, db_conn, hs):
self._push_rules_stream_id_gen = SlavedIdTracker(
db_conn, "push_rules_stream", "stream_id"
)
super(SlavedPushRuleStore, self).__init__(db_conn, hs)
super(SlavedPushRuleStore, self).__init__(database, db_conn, hs)

def get_push_rules_stream_token(self):
return (
Expand Down
5 changes: 3 additions & 2 deletions synapse/replication/slave/storage/pushers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@
# limitations under the License.

from synapse.storage.data_stores.main.pusher import PusherWorkerStore
from synapse.storage.database import Database

from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker


class SlavedPusherStore(PusherWorkerStore, BaseSlavedStore):
def __init__(self, db_conn, hs):
super(SlavedPusherStore, self).__init__(db_conn, hs)
def __init__(self, database: Database, db_conn, hs):
super(SlavedPusherStore, self).__init__(database, db_conn, hs)
self._pushers_id_gen = SlavedIdTracker(
db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")]
)
Expand Down
Loading

0 comments on commit 30e9adf

Please sign in to comment.