Skip to content

Commit

Permalink
feat: add support for lazy refresh strategy (#337)
Browse files Browse the repository at this point in the history
Add refresh_strategy argument to Connector() that allows setting
the strategy to "lazy" to use a lazy refresh strategy.

When creating a Connector via Connector(refresh_strategy="lazy"),
the connection info and ephemeral certificate will be refreshed only
when the cached certificate has expired. No background tasks run
periodically with this option, making it ideal for use in serverless
environments such as Cloud Run, Cloud Functions, etc, where the
CPU may be throttled.
  • Loading branch information
jackwotherspoon authored Jun 25, 2024
1 parent 2fa8df1 commit fbf0179
Show file tree
Hide file tree
Showing 11 changed files with 376 additions and 39 deletions.
20 changes: 17 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,6 @@ async def main():
For more details on additional arguments with an `asyncpg.Connection`, please
visit the [official documentation][asyncpg-docs].


[asyncpg-docs]: https://magicstack.github.io/asyncpg/current/api/index.html

### Async Context Manager
Expand Down Expand Up @@ -382,15 +381,30 @@ connector.connect(
[configure-iam-authn]: https://cloud.google.com/alloydb/docs/manage-iam-authn#enable
[add-iam-user]: https://cloud.google.com/alloydb/docs/manage-iam-authn#create-user

### Configuring a Lazy Refresh (Cloud Run, Cloud Functions etc.)

The Connector's `refresh_strategy` argument can be set to `"lazy"` to configure
the Python Connector to retrieve connection info lazily and as-needed.
Otherwise, a background refresh cycle runs to retrieve the connection info
periodically. This setting is useful in environments where the CPU may be
throttled outside of a request context, e.g., Cloud Run, Cloud Functions, etc.

To set the refresh strategy, set the `refresh_strategy` keyword argument when
initializing a `Connector`:

```python
connector = Connector(refresh_strategy="lazy")
```

### Specifying IP Address Type

The AlloyDB Python Connector by default will attempt to establish connections
to your instance's private IP. To change this, such as connecting to AlloyDB
over a public IP address or Private Service Connect (PSC), set the `ip_type`
keyword argument when initializing a `Connector()` or when calling
keyword argument when initializing a `Connector()` or when calling
`connector.connect()`.

Possible values for `ip_type` are `"PRIVATE"` (default value), `"PUBLIC"`,
Possible values for `ip_type` are `"PRIVATE"` (default value), `"PUBLIC"`,
and `"PSC"`.

Example:
Expand Down
9 changes: 8 additions & 1 deletion google/cloud/alloydb/connector/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@
from google.cloud.alloydb.connector.async_connector import AsyncConnector
from google.cloud.alloydb.connector.connector import Connector
from google.cloud.alloydb.connector.enums import IPTypes
from google.cloud.alloydb.connector.enums import RefreshStrategy
from google.cloud.alloydb.connector.version import __version__

__all__ = ["__version__", "Connector", "AsyncConnector", "IPTypes"]
__all__ = [
"__version__",
"Connector",
"AsyncConnector",
"IPTypes",
"RefreshStrategy",
]
21 changes: 18 additions & 3 deletions google/cloud/alloydb/connector/async_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import asyncio
from types import TracebackType
from typing import Any, Dict, Optional, Type, TYPE_CHECKING
from typing import Any, Dict, Optional, Type, TYPE_CHECKING, Union

import google.auth
from google.auth.credentials import with_scopes_if_required
Expand All @@ -25,7 +25,9 @@
import google.cloud.alloydb.connector.asyncpg as asyncpg
from google.cloud.alloydb.connector.client import AlloyDBClient
from google.cloud.alloydb.connector.enums import IPTypes
from google.cloud.alloydb.connector.enums import RefreshStrategy
from google.cloud.alloydb.connector.instance import RefreshAheadCache
from google.cloud.alloydb.connector.lazy import LazyRefreshCache
from google.cloud.alloydb.connector.utils import generate_keys

if TYPE_CHECKING:
Expand All @@ -49,6 +51,11 @@ class AsyncConnector:
enable_iam_auth (bool): Enables automatic IAM database authentication.
ip_type (str | IPTypes): Default IP type for all AlloyDB connections.
Defaults to IPTypes.PRIVATE ("PRIVATE") for private IP connections.
refresh_strategy (str | RefreshStrategy): The default refresh strategy
used to refresh SSL/TLS cert and instance metadata. Can be one
of the following: RefreshStrategy.LAZY ("LAZY") or
RefreshStrategy.BACKGROUND ("BACKGROUND").
Default: RefreshStrategy.BACKGROUND
"""

def __init__(
Expand All @@ -59,8 +66,9 @@ def __init__(
enable_iam_auth: bool = False,
ip_type: str | IPTypes = IPTypes.PRIVATE,
user_agent: Optional[str] = None,
refresh_strategy: str | RefreshStrategy = RefreshStrategy.BACKGROUND,
) -> None:
self._cache: Dict[str, RefreshAheadCache] = {}
self._cache: Dict[str, Union[RefreshAheadCache, LazyRefreshCache]] = {}
# initialize default params
self._quota_project = quota_project
self._alloydb_api_endpoint = alloydb_api_endpoint
Expand All @@ -69,6 +77,10 @@ def __init__(
if isinstance(ip_type, str):
ip_type = IPTypes(ip_type.upper())
self._ip_type = ip_type
# if refresh_strategy is str, convert to RefreshStrategy enum
if isinstance(refresh_strategy, str):
refresh_strategy = RefreshStrategy(refresh_strategy.upper())
self._refresh_strategy = refresh_strategy
self._user_agent = user_agent
# initialize credentials
scopes = ["https://www.googleapis.com/auth/cloud-platform"]
Expand Down Expand Up @@ -128,7 +140,10 @@ async def connect(
if instance_uri in self._cache:
cache = self._cache[instance_uri]
else:
cache = RefreshAheadCache(instance_uri, self._client, self._keys)
if self._refresh_strategy == RefreshStrategy.LAZY:
cache = LazyRefreshCache(instance_uri, self._client, self._keys)
else:
cache = RefreshAheadCache(instance_uri, self._client, self._keys)
self._cache[instance_uri] = cache

connect_func = {
Expand Down
21 changes: 18 additions & 3 deletions google/cloud/alloydb/connector/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@
import struct
from threading import Thread
from types import TracebackType
from typing import Any, Dict, Optional, Type, TYPE_CHECKING
from typing import Any, Dict, Optional, Type, TYPE_CHECKING, Union

from google.auth import default
from google.auth.credentials import with_scopes_if_required

from google.cloud.alloydb.connector.client import AlloyDBClient
from google.cloud.alloydb.connector.enums import IPTypes
from google.cloud.alloydb.connector.enums import RefreshStrategy
from google.cloud.alloydb.connector.instance import RefreshAheadCache
from google.cloud.alloydb.connector.lazy import LazyRefreshCache
import google.cloud.alloydb.connector.pg8000 as pg8000
from google.cloud.alloydb.connector.utils import generate_keys
import google.cloud.alloydb_connectors_v1.proto.resources_pb2 as connectorspb
Expand Down Expand Up @@ -59,6 +61,11 @@ class Connector:
enable_iam_auth (bool): Enables automatic IAM database authentication.
ip_type (str | IPTypes): Default IP type for all AlloyDB connections.
Defaults to IPTypes.PRIVATE ("PRIVATE") for private IP connections.
refresh_strategy (str | RefreshStrategy): The default refresh strategy
used to refresh SSL/TLS cert and instance metadata. Can be one
of the following: RefreshStrategy.LAZY ("LAZY") or
RefreshStrategy.BACKGROUND ("BACKGROUND").
Default: RefreshStrategy.BACKGROUND
"""

def __init__(
Expand All @@ -69,12 +76,13 @@ def __init__(
enable_iam_auth: bool = False,
ip_type: str | IPTypes = IPTypes.PRIVATE,
user_agent: Optional[str] = None,
refresh_strategy: str | RefreshStrategy = RefreshStrategy.BACKGROUND,
) -> None:
# create event loop and start it in background thread
self._loop: asyncio.AbstractEventLoop = asyncio.new_event_loop()
self._thread = Thread(target=self._loop.run_forever, daemon=True)
self._thread.start()
self._cache: Dict[str, RefreshAheadCache] = {}
self._cache: Dict[str, Union[RefreshAheadCache, LazyRefreshCache]] = {}
# initialize default params
self._quota_project = quota_project
self._alloydb_api_endpoint = alloydb_api_endpoint
Expand All @@ -83,6 +91,10 @@ def __init__(
if isinstance(ip_type, str):
ip_type = IPTypes(ip_type.upper())
self._ip_type = ip_type
# if refresh_strategy is str, convert to RefreshStrategy enum
if isinstance(refresh_strategy, str):
refresh_strategy = RefreshStrategy(refresh_strategy.upper())
self._refresh_strategy = refresh_strategy
self._user_agent = user_agent
# initialize credentials
scopes = ["https://www.googleapis.com/auth/cloud-platform"]
Expand Down Expand Up @@ -155,7 +167,10 @@ async def connect_async(self, instance_uri: str, driver: str, **kwargs: Any) ->
if instance_uri in self._cache:
cache = self._cache[instance_uri]
else:
cache = RefreshAheadCache(instance_uri, self._client, self._keys)
if self._refresh_strategy == RefreshStrategy.LAZY:
cache = LazyRefreshCache(instance_uri, self._client, self._keys)
else:
cache = RefreshAheadCache(instance_uri, self._client, self._keys)
self._cache[instance_uri] = cache

connect_func = {
Expand Down
18 changes: 18 additions & 0 deletions google/cloud/alloydb/connector/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from enum import Enum


Expand All @@ -30,3 +32,19 @@ def _missing_(cls, value: object) -> None:
f"Incorrect value for ip_type, got '{value}'. Want one of: "
f"{', '.join([repr(m.value) for m in cls])}."
)


class RefreshStrategy(Enum):
    """Refresh strategies selectable when connecting to AlloyDB.

    Members:
        LAZY: value ``"LAZY"``.
        BACKGROUND: value ``"BACKGROUND"``.
    """

    LAZY: str = "LAZY"
    BACKGROUND: str = "BACKGROUND"

    @classmethod
    def _missing_(cls, value: object) -> None:
        """Reject a value that does not match any member.

        Invoked by the Enum machinery when a lookup by value fails.

        Args:
            value: The value that could not be mapped to a member.

        Raises:
            ValueError: Always; the message lists every accepted value.
        """
        accepted = ", ".join(repr(member.value) for member in cls)
        raise ValueError(
            f"Incorrect value for refresh_strategy, got '{value}'. "
            f"Want one of: {accepted}."
        )
127 changes: 127 additions & 0 deletions google/cloud/alloydb/connector/lazy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
from datetime import datetime
from datetime import timedelta
from datetime import timezone
import logging
from typing import Optional

from google.cloud.alloydb.connector.client import AlloyDBClient
from google.cloud.alloydb.connector.connection_info import ConnectionInfo
from google.cloud.alloydb.connector.instance import _parse_instance_uri
from google.cloud.alloydb.connector.refresh_utils import _refresh_buffer

logger = logging.getLogger(name=__name__)


class LazyRefreshCache:
    """Connection info cache that refreshes on demand.

    A refresh happens only inside ``connect_info()``: when nothing is cached
    yet, when ``force_refresh()`` has flagged the cache as stale, or when the
    cached certificate is within the refresh buffer of its expiration (or
    already past it).
    This is the recommended option for serverless environments.
    """

    def __init__(
        self,
        instance_uri: str,
        client: AlloyDBClient,
        keys: asyncio.Future,
    ) -> None:
        """Initializes a LazyRefreshCache instance.

        Args:
            instance_uri (str): The AlloyDB Instance's connection URI.
            client (AlloyDBClient): The AlloyDB client instance.
            keys (asyncio.Future): A future to the client's public-private key
                pair.
        """
        # Parsing doubles as validation of the instance URI.
        project, region, cluster, name = _parse_instance_uri(instance_uri)
        self._project = project
        self._region = region
        self._cluster = cluster
        self._name = name
        self._instance_uri = instance_uri

        self._client = client
        self._keys = keys
        # Serializes refreshes and invalidations.
        self._lock = asyncio.Lock()
        self._cached: Optional[ConnectionInfo] = None
        self._needs_refresh = False

    async def force_refresh(self) -> None:
        """Mark the cache as stale.

        The next call to connect_info() will fetch a fresh ConnectionInfo
        instead of returning the cached one.
        """
        async with self._lock:
            self._needs_refresh = True

    async def connect_info(self) -> ConnectionInfo:
        """Return the ConnectionInfo used to establish a secure connection
        to the AlloyDB instance, refreshing it first when required.

        Returns:
            ConnectionInfo: The cached info when it is still usable,
                otherwise a newly fetched one (which is then cached).

        Raises:
            Exception: Whatever the client's get_connection_info() raises is
                logged at debug level and re-raised unchanged.
        """
        uri = self._instance_uri
        async with self._lock:
            cached = self._cached
            if cached and not self._needs_refresh:
                now = datetime.now(timezone.utc)
                # Treat the certificate as expired a little early so the
                # caller has plenty of time to finish connecting with it.
                usable_until = cached.expiration - timedelta(seconds=_refresh_buffer)
                if now < usable_until:
                    logger.debug(
                        f"['{uri}']: Connection info is still valid, using cached info"
                    )
                    return cached

            logger.debug(f"['{uri}']: Connection info refresh operation started")
            try:
                fresh = await self._client.get_connection_info(
                    self._project,
                    self._region,
                    self._cluster,
                    self._name,
                    self._keys,
                )
            except Exception as exc:
                logger.debug(
                    f"['{uri}']: Connection info refresh operation failed: {exc!s}"
                )
                raise

            logger.debug(
                f"['{uri}']: Connection info refresh operation completed successfully"
            )
            logger.debug(
                f"['{uri}']: Current certificate expiration = {fresh.expiration!s}"
            )
            self._cached = fresh
            self._needs_refresh = False
            return fresh

    async def close(self) -> None:
        """No-op; exists only so this cache offers the same interface as the
        other cache types.
        """
        return None
Loading

0 comments on commit fbf0179

Please sign in to comment.