From ec0f138a4c8b78b4968d7345750072f78a0e59de Mon Sep 17 00:00:00 2001 From: jackwotherspoon Date: Mon, 24 Jun 2024 21:04:11 +0000 Subject: [PATCH 1/3] feat: add support for lazy refresh strategy --- README.md | 20 ++- google/cloud/alloydb/connector/__init__.py | 9 +- .../alloydb/connector/async_connector.py | 21 ++- google/cloud/alloydb/connector/connector.py | 21 ++- google/cloud/alloydb/connector/enums.py | 18 +++ google/cloud/alloydb/connector/lazy.py | 127 ++++++++++++++++++ tests/system/test_asyncpg_iam_authn.py | 34 +++-- tests/system/test_pg8000_iam_authn.py | 33 +++-- tests/unit/test_lazy.py | 62 +++++++++ 9 files changed, 319 insertions(+), 26 deletions(-) create mode 100644 google/cloud/alloydb/connector/lazy.py create mode 100644 tests/unit/test_lazy.py diff --git a/README.md b/README.md index 0a0dd1db..6b6bb800 100644 --- a/README.md +++ b/README.md @@ -296,7 +296,6 @@ async def main(): For more details on additional arguments with an `asyncpg.Connection`, please visit the [official documentation][asyncpg-docs]. - [asyncpg-docs]: https://magicstack.github.io/asyncpg/current/api/index.html ### Async Context Manager @@ -382,15 +381,30 @@ connector.connect( [configure-iam-authn]: https://cloud.google.com/alloydb/docs/manage-iam-authn#enable [add-iam-user]: https://cloud.google.com/alloydb/docs/manage-iam-authn#create-user +### Configuring a Lazy Refresh (Cloud Run, Cloud Functions etc.) + +The Connector's `refresh_strategy` argument can be set to `"lazy"` to configure +the Python Connector to retrieve connection info lazily and as-needed. +Otherwise, a background refresh cycle runs to retrive the connection info +periodically. This setting is useful in environments where the CPU may be +throttled outside of a request context, e.g., Cloud Run, Cloud Functions, etc. + +To set the refresh strategy, set the `refresh_strategy` keyword argument when +initializing a `Connector`: + +```python +connector = Connector(refresh_strategy="lazy") +``` + ### Specifying IP Address Type The AlloyDB Python Connector by default will attempt to establish connections to your instance's private IP. To change this, such as connecting to AlloyDB over a public IP address or Private Service Connect (PSC), set the `ip_type` -keyword argument when initializing a `Connector()` or when calling +keyword argument when initializing a `Connector()` or when calling `connector.connect()`. -Possible values for `ip_type` are `"PRIVATE"` (default value), `"PUBLIC"`, +Possible values for `ip_type` are `"PRIVATE"` (default value), `"PUBLIC"`, and `"PSC"`. Example: diff --git a/google/cloud/alloydb/connector/__init__.py b/google/cloud/alloydb/connector/__init__.py index bd011cbb..f0a4d7b9 100644 --- a/google/cloud/alloydb/connector/__init__.py +++ b/google/cloud/alloydb/connector/__init__.py @@ -14,6 +14,13 @@ from google.cloud.alloydb.connector.async_connector import AsyncConnector from google.cloud.alloydb.connector.connector import Connector from google.cloud.alloydb.connector.enums import IPTypes +from google.cloud.alloydb.connector.enums import RefreshStrategy from google.cloud.alloydb.connector.version import __version__ -__all__ = ["__version__", "Connector", "AsyncConnector", "IPTypes"] +__all__ = [ + "__version__", + "Connector", + "AsyncConnector", + "IPTypes", + "RefreshStrategy", +] diff --git a/google/cloud/alloydb/connector/async_connector.py b/google/cloud/alloydb/connector/async_connector.py index 682dee8d..349ad8ec 100644 --- a/google/cloud/alloydb/connector/async_connector.py +++ b/google/cloud/alloydb/connector/async_connector.py @@ -16,7 +16,7 @@ import asyncio from types import TracebackType -from typing import Any, Dict, Optional, Type, TYPE_CHECKING +from typing import Any, Dict, Optional, Type, TYPE_CHECKING, Union import google.auth from google.auth.credentials import with_scopes_if_required @@ -25,7 +25,9 @@ import google.cloud.alloydb.connector.asyncpg as asyncpg from google.cloud.alloydb.connector.client import AlloyDBClient from google.cloud.alloydb.connector.enums import IPTypes +from google.cloud.alloydb.connector.enums import RefreshStrategy from google.cloud.alloydb.connector.instance import RefreshAheadCache +from google.cloud.alloydb.connector.lazy import LazyRefreshCache from google.cloud.alloydb.connector.utils import generate_keys if TYPE_CHECKING: @@ -49,6 +51,11 @@ class AsyncConnector: enable_iam_auth (bool): Enables automatic IAM database authentication. ip_type (str | IPTypes): Default IP type for all AlloyDB connections. Defaults to IPTypes.PRIVATE ("PRIVATE") for private IP connections. + refresh_strategy (str | RefreshStrategy): The default refresh strategy + used to refresh SSL/TLS cert and instance metadata. Can be one + of the following: RefreshStrategy.LAZY ("LAZY") or + RefreshStrategy.BACKGROUND ("BACKGROUND"). + Default: RefreshStrategy.BACKGROUND """ def __init__( @@ -59,8 +66,9 @@ def __init__( enable_iam_auth: bool = False, ip_type: str | IPTypes = IPTypes.PRIVATE, user_agent: Optional[str] = None, + refresh_strategy: str | RefreshStrategy = RefreshStrategy.BACKGROUND, ) -> None: - self._cache: Dict[str, RefreshAheadCache] = {} + self._cache: Dict[str, Union[RefreshAheadCache, LazyRefreshCache]] = {} # initialize default params self._quota_project = quota_project self._alloydb_api_endpoint = alloydb_api_endpoint @@ -69,6 +77,10 @@ def __init__( if isinstance(ip_type, str): ip_type = IPTypes(ip_type.upper()) self._ip_type = ip_type + # if refresh_strategy is str, convert to RefreshStrategy enum + if isinstance(refresh_strategy, str): + refresh_strategy = RefreshStrategy(refresh_strategy.upper()) + self._refresh_strategy = refresh_strategy self._user_agent = user_agent # initialize credentials scopes = ["https://www.googleapis.com/auth/cloud-platform"] @@ -128,7 +140,10 @@ async def connect( if instance_uri in self._cache: cache = self._cache[instance_uri] else: - cache = RefreshAheadCache(instance_uri, self._client, self._keys) + if self._refresh_strategy == RefreshStrategy.LAZY: + cache = LazyRefreshCache(instance_uri, self._client, self._keys) + else: + cache = RefreshAheadCache(instance_uri, self._client, self._keys) self._cache[instance_uri] = cache connect_func = { diff --git a/google/cloud/alloydb/connector/connector.py b/google/cloud/alloydb/connector/connector.py index 271842b1..1cd20b1d 100644 --- a/google/cloud/alloydb/connector/connector.py +++ b/google/cloud/alloydb/connector/connector.py @@ -20,14 +20,16 @@ import struct from threading import Thread from types import TracebackType -from typing import Any, Dict, Optional, Type, TYPE_CHECKING +from typing import Any, Dict, Optional, Type, TYPE_CHECKING, Union from google.auth import default from google.auth.credentials import with_scopes_if_required from google.cloud.alloydb.connector.client import AlloyDBClient from google.cloud.alloydb.connector.enums import IPTypes +from google.cloud.alloydb.connector.enums import RefreshStrategy from google.cloud.alloydb.connector.instance import RefreshAheadCache +from google.cloud.alloydb.connector.lazy import LazyRefreshCache import google.cloud.alloydb.connector.pg8000 as pg8000 from google.cloud.alloydb.connector.utils import generate_keys import google.cloud.alloydb_connectors_v1.proto.resources_pb2 as connectorspb @@ -59,6 +61,11 @@ class Connector: enable_iam_auth (bool): Enables automatic IAM database authentication. ip_type (str | IPTypes): Default IP type for all AlloyDB connections. Defaults to IPTypes.PRIVATE ("PRIVATE") for private IP connections. + refresh_strategy (str | RefreshStrategy): The default refresh strategy + used to refresh SSL/TLS cert and instance metadata. Can be one + of the following: RefreshStrategy.LAZY ("LAZY") or + RefreshStrategy.BACKGROUND ("BACKGROUND"). + Default: RefreshStrategy.BACKGROUND """ def __init__( @@ -69,12 +76,13 @@ def __init__( enable_iam_auth: bool = False, ip_type: str | IPTypes = IPTypes.PRIVATE, user_agent: Optional[str] = None, + refresh_strategy: str | RefreshStrategy = RefreshStrategy.BACKGROUND, ) -> None: # create event loop and start it in background thread self._loop: asyncio.AbstractEventLoop = asyncio.new_event_loop() self._thread = Thread(target=self._loop.run_forever, daemon=True) self._thread.start() - self._cache: Dict[str, RefreshAheadCache] = {} + self._cache: Dict[str, Union[RefreshAheadCache, LazyRefreshCache]] = {} # initialize default params self._quota_project = quota_project self._alloydb_api_endpoint = alloydb_api_endpoint @@ -83,6 +91,10 @@ def __init__( if isinstance(ip_type, str): ip_type = IPTypes(ip_type.upper()) self._ip_type = ip_type + # if refresh_strategy is str, convert to RefreshStrategy enum + if isinstance(refresh_strategy, str): + refresh_strategy = RefreshStrategy(refresh_strategy.upper()) + self._refresh_strategy = refresh_strategy self._user_agent = user_agent # initialize credentials scopes = ["https://www.googleapis.com/auth/cloud-platform"] @@ -155,7 +167,10 @@ async def connect_async(self, instance_uri: str, driver: str, **kwargs: Any) -> if instance_uri in self._cache: cache = self._cache[instance_uri] else: - cache = RefreshAheadCache(instance_uri, self._client, self._keys) + if self._refresh_strategy == RefreshStrategy.LAZY: + cache = LazyRefreshCache(instance_uri, self._client, self._keys) + else: + cache = RefreshAheadCache(instance_uri, self._client, self._keys) self._cache[instance_uri] = cache connect_func = { diff --git a/google/cloud/alloydb/connector/enums.py b/google/cloud/alloydb/connector/enums.py index 3c625be3..eab5a63a 100644 --- a/google/cloud/alloydb/connector/enums.py +++ b/google/cloud/alloydb/connector/enums.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import annotations + from enum import Enum @@ -30,3 +32,19 @@ def _missing_(cls, value: object) -> None: f"Incorrect value for ip_type, got '{value}'. Want one of: " f"{', '.join([repr(m.value) for m in cls])}." ) + + +class RefreshStrategy(Enum): + """ + Enum for specifying refresh strategy to connect to AlloyDB with. + """ + + LAZY: str = "LAZY" + BACKGROUND: str = "BACKGROUND" + + @classmethod + def _missing_(cls, value: object) -> None: + raise ValueError( + f"Incorrect value for refresh_strategy, got '{value}'. Want one of: " + f"{', '.join([repr(m.value) for m in cls])}." + ) diff --git a/google/cloud/alloydb/connector/lazy.py b/google/cloud/alloydb/connector/lazy.py new file mode 100644 index 00000000..321fcec8 --- /dev/null +++ b/google/cloud/alloydb/connector/lazy.py @@ -0,0 +1,127 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +from datetime import datetime +from datetime import timedelta +from datetime import timezone +import logging +from typing import Optional + +from google.cloud.alloydb.connector.client import AlloyDBClient +from google.cloud.alloydb.connector.connection_info import ConnectionInfo +from google.cloud.alloydb.connector.instance import _parse_instance_uri +from google.cloud.alloydb.connector.refresh_utils import _refresh_buffer + +logger = logging.getLogger(name=__name__) + + +class LazyRefreshCache: + """Cache that refreshes connection info when a caller requests a connection. + + Only refreshes the cache when a new connection is requested and the current + certificate is close to or already expired. + + This is the recommended option for serverless environments. + """ + + def __init__( + self, + instance_uri: str, + client: AlloyDBClient, + keys: asyncio.Future, + ) -> None: + """Initializes a LazyRefreshCache instance. + + Args: + instance_connection_string (str): The AlloyDB Instance's + connection URI. + client (AlloyDBClient): The AlloyDB client instance. + keys (asyncio.Future): A future to the client's public-private key + pair. + """ + # validate and parse instance connection name + self._project, self._region, self._cluster, self._name = _parse_instance_uri( + instance_uri + ) + self._instance_uri = instance_uri + + self._keys = keys + self._client = client + self._lock = asyncio.Lock() + self._cached: Optional[ConnectionInfo] = None + self._needs_refresh = False + + async def force_refresh(self) -> None: + """ + Invalidates the cache and configures the next call to + connect_info() to retrieve a fresh ConnectionInfo instance. + """ + async with self._lock: + self._needs_refresh = True + + async def connect_info(self) -> ConnectionInfo: + """Retrieves ConnectionInfo instance for establishing a secure + connection to the AlloyDB instance. + """ + async with self._lock: + # If connection info is cached, check expiration. + # Pad expiration with a buffer to give the client plenty of time to + # establish a connection to the server with the certificate. + if ( + self._cached + and not self._needs_refresh + and datetime.now(timezone.utc) + < (self._cached.expiration - timedelta(seconds=_refresh_buffer)) + ): + logger.debug( + f"['{self._instance_uri}']: Connection info " + "is still valid, using cached info" + ) + return self._cached + logger.debug( + f"['{self._instance_uri}']: Connection info " + "refresh operation started" + ) + try: + conn_info = await self._client.get_connection_info( + self._project, + self._region, + self._cluster, + self._name, + self._keys, + ) + except Exception as e: + logger.debug( + f"['{self._instance_uri}']: Connection info " + f"refresh operation failed: {str(e)}" + ) + raise + logger.debug( + f"['{self._instance_uri}']: Connection info " + "refresh operation completed successfully" + ) + logger.debug( + f"['{self._instance_uri}']: Current certificate " + f"expiration = {str(conn_info.expiration)}" + ) + self._cached = conn_info + self._needs_refresh = False + return conn_info + + async def close(self) -> None: + """Close is a no-op and provided purely for a consistent interface with + other cache types. + """ + pass diff --git a/tests/system/test_asyncpg_iam_authn.py b/tests/system/test_asyncpg_iam_authn.py index 3733348e..161f4a98 100644 --- a/tests/system/test_asyncpg_iam_authn.py +++ b/tests/system/test_asyncpg_iam_authn.py @@ -25,9 +25,7 @@ async def create_sqlalchemy_engine( - inst_uri: str, - user: str, - db: str, + inst_uri: str, user: str, db: str, refresh_strategy: str = "background" ) -> Tuple[sqlalchemy.ext.asyncio.engine.AsyncEngine, AsyncConnector]: """Creates a connection pool for an AlloyDB instance and returns the pool and the connector. Callers are responsible for closing the pool and the @@ -36,9 +34,9 @@ async def create_sqlalchemy_engine( A sample invocation looks like: pool, connector = await create_sqlalchemy_engine( - inst_uri, - user, - db, + inst_uri, + user, + db, ) async with pool.connect() as conn: time = (await conn.execute(sqlalchemy.text("SELECT NOW()"))).fetchone() @@ -55,10 +53,14 @@ async def create_sqlalchemy_engine( user (str): The formatted IAM database username. e.g., my-email@test.com, service-account@project-id.iam - db_name (str): + db (str): The name of the database, e.g., mydb + refresh_strategy (Optional[str]): + Refresh strategy for the AlloyDB Connector. Can be one of "lazy" + or "background". For serverless environments use "lazy" to avoid + errors resulting from CPU being throttled. """ - connector = AsyncConnector() + connector = AsyncConnector(refresh_strategy=refresh_strategy) async def getconn() -> asyncpg.Connection: conn: asyncpg.Connection = await connector.connect( @@ -96,3 +98,19 @@ async def test_asyncpg_iam_authn_time() -> None: await connector.close() # cleanup AsyncEngine await pool.dispose() + + +async def test_asyncpg_iam_authn_lazy() -> None: + """Basic test to get time from database.""" + inst_uri = os.environ["ALLOYDB_INSTANCE_URI"] + user = os.environ["ALLOYDB_IAM_USER"] + db = os.environ["ALLOYDB_DB"] + + pool, connector = await create_sqlalchemy_engine(inst_uri, user, db, "lazy") + async with pool.connect() as conn: + time = (await conn.execute(sqlalchemy.text("SELECT NOW()"))).fetchone() + curr_time = time[0] + assert type(curr_time) is datetime + await connector.close() + # cleanup AsyncEngine + await pool.dispose() diff --git a/tests/system/test_pg8000_iam_authn.py b/tests/system/test_pg8000_iam_authn.py index ab600cd1..b845fe3a 100644 --- a/tests/system/test_pg8000_iam_authn.py +++ b/tests/system/test_pg8000_iam_authn.py @@ -24,9 +24,7 @@ def create_sqlalchemy_engine( - inst_uri: str, - user: str, - db: str, + inst_uri: str, user: str, db: str, refresh_strategy="background" ) -> Tuple[sqlalchemy.engine.Engine, Connector]: """Creates a connection pool for an AlloyDB instance and returns the pool and the connector. Callers are responsible for closing the pool and the @@ -35,9 +33,9 @@ def create_sqlalchemy_engine( A sample invocation looks like: engine, connector = create_sqlalchemy_engine( - inst_uri, - user, - db, + inst_uri, + user, + db, ) with engine.connect() as conn: time = conn.execute(sqlalchemy.text("SELECT NOW()")).fetchone() @@ -54,10 +52,14 @@ def create_sqlalchemy_engine( user (str): The formatted IAM database username. e.g., my-email@test.com, service-account@project-id.iam - db_name (str): + db (str): The name of the database, e.g., mydb + refresh_strategy (Optional[str]): + Refresh strategy for the AlloyDB Connector. Can be one of "lazy" + or "background". For serverless environments use "lazy" to avoid + errors resulting from CPU being throttled. """ - connector = Connector() + connector = Connector(refresh_strategy=refresh_strategy) def getconn() -> pg8000.dbapi.Connection: conn: pg8000.dbapi.Connection = connector.connect( @@ -93,3 +95,18 @@ def test_pg8000_iam_authn_time() -> None: curr_time = time[0] assert type(curr_time) is datetime connector.close() + + +def test_pg8000_iam_authn_lazy() -> None: + """Basic test to get time from database.""" + inst_uri = os.environ["ALLOYDB_INSTANCE_URI"] + user = os.environ["ALLOYDB_IAM_USER"] + db = os.environ["ALLOYDB_DB"] + + engine, connector = create_sqlalchemy_engine(inst_uri, user, db, "lazy") + with engine.connect() as conn: + time = conn.execute(sqlalchemy.text("SELECT NOW()")).fetchone() + conn.commit() + curr_time = time[0] + assert type(curr_time) is datetime + connector.close() diff --git a/tests/unit/test_lazy.py b/tests/unit/test_lazy.py new file mode 100644 index 00000000..00959349 --- /dev/null +++ b/tests/unit/test_lazy.py @@ -0,0 +1,62 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio + +from google.cloud.alloydb.connector.client import AlloyDBClient +from google.cloud.alloydb.connector.connection_info import ConnectionInfo +from google.cloud.alloydb.connector.lazy import LazyRefreshCache +from google.cloud.alloydb.connector.utils import generate_keys + + +async def test_LazyRefreshCache_connect_info(fake_client: AlloyDBClient) -> None: + """ + Test that LazyRefreshCache.connect_info works as expected. + """ + keys = asyncio.create_task(generate_keys()) + cache = LazyRefreshCache( + "projects/test-project/locations/test-region/clusters/test-cluster/instances/test-instance", + client=fake_client, + keys=keys, + ) + # check that cached connection info is empty + assert cache._cached is None + conn_info = await cache.connect_info() + # check that cached connection info is now set + assert isinstance(cache._cached, ConnectionInfo) + # check that calling connect_info uses cached info + conn_info2 = await cache.connect_info() + assert conn_info2 == conn_info + + +async def test_LazyRefreshCache_force_refresh(fake_client: AlloyDBClient) -> None: + """ + Test that LazyRefreshCache.force_refresh works as expected. + """ + keys = asyncio.create_task(generate_keys()) + cache = LazyRefreshCache( + "projects/test-project/locations/test-region/clusters/test-cluster/instances/test-instance", + client=fake_client, + keys=keys, + ) + conn_info = await cache.connect_info() + # check that cached connection info is now set + assert isinstance(cache._cached, ConnectionInfo) + await cache.force_refresh() + # check that calling connect_info after force_refresh gets new ConnectionInfo + conn_info2 = await cache.connect_info() + # check that new connection info was retrieved + assert conn_info2 != conn_info + assert cache._cached == conn_info2 + await cache.close() From 2b1d4570d1f9f643a09e48da7e7e5938d3c66df3 Mon Sep 17 00:00:00 2001 From: jackwotherspoon Date: Mon, 24 Jun 2024 21:07:58 +0000 Subject: [PATCH 2/3] chore: lint --- tests/system/test_pg8000_iam_authn.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system/test_pg8000_iam_authn.py b/tests/system/test_pg8000_iam_authn.py index b845fe3a..a370c6ec 100644 --- a/tests/system/test_pg8000_iam_authn.py +++ b/tests/system/test_pg8000_iam_authn.py @@ -24,7 +24,7 @@ def create_sqlalchemy_engine( - inst_uri: str, user: str, db: str, refresh_strategy="background" + inst_uri: str, user: str, db: str, refresh_strategy: str = "background" ) -> Tuple[sqlalchemy.engine.Engine, Connector]: """Creates a connection pool for an AlloyDB instance and returns the pool and the connector. Callers are responsible for closing the pool and the From 25d2b282491582f3b643a5061d436c0674deae3e Mon Sep 17 00:00:00 2001 From: jackwotherspoon Date: Mon, 24 Jun 2024 21:20:49 +0000 Subject: [PATCH 3/3] chore: add additional integration tests --- tests/system/test_asyncpg_connection.py | 35 ++++++++++++++++++++----- tests/system/test_pg8000_connection.py | 35 ++++++++++++++++++++----- 2 files changed, 57 insertions(+), 13 deletions(-) diff --git a/tests/system/test_asyncpg_connection.py b/tests/system/test_asyncpg_connection.py index 5bc913ec..756d15b8 100644 --- a/tests/system/test_asyncpg_connection.py +++ b/tests/system/test_asyncpg_connection.py @@ -28,6 +28,7 @@ async def create_sqlalchemy_engine( user: str, password: str, db: str, + refresh_strategy: str = "background", ) -> Tuple[sqlalchemy.ext.asyncio.engine.AsyncEngine, AsyncConnector]: """Creates a connection pool for an AlloyDB instance and returns the pool and the connector. Callers are responsible for closing the pool and the @@ -36,10 +37,10 @@ async def create_sqlalchemy_engine( A sample invocation looks like: engine, connector = await create_sqlalchemy_engine( - inst_uri, - user, - password, - db, + inst_uri, + user, + password, + db, ) async with engine.connect() as conn: time = await conn.execute(sqlalchemy.text("SELECT NOW()")).fetchone() @@ -56,10 +57,14 @@ async def create_sqlalchemy_engine( The database user name, e.g., postgres password (str): The database user's password, e.g., secret-password - db_name (str): + db (str): The name of the database, e.g., mydb + refresh_strategy (Optional[str]): + Refresh strategy for the AlloyDB Connector. Can be one of "lazy" + or "background". For serverless environments use "lazy" to avoid + errors resulting from CPU being throttled. """ - connector = AsyncConnector() + connector = AsyncConnector(refresh_strategy=refresh_strategy) async def getconn() -> asyncpg.Connection: conn: asyncpg.Connection = await connector.connect( @@ -97,3 +102,21 @@ async def test_connection_with_asyncpg() -> None: assert res[0] == 1 await connector.close() + + +async def test_lazy_connection_with_asyncpg() -> None: + """Basic test to get time from database.""" + inst_uri = os.environ["ALLOYDB_INSTANCE_URI"] + user = os.environ["ALLOYDB_USER"] + password = os.environ["ALLOYDB_PASS"] + db = os.environ["ALLOYDB_DB"] + + pool, connector = await create_sqlalchemy_engine( + inst_uri, user, password, db, "lazy" + ) + + async with pool.connect() as conn: + res = (await conn.execute(sqlalchemy.text("SELECT 1"))).fetchone() + assert res[0] == 1 + + await connector.close() diff --git a/tests/system/test_pg8000_connection.py b/tests/system/test_pg8000_connection.py index 0e6a6cda..ad2ac8d2 100644 --- a/tests/system/test_pg8000_connection.py +++ b/tests/system/test_pg8000_connection.py @@ -28,6 +28,7 @@ def create_sqlalchemy_engine( user: str, password: str, db: str, + refresh_strategy: str = "background", ) -> Tuple[sqlalchemy.engine.Engine, Connector]: """Creates a connection pool for an AlloyDB instance and returns the pool and the connector. Callers are responsible for closing the pool and the @@ -36,10 +37,10 @@ def create_sqlalchemy_engine( A sample invocation looks like: engine, connector = create_sqlalchemy_engine( - inst_uri, - user, - password, - db, + inst_uri, + user, + password, + db, ) with engine.connect() as conn: time = conn.execute(sqlalchemy.text("SELECT NOW()")).fetchone() @@ -57,10 +58,14 @@ def create_sqlalchemy_engine( The database user name, e.g., postgres password (str): The database user's password, e.g., secret-password - db_name (str): + db (str): The name of the database, e.g., mydb + refresh_strategy (Optional[str]): + Refresh strategy for the AlloyDB Connector. Can be one of "lazy" + or "background". For serverless environments use "lazy" to avoid + errors resulting from CPU being throttled. """ - connector = Connector() + connector = Connector(refresh_strategy=refresh_strategy) def getconn() -> pg8000.dbapi.Connection: conn: pg8000.dbapi.Connection = connector.connect( @@ -84,7 +89,7 @@ def getconn() -> pg8000.dbapi.Connection: # [END alloydb_sqlalchemy_connect_connector] -def test_pg8000_time() -> None: +def test_pg8000_connection() -> None: """Basic test to get time from database.""" inst_uri = os.environ["ALLOYDB_INSTANCE_URI"] user = os.environ["ALLOYDB_USER"] @@ -98,3 +103,19 @@ def test_pg8000_time() -> None: curr_time = time[0] assert type(curr_time) is datetime connector.close() + + +def test_lazy_pg8000_connection() -> None: + """Basic test to get time from database.""" + inst_uri = os.environ["ALLOYDB_INSTANCE_URI"] + user = os.environ["ALLOYDB_USER"] + password = os.environ["ALLOYDB_PASS"] + db = os.environ["ALLOYDB_DB"] + + engine, connector = create_sqlalchemy_engine(inst_uri, user, password, db, "lazy") + with engine.connect() as conn: + time = conn.execute(sqlalchemy.text("SELECT NOW()")).fetchone() + conn.commit() + curr_time = time[0] + assert type(curr_time) is datetime + connector.close()