Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Use execute_values more in PostgreSQL #10754

Merged
merged 4 commits into from
Sep 3, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/10754.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Minor speed ups when joining large rooms over federation.
61 changes: 42 additions & 19 deletions synapse/storage/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,18 +280,18 @@ def execute_batch(self, sql: str, args: Iterable[Iterable[Any]]) -> None:
else:
self.executemany(sql, args)

def execute_values(self, sql: str, *args: Any) -> List[Tuple]:
def execute_values(self, sql: str, *args: Any, fetch: bool = True) -> List[Tuple]:
"""Corresponds to psycopg2.extras.execute_values. Only available when
using postgres.

Always sets fetch=True when caling `execute_values`, so will return the
results.
The `fetch` parameter must be set to False if the query does not return
rows (e.g. INSERTs).
"""
assert isinstance(self.database_engine, PostgresEngine)
from psycopg2.extras import execute_values # type: ignore

return self._do_execute(
lambda *x: execute_values(self.txn, *x, fetch=True), sql, *args
lambda *x: execute_values(self.txn, *x, fetch=fetch), sql, *args
)

def execute(self, sql: str, *args: Any) -> None:
Expand Down Expand Up @@ -920,13 +920,23 @@ def simple_insert_many_txn(
if k != keys[0]:
raise RuntimeError("All items must have the same keys")

sql = "INSERT INTO %s (%s) VALUES(%s)" % (
table,
", ".join(k for k in keys[0]),
", ".join("?" for _ in keys[0]),
)
if isinstance(txn.database_engine, PostgresEngine):
# We use `execute_values` for postgres as it can be a lot faster
# than `execute_batch`, but it's only available on postgres.
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
sql = "INSERT INTO %s (%s) VALUES ?" % (
table,
", ".join(k for k in keys[0]),
)

txn.execute_batch(sql, vals)
txn.execute_values(sql, vals, fetch=False)
else:
sql = "INSERT INTO %s (%s) VALUES(%s)" % (
table,
", ".join(k for k in keys[0]),
", ".join("?" for _ in keys[0]),
)

txn.execute_batch(sql, vals)

async def simple_upsert(
self,
Expand Down Expand Up @@ -1281,20 +1291,33 @@ def simple_upsert_many_txn_native_upsert(
k + "=EXCLUDED." + k for k in value_names
)

sql = "INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO %s" % (
table,
", ".join(k for k in allnames),
", ".join("?" for _ in allnames),
", ".join(key_names),
latter,
)

args = []

for x, y in zip(key_values, value_values):
args.append(tuple(x) + tuple(y))

return txn.execute_batch(sql, args)
if isinstance(txn.database_engine, PostgresEngine):
# We use `execute_values` for postgres as it can be a lot faster
# than `execute_batch`, but it's only available on postgres.
sql = "INSERT INTO %s (%s) VALUES ? ON CONFLICT (%s) DO %s" % (
table,
", ".join(k for k in allnames),
", ".join(key_names),
latter,
)

txn.execute_values(sql, args, fetch=False)

else:
clokep marked this conversation as resolved.
Show resolved Hide resolved
sql = "INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO %s" % (
table,
", ".join(k for k in allnames),
", ".join("?" for _ in allnames),
", ".join(key_names),
latter,
)

return txn.execute_batch(sql, args)

@overload
async def simple_select_one(
Expand Down