-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Optimise _update_client_ips_batch_txn
to batch together database operations.
#12252
Changes from all commits
7cf8b2e
c7cfbf5
45c98e0
0646b4b
1acd8cd
1e961ad
503f5c8
5f29099
e7c4907
5675a94
e7985d2
7edf6f7
9556aae
303fba6
ac4b1d5
34403cb
0f8e98b
481b730
99e6b66
3f7f659
50f2b91
93e1237
c0aaec4
c456958
214aacf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Move `update_client_ip` background job from the main process to the background worker. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1268,6 +1268,7 @@ async def simple_upsert_many( | |
value_names: Collection[str], | ||
value_values: Collection[Collection[Any]], | ||
desc: str, | ||
lock: bool = True, | ||
) -> None: | ||
""" | ||
Upsert, many times. | ||
|
@@ -1279,21 +1280,24 @@ async def simple_upsert_many( | |
value_names: The value column names | ||
value_values: A list of each row's value column values. | ||
Ignored if value_names is empty. | ||
lock: True to lock the table when doing the upsert. Unused if the database engine | ||
supports native upserts. | ||
""" | ||
|
||
# We can autocommit if we are going to use native upserts | ||
autocommit = ( | ||
self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables | ||
) | ||
|
||
return await self.runInteraction( | ||
await self.runInteraction( | ||
desc, | ||
self.simple_upsert_many_txn, | ||
table, | ||
key_names, | ||
key_values, | ||
value_names, | ||
value_values, | ||
lock=lock, | ||
db_autocommit=autocommit, | ||
) | ||
|
||
|
@@ -1305,6 +1309,7 @@ def simple_upsert_many_txn( | |
key_values: Collection[Iterable[Any]], | ||
value_names: Collection[str], | ||
value_values: Iterable[Iterable[Any]], | ||
lock: bool = True, | ||
) -> None: | ||
""" | ||
Upsert, many times. | ||
|
@@ -1316,14 +1321,16 @@ def simple_upsert_many_txn( | |
value_names: The value column names | ||
value_values: A list of each row's value column values. | ||
Ignored if value_names is empty. | ||
lock: True to lock the table when doing the upsert. Unused if the database engine | ||
supports native upserts. | ||
""" | ||
if self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables: | ||
return self.simple_upsert_many_txn_native_upsert( | ||
txn, table, key_names, key_values, value_names, value_values | ||
) | ||
else: | ||
return self.simple_upsert_many_txn_emulated( | ||
txn, table, key_names, key_values, value_names, value_values | ||
txn, table, key_names, key_values, value_names, value_values, lock=lock | ||
) | ||
|
||
def simple_upsert_many_txn_emulated( | ||
|
@@ -1334,6 +1341,7 @@ def simple_upsert_many_txn_emulated( | |
key_values: Collection[Iterable[Any]], | ||
value_names: Collection[str], | ||
value_values: Iterable[Iterable[Any]], | ||
lock: bool = True, | ||
) -> None: | ||
""" | ||
Upsert, many times, but without native UPSERT support or batching. | ||
|
@@ -1345,17 +1353,24 @@ def simple_upsert_many_txn_emulated( | |
value_names: The value column names | ||
value_values: A list of each row's value column values. | ||
Ignored if value_names is empty. | ||
lock: True to lock the table when doing the upsert. | ||
""" | ||
# No value columns, therefore make a blank list so that the following | ||
# zip() works correctly. | ||
if not value_names: | ||
value_values = [() for x in range(len(key_values))] | ||
|
||
if lock: | ||
# Lock the table just once, to prevent it being done once per row. | ||
# Note that, according to Postgres' documentation, once obtained, | ||
# the lock is held for the remainder of the current transaction. | ||
self.engine.lock_table(txn, "user_ips") | ||
Comment on lines
+1364
to
+1367
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think this code is used on postgres though since that doesn't need the emulated support here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well, sigh, you're absolutely right. Locking on SQLite is a no-op (apparently! I haven't fully reasoned through how that's sufficient, but the code is a no-op...?!).
Why do upserts have locking at all then? Maybe I made the mistake of assuming that there must have been a reason for it because of the code that was here before... In one of the cases, an emulated upsert can be a The latter case (UPDATE, INSERT) doesn't sound like it would need a lock, because both of those operations need a PENDING lock. There's no way another writer can get in the middle of those two. (Assuming traditional SQLite3 locking) The first case (SELECT, INSERT) may be problematic because I can't tell whether this works properly in WAL mode or not; the WAL docs aren't particularly clear about how transactions interact like the old-style locking page is. That said, the page on transactions has this to say:
Assuming that's still true for WAL mode, then I think it's safe on SQLite. Ah! However, there's one case where Postgres DOES need the emulated support, so I need to take back some of the things I just wrote :/. That turned into something more complex than I thought :/ There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I tried this and it seems safe on both WAL and non-WAL mode. The advantage of WAL mode in this case is that another transaction merely running 'select' and hanging around doesn't prevent you from committing. But that transaction will have to restart (at least if it's in conflict; not sure about generally) — it gets told the database is locked each time it tries to commit or update. I learnt a bit about SQLite's WAL mode, so that's good at least |
||
|
||
for keyv, valv in zip(key_values, value_values): | ||
_keys = {x: y for x, y in zip(key_names, keyv)} | ||
_vals = {x: y for x, y in zip(value_names, valv)} | ||
|
||
self.simple_upsert_txn_emulated(txn, table, _keys, _vals) | ||
self.simple_upsert_txn_emulated(txn, table, _keys, _vals, lock=False) | ||
|
||
def simple_upsert_many_txn_native_upsert( | ||
self, | ||
|
@@ -1792,6 +1807,86 @@ def simple_update_txn( | |
|
||
return txn.rowcount | ||
|
||
async def simple_update_many( | ||
reivilibre marked this conversation as resolved.
Show resolved
Hide resolved
|
||
self, | ||
table: str, | ||
key_names: Collection[str], | ||
key_values: Collection[Iterable[Any]], | ||
value_names: Collection[str], | ||
value_values: Iterable[Iterable[Any]], | ||
desc: str, | ||
) -> None: | ||
""" | ||
Update, many times, using batching where possible. | ||
If the keys don't match anything, nothing will be updated. | ||
|
||
Args: | ||
table: The table to update | ||
key_names: The key column names. | ||
key_values: A list of each row's key column values. | ||
value_names: The names of value columns to update. | ||
value_values: A list of each row's value column values. | ||
""" | ||
|
||
await self.runInteraction( | ||
desc, | ||
self.simple_update_many_txn, | ||
table, | ||
key_names, | ||
key_values, | ||
value_names, | ||
value_values, | ||
) | ||
|
||
@staticmethod | ||
def simple_update_many_txn( | ||
txn: LoggingTransaction, | ||
table: str, | ||
key_names: Collection[str], | ||
key_values: Collection[Iterable[Any]], | ||
value_names: Collection[str], | ||
value_values: Collection[Iterable[Any]], | ||
) -> None: | ||
""" | ||
Update, many times, using batching where possible. | ||
reivilibre marked this conversation as resolved.
Show resolved
Hide resolved
|
||
If the keys don't match anything, nothing will be updated. | ||
|
||
Args: | ||
table: The table to update | ||
key_names: The key column names. | ||
key_values: A list of each row's key column values. | ||
value_names: The names of value columns to update. | ||
value_values: A list of each row's value column values. | ||
""" | ||
|
||
if len(value_values) != len(key_values): | ||
raise ValueError( | ||
f"{len(key_values)} key rows and {len(value_values)} value rows: should be the same number." | ||
) | ||
|
||
# List of tuples of (value values, then key values) | ||
# (This matches the order needed for the query) | ||
args = [tuple(x) + tuple(y) for x, y in zip(value_values, key_values)] | ||
|
||
for ks, vs in zip(key_values, value_values): | ||
args.append(tuple(vs) + tuple(ks)) | ||
|
||
# 'col1 = ?, col2 = ?, ...' | ||
set_clause = ", ".join(f"{n} = ?" for n in value_names) | ||
|
||
if key_names: | ||
# 'WHERE col3 = ? AND col4 = ? AND col5 = ?' | ||
where_clause = "WHERE " + (" AND ".join(f"{n} = ?" for n in key_names)) | ||
else: | ||
where_clause = "" | ||
|
||
# UPDATE mytable SET col1 = ?, col2 = ? WHERE col3 = ? AND col4 = ? | ||
sql = f""" | ||
UPDATE {table} SET {set_clause} {where_clause} | ||
""" | ||
|
||
txn.execute_batch(sql, args) | ||
|
||
async def simple_update_one( | ||
self, | ||
table: str, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note that I think this is fine to default to
True
sincesimple_upsert_txn_emulated
(which this eventually calls) defaults toTrue
. So pretty much this was the default behavior.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, exactly — this is just providing a way to opt out of it.