feat: add standardized debug logging #354

Merged · 1 commit · Jul 24, 2024
22 changes: 22 additions & 0 deletions README.md
@@ -438,6 +438,28 @@ pool = sqlalchemy.create_engine(
connector.close()
```

### Debug Logging

The AlloyDB Python Connector uses the standard [Python logging module][python-logging]
for debug logging support.

Add the following code to your application to enable debug logging for the AlloyDB
Python Connector:

```python
import logging

logging.basicConfig(format="%(asctime)s [%(levelname)s]: %(message)s")
logger = logging.getLogger(name="google.cloud.alloydb.connector")
logger.setLevel(logging.DEBUG)
```

For more details on configuring logging, please refer to the
[Python logging docs][configure-logging].

[python-logging]: https://docs.python.org/3/library/logging.html
[configure-logging]: https://docs.python.org/3/howto/logging.html#configuring-logging
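
The same logger can also be routed to any handler of your choosing. Below is a minimal
sketch using only the standard library; the file name and format string are illustrative,
not prescribed by the connector:

```python
import logging

# Send the connector's debug output to a file instead of stderr.
logger = logging.getLogger(name="google.cloud.alloydb.connector")
logger.setLevel(logging.DEBUG)

handler = logging.FileHandler("alloydb-connector-debug.log")  # illustrative path
handler.setFormatter(
    logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s")
)
logger.addHandler(handler)
```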

## Support policy

### Major version lifecycle
12 changes: 12 additions & 0 deletions google/cloud/alloydb/connector/async_connector.py
@@ -15,6 +15,7 @@
from __future__ import annotations

import asyncio
import logging
from types import TracebackType
from typing import Any, Dict, Optional, Type, TYPE_CHECKING, Union

@@ -33,6 +34,8 @@
if TYPE_CHECKING:
from google.auth.credentials import Credentials

logger = logging.getLogger(name=__name__)


class AsyncConnector:
"""A class to configure and create connections to Cloud SQL instances
@@ -141,10 +144,18 @@ async def connect(
cache = self._cache[instance_uri]
else:
if self._refresh_strategy == RefreshStrategy.LAZY:
logger.debug(
f"['{instance_uri}']: Refresh strategy is set to lazy refresh"
)
cache = LazyRefreshCache(instance_uri, self._client, self._keys)
else:
logger.debug(
f"['{instance_uri}']: Refresh strategy is set to background"
" refresh"
)
cache = RefreshAheadCache(instance_uri, self._client, self._keys)
self._cache[instance_uri] = cache
logger.debug(f"['{instance_uri}']: Connection info added to cache")

connect_func = {
"asyncpg": asyncpg.connect,
@@ -168,6 +179,7 @@ async def connect(
ip_type = IPTypes(ip_type.upper())
conn_info = await cache.connect_info()
ip_address = conn_info.get_preferred_ip(ip_type)
logger.debug(f"['{instance_uri}']: Connecting to {ip_address}:5433")

# callable to be used for auto IAM authn
def get_authentication_token() -> str:
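
Taken together, the new messages in `async_connector.py` trace the cache decision (lazy vs. background refresh), cache population, and the final `Connecting to <ip>:5433` step. Below is a hedged sketch of an application that would surface them; the instance URI and driver keyword arguments are placeholders, and enabling debug logging follows the README snippet above:

```python
import asyncio
import logging

from google.cloud.alloydb.connector import AsyncConnector

# Surface the DEBUG messages added in this PR (refresh strategy, cache
# population, "Connecting to <ip>:5433").
logging.basicConfig(format="%(asctime)s [%(levelname)s]: %(message)s")
logging.getLogger(name="google.cloud.alloydb.connector").setLevel(logging.DEBUG)


async def main() -> None:
    connector = AsyncConnector()
    # Placeholder instance URI and credentials.
    conn = await connector.connect(
        "projects/<PROJECT>/locations/<REGION>/clusters/<CLUSTER>/instances/<INSTANCE>",
        "asyncpg",
        user="my-user",
        password="my-password",
        db="my-db",
    )
    await conn.fetch("SELECT 1")
    await conn.close()
    await connector.close()


asyncio.run(main())
```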
6 changes: 2 additions & 4 deletions google/cloud/alloydb/connector/client.py
@@ -118,8 +118,6 @@ async def _get_metadata(
Returns:
dict: IP addresses of the AlloyDB instance.
"""
logger.debug(f"['{project}/{region}/{cluster}/{name}']: Requesting metadata")

headers = {
"Authorization": f"Bearer {self._credentials.token}",
}
@@ -165,8 +163,6 @@ async def _get_client_certificate(
Tuple[str, list[str]]: Tuple containing the CA certificate
and certificate chain for the AlloyDB instance.
"""
logger.debug(f"['{project}/{region}/{cluster}']: Requesting client certificate")

headers = {
"Authorization": f"Bearer {self._credentials.token}",
}
@@ -252,4 +248,6 @@ async def get_connection_info(

async def close(self) -> None:
"""Close AlloyDBClient gracefully."""
logger.debug("Waiting for connector's http client to close")
await self._client.close()
logger.debug("Closed connector's http client")
12 changes: 12 additions & 0 deletions google/cloud/alloydb/connector/connector.py
@@ -16,6 +16,7 @@

import asyncio
from functools import partial
import logging
import socket
import struct
from threading import Thread
@@ -41,6 +42,8 @@

from google.auth.credentials import Credentials

logger = logging.getLogger(name=__name__)

# the port the AlloyDB server-side proxy receives connections on
SERVER_PROXY_PORT = 5433
# the maximum amount of time to wait before aborting a metadata exchange
@@ -170,10 +173,18 @@ async def connect_async(self, instance_uri: str, driver: str, **kwargs: Any) ->
cache = self._cache[instance_uri]
else:
if self._refresh_strategy == RefreshStrategy.LAZY:
logger.debug(
f"['{instance_uri}']: Refresh strategy is set to lazy refresh"
)
cache = LazyRefreshCache(instance_uri, self._client, self._keys)
else:
logger.debug(
f"['{instance_uri}']: Refresh strategy is set to background"
" refresh"
)
cache = RefreshAheadCache(instance_uri, self._client, self._keys)
self._cache[instance_uri] = cache
logger.debug(f"['{instance_uri}']: Connection info added to cache")

connect_func = {
"pg8000": pg8000.connect,
@@ -197,6 +208,7 @@ async def connect_async(self, instance_uri: str, driver: str, **kwargs: Any) ->
ip_type = IPTypes(ip_type.upper())
conn_info = await cache.connect_info()
ip_address = conn_info.get_preferred_ip(ip_type)
logger.debug(f"['{instance_uri}']: Connecting to {ip_address}:5433")

# synchronous drivers are blocking and run using executor
try:
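
The synchronous `Connector` gains the same messages. For illustration, here is a sketch that exercises the lazy-refresh branch; the `refresh_strategy` constructor argument and the `RefreshStrategy` import are assumptions inferred from this diff, and the connection parameters are placeholders:

```python
import logging

from google.cloud.alloydb.connector import Connector, RefreshStrategy

logging.basicConfig(format="%(asctime)s [%(levelname)s]: %(message)s")
logging.getLogger(name="google.cloud.alloydb.connector").setLevel(logging.DEBUG)

# refresh_strategy kwarg assumed from this diff's RefreshStrategy.LAZY branch.
connector = Connector(refresh_strategy=RefreshStrategy.LAZY)

# Expect "Refresh strategy is set to lazy refresh", "Connection info added
# to cache", and "Connecting to <ip>:5433" at DEBUG level.
conn = connector.connect(
    "projects/<PROJECT>/locations/<REGION>/clusters/<CLUSTER>/instances/<INSTANCE>",
    "pg8000",
    user="my-user",
    password="my-password",
    db="my-db",
)
cursor = conn.cursor()
cursor.execute("SELECT 1")
conn.close()
connector.close()
```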
38 changes: 32 additions & 6 deletions google/cloud/alloydb/connector/instance.py
@@ -15,6 +15,9 @@
from __future__ import annotations

import asyncio
from datetime import datetime
from datetime import timedelta
from datetime import timezone
import logging
import re
from typing import Tuple, TYPE_CHECKING
@@ -104,7 +107,9 @@ async def _perform_refresh(self) -> ConnectionInfo:
ConnectionInfo: Result of the refresh operation.
"""
self._refresh_in_progress.set()
logger.debug(f"['{self._instance_uri}']: Entered _perform_refresh")
logger.debug(
f"['{self._instance_uri}']: Connection info refresh operation started"
)

try:
await self._refresh_rate_limiter.acquire()
@@ -115,10 +120,19 @@
self._name,
self._keys,
)
logger.debug(
f"['{self._instance_uri}']: Connection info refresh operation"
" complete"
)
logger.debug(
f"['{self._instance_uri}']: Current certificate expiration = "
f"{connection_info.expiration.isoformat()}"
)

except Exception:
except Exception as e:
logger.debug(
f"['{self._instance_uri}']: Error occurred during _perform_refresh."
f"['{self._instance_uri}']: Connection info refresh operation"
f" failed: {str(e)}"
)
raise

@@ -153,7 +167,6 @@ async def _refresh_operation(self, delay: int) -> ConnectionInfo:
refresh_task: asyncio.Task
try:
if delay > 0:
logger.debug(f"['{self._instance_uri}']: Entering sleep")
await asyncio.sleep(delay)
refresh_task = asyncio.create_task(self._perform_refresh())
refresh_result = await refresh_task
@@ -162,6 +175,11 @@ async def _refresh_operation(self, delay: int) -> ConnectionInfo:
raise RefreshError(
f"['{self._instance_uri}']: Invalid refresh operation. Certficate appears to be expired."
)
except asyncio.CancelledError:
logger.debug(
f"['{self._instance_uri}']: Scheduled refresh operation cancelled"
)
raise
# bad refresh attempt
except Exception:
logger.info(
@@ -180,6 +198,12 @@ async def _refresh_operation(self, delay: int) -> ConnectionInfo:
self._current = refresh_task
# calculate refresh delay based on certificate expiration
delay = _seconds_until_refresh(refresh_result.expiration)
logger.debug(
f"['{self._instance_uri}']: Connection info refresh operation"
" scheduled for "
f"{(datetime.now(timezone.utc) + timedelta(seconds=delay)).isoformat(timespec='seconds')} "
f"(now + {timedelta(seconds=delay)})"
)
self._next = self._schedule_refresh(delay)

return refresh_result
@@ -207,9 +231,11 @@ async def close(self) -> None:
"""
Cancel refresh tasks.
"""
logger.debug(f"['{self._instance_uri}']: Waiting for _current to be cancelled")
logger.debug(
f"['{self._instance_uri}']: Canceling connection info refresh"
" operation tasks"
)
self._current.cancel()
logger.debug(f"['{self._instance_uri}']: Waiting for _next to be cancelled")
self._next.cancel()
# gracefully wait for tasks to cancel
tasks = asyncio.gather(self._current, self._next, return_exceptions=True)
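
The scheduled-refresh message in `instance.py` computes its timestamp from the delay returned by `_seconds_until_refresh`. Below is a standalone sketch of that arithmetic, with a made-up delay value:

```python
from datetime import datetime, timedelta, timezone

# Made-up delay; in the connector it comes from _seconds_until_refresh(),
# which is derived from the certificate expiration.
delay = 1800  # seconds

scheduled_at = datetime.now(timezone.utc) + timedelta(seconds=delay)
print(
    "Connection info refresh operation scheduled for "
    f"{scheduled_at.isoformat(timespec='seconds')} (now + {timedelta(seconds=delay)})"
)
```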