Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for lazy refresh strategy #337

Merged
merged 3 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,6 @@ async def main():
For more details on additional arguments with an `asyncpg.Connection`, please
visit the [official documentation][asyncpg-docs].


[asyncpg-docs]: https://magicstack.github.io/asyncpg/current/api/index.html

### Async Context Manager
Expand Down Expand Up @@ -382,15 +381,30 @@ connector.connect(
[configure-iam-authn]: https://cloud.google.com/alloydb/docs/manage-iam-authn#enable
[add-iam-user]: https://cloud.google.com/alloydb/docs/manage-iam-authn#create-user

### Configuring a Lazy Refresh (Cloud Run, Cloud Functions etc.)

The Connector's `refresh_strategy` argument can be set to `"lazy"` to configure
the Python Connector to retrieve connection info lazily and as-needed.
Otherwise, a background refresh cycle runs to retrive the connection info
periodically. This setting is useful in environments where the CPU may be
throttled outside of a request context, e.g., Cloud Run, Cloud Functions, etc.

To set the refresh strategy, set the `refresh_strategy` keyword argument when
initializing a `Connector`:

```python
connector = Connector(refresh_strategy="lazy")
```

### Specifying IP Address Type

The AlloyDB Python Connector by default will attempt to establish connections
to your instance's private IP. To change this, such as connecting to AlloyDB
over a public IP address or Private Service Connect (PSC), set the `ip_type`
keyword argument when initializing a `Connector()` or when calling
keyword argument when initializing a `Connector()` or when calling
`connector.connect()`.

Possible values for `ip_type` are `"PRIVATE"` (default value), `"PUBLIC"`,
Possible values for `ip_type` are `"PRIVATE"` (default value), `"PUBLIC"`,
and `"PSC"`.

Example:
Expand Down
9 changes: 8 additions & 1 deletion google/cloud/alloydb/connector/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@
from google.cloud.alloydb.connector.async_connector import AsyncConnector
from google.cloud.alloydb.connector.connector import Connector
from google.cloud.alloydb.connector.enums import IPTypes
from google.cloud.alloydb.connector.enums import RefreshStrategy
from google.cloud.alloydb.connector.version import __version__

__all__ = ["__version__", "Connector", "AsyncConnector", "IPTypes"]
__all__ = [
"__version__",
"Connector",
"AsyncConnector",
"IPTypes",
"RefreshStrategy",
]
21 changes: 18 additions & 3 deletions google/cloud/alloydb/connector/async_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import asyncio
from types import TracebackType
from typing import Any, Dict, Optional, Type, TYPE_CHECKING
from typing import Any, Dict, Optional, Type, TYPE_CHECKING, Union

import google.auth
from google.auth.credentials import with_scopes_if_required
Expand All @@ -25,7 +25,9 @@
import google.cloud.alloydb.connector.asyncpg as asyncpg
from google.cloud.alloydb.connector.client import AlloyDBClient
from google.cloud.alloydb.connector.enums import IPTypes
from google.cloud.alloydb.connector.enums import RefreshStrategy
from google.cloud.alloydb.connector.instance import RefreshAheadCache
from google.cloud.alloydb.connector.lazy import LazyRefreshCache
from google.cloud.alloydb.connector.utils import generate_keys

if TYPE_CHECKING:
Expand All @@ -49,6 +51,11 @@ class AsyncConnector:
enable_iam_auth (bool): Enables automatic IAM database authentication.
ip_type (str | IPTypes): Default IP type for all AlloyDB connections.
Defaults to IPTypes.PRIVATE ("PRIVATE") for private IP connections.
refresh_strategy (str | RefreshStrategy): The default refresh strategy
used to refresh SSL/TLS cert and instance metadata. Can be one
of the following: RefreshStrategy.LAZY ("LAZY") or
RefreshStrategy.BACKGROUND ("BACKGROUND").
Default: RefreshStrategy.BACKGROUND
"""

def __init__(
Expand All @@ -59,8 +66,9 @@ def __init__(
enable_iam_auth: bool = False,
ip_type: str | IPTypes = IPTypes.PRIVATE,
user_agent: Optional[str] = None,
refresh_strategy: str | RefreshStrategy = RefreshStrategy.BACKGROUND,
) -> None:
self._cache: Dict[str, RefreshAheadCache] = {}
self._cache: Dict[str, Union[RefreshAheadCache, LazyRefreshCache]] = {}
# initialize default params
self._quota_project = quota_project
self._alloydb_api_endpoint = alloydb_api_endpoint
Expand All @@ -69,6 +77,10 @@ def __init__(
if isinstance(ip_type, str):
ip_type = IPTypes(ip_type.upper())
self._ip_type = ip_type
# if refresh_strategy is str, convert to RefreshStrategy enum
if isinstance(refresh_strategy, str):
refresh_strategy = RefreshStrategy(refresh_strategy.upper())
self._refresh_strategy = refresh_strategy
self._user_agent = user_agent
# initialize credentials
scopes = ["https://www.googleapis.com/auth/cloud-platform"]
Expand Down Expand Up @@ -128,7 +140,10 @@ async def connect(
if instance_uri in self._cache:
cache = self._cache[instance_uri]
else:
cache = RefreshAheadCache(instance_uri, self._client, self._keys)
if self._refresh_strategy == RefreshStrategy.LAZY:
cache = LazyRefreshCache(instance_uri, self._client, self._keys)
else:
cache = RefreshAheadCache(instance_uri, self._client, self._keys)
self._cache[instance_uri] = cache

connect_func = {
Expand Down
21 changes: 18 additions & 3 deletions google/cloud/alloydb/connector/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@
import struct
from threading import Thread
from types import TracebackType
from typing import Any, Dict, Optional, Type, TYPE_CHECKING
from typing import Any, Dict, Optional, Type, TYPE_CHECKING, Union

from google.auth import default
from google.auth.credentials import with_scopes_if_required

from google.cloud.alloydb.connector.client import AlloyDBClient
from google.cloud.alloydb.connector.enums import IPTypes
from google.cloud.alloydb.connector.enums import RefreshStrategy
from google.cloud.alloydb.connector.instance import RefreshAheadCache
from google.cloud.alloydb.connector.lazy import LazyRefreshCache
import google.cloud.alloydb.connector.pg8000 as pg8000
from google.cloud.alloydb.connector.utils import generate_keys
import google.cloud.alloydb_connectors_v1.proto.resources_pb2 as connectorspb
Expand Down Expand Up @@ -59,6 +61,11 @@ class Connector:
enable_iam_auth (bool): Enables automatic IAM database authentication.
ip_type (str | IPTypes): Default IP type for all AlloyDB connections.
Defaults to IPTypes.PRIVATE ("PRIVATE") for private IP connections.
refresh_strategy (str | RefreshStrategy): The default refresh strategy
used to refresh SSL/TLS cert and instance metadata. Can be one
of the following: RefreshStrategy.LAZY ("LAZY") or
RefreshStrategy.BACKGROUND ("BACKGROUND").
Default: RefreshStrategy.BACKGROUND
"""

def __init__(
Expand All @@ -69,12 +76,13 @@ def __init__(
enable_iam_auth: bool = False,
ip_type: str | IPTypes = IPTypes.PRIVATE,
user_agent: Optional[str] = None,
refresh_strategy: str | RefreshStrategy = RefreshStrategy.BACKGROUND,
) -> None:
# create event loop and start it in background thread
self._loop: asyncio.AbstractEventLoop = asyncio.new_event_loop()
self._thread = Thread(target=self._loop.run_forever, daemon=True)
self._thread.start()
self._cache: Dict[str, RefreshAheadCache] = {}
self._cache: Dict[str, Union[RefreshAheadCache, LazyRefreshCache]] = {}
# initialize default params
self._quota_project = quota_project
self._alloydb_api_endpoint = alloydb_api_endpoint
Expand All @@ -83,6 +91,10 @@ def __init__(
if isinstance(ip_type, str):
ip_type = IPTypes(ip_type.upper())
self._ip_type = ip_type
# if refresh_strategy is str, convert to RefreshStrategy enum
if isinstance(refresh_strategy, str):
refresh_strategy = RefreshStrategy(refresh_strategy.upper())
self._refresh_strategy = refresh_strategy
self._user_agent = user_agent
# initialize credentials
scopes = ["https://www.googleapis.com/auth/cloud-platform"]
Expand Down Expand Up @@ -155,7 +167,10 @@ async def connect_async(self, instance_uri: str, driver: str, **kwargs: Any) ->
if instance_uri in self._cache:
cache = self._cache[instance_uri]
else:
cache = RefreshAheadCache(instance_uri, self._client, self._keys)
if self._refresh_strategy == RefreshStrategy.LAZY:
cache = LazyRefreshCache(instance_uri, self._client, self._keys)
else:
cache = RefreshAheadCache(instance_uri, self._client, self._keys)
self._cache[instance_uri] = cache

connect_func = {
Expand Down
18 changes: 18 additions & 0 deletions google/cloud/alloydb/connector/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from enum import Enum


Expand All @@ -30,3 +32,19 @@ def _missing_(cls, value: object) -> None:
f"Incorrect value for ip_type, got '{value}'. Want one of: "
f"{', '.join([repr(m.value) for m in cls])}."
)


class RefreshStrategy(Enum):
"""
Enum for specifying refresh strategy to connect to AlloyDB with.
"""

LAZY: str = "LAZY"
BACKGROUND: str = "BACKGROUND"

@classmethod
def _missing_(cls, value: object) -> None:
raise ValueError(
f"Incorrect value for refresh_strategy, got '{value}'. Want one of: "
f"{', '.join([repr(m.value) for m in cls])}."
)
127 changes: 127 additions & 0 deletions google/cloud/alloydb/connector/lazy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
from datetime import datetime
from datetime import timedelta
from datetime import timezone
import logging
from typing import Optional

from google.cloud.alloydb.connector.client import AlloyDBClient
from google.cloud.alloydb.connector.connection_info import ConnectionInfo
from google.cloud.alloydb.connector.instance import _parse_instance_uri
from google.cloud.alloydb.connector.refresh_utils import _refresh_buffer

logger = logging.getLogger(name=__name__)


class LazyRefreshCache:
"""Cache that refreshes connection info when a caller requests a connection.

Only refreshes the cache when a new connection is requested and the current
certificate is close to or already expired.

This is the recommended option for serverless environments.
"""

def __init__(
self,
instance_uri: str,
client: AlloyDBClient,
keys: asyncio.Future,
) -> None:
"""Initializes a LazyRefreshCache instance.

Args:
instance_connection_string (str): The AlloyDB Instance's
connection URI.
client (AlloyDBClient): The AlloyDB client instance.
keys (asyncio.Future): A future to the client's public-private key
pair.
"""
# validate and parse instance connection name
self._project, self._region, self._cluster, self._name = _parse_instance_uri(
instance_uri
)
self._instance_uri = instance_uri

self._keys = keys
self._client = client
self._lock = asyncio.Lock()
self._cached: Optional[ConnectionInfo] = None
self._needs_refresh = False

async def force_refresh(self) -> None:
"""
Invalidates the cache and configures the next call to
connect_info() to retrieve a fresh ConnectionInfo instance.
"""
async with self._lock:
self._needs_refresh = True

async def connect_info(self) -> ConnectionInfo:
"""Retrieves ConnectionInfo instance for establishing a secure
connection to the AlloyDB instance.
"""
async with self._lock:
# If connection info is cached, check expiration.
# Pad expiration with a buffer to give the client plenty of time to
# establish a connection to the server with the certificate.
if (
self._cached
and not self._needs_refresh
and datetime.now(timezone.utc)
< (self._cached.expiration - timedelta(seconds=_refresh_buffer))
):
logger.debug(
f"['{self._instance_uri}']: Connection info "
"is still valid, using cached info"
)
return self._cached
logger.debug(
f"['{self._instance_uri}']: Connection info "
"refresh operation started"
)
try:
conn_info = await self._client.get_connection_info(
self._project,
self._region,
self._cluster,
self._name,
self._keys,
)
except Exception as e:
logger.debug(
f"['{self._instance_uri}']: Connection info "
f"refresh operation failed: {str(e)}"
)
raise
logger.debug(
f"['{self._instance_uri}']: Connection info "
"refresh operation completed successfully"
)
logger.debug(
f"['{self._instance_uri}']: Current certificate "
f"expiration = {str(conn_info.expiration)}"
)
self._cached = conn_info
self._needs_refresh = False
return conn_info

async def close(self) -> None:
"""Close is a no-op and provided purely for a consistent interface with
other cache types.
"""
pass
Loading
Loading