Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(key-value): add superset metastore cache #19232

Merged
merged 13 commits into from
Mar 21, 2022
17 changes: 12 additions & 5 deletions docs/docs/installation/cache.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ version: 1

## Caching

Superset uses [Flask-Caching](https://flask-caching.readthedocs.io/) for caching purpose. Configuring caching is as easy as providing a custom cache config in your
Superset uses [Flask-Caching](https://flask-caching.readthedocs.io/) for caching purposes. Configuring caching is as easy as providing a custom cache config in your
`superset_config.py` that complies with [the Flask-Caching specifications](https://flask-caching.readthedocs.io/en/latest/#configuring-flask-caching).
Flask-Caching supports various caching backends, including Redis, Memcached, SimpleCache (in-memory), or the
local filesystem. Custom cache backends are also supported. See [here](https://flask-caching.readthedocs.io/en/latest/#custom-cache-backends) for specifics.
Expand All @@ -18,10 +18,17 @@ The following cache configurations can be customized:
- Dashboard filter state (required): `FILTER_STATE_CACHE_CONFIG`.
- Explore chart form data (required): `EXPLORE_FORM_DATA_CACHE_CONFIG`

Please note, that Dashboard and Explore caching is required. When running Superset in debug mode, both Explore and Dashboard caches will default to `SimpleCache`;
However, trying to run Superset in non-debug mode without defining a cache for these will cause the application to fail on startup. When running
superset in single-worker mode, any cache backend is supported. However, when running Superset in on a multi-worker setup, a dedicated cache is required. For this
we recommend using either Redis or Memcached:
Please note, that Dashboard and Explore caching is required. If these caches are undefined, Superset falls back to using a built-in cache that stores data
in the metadata database. While it is recommended to use a dedicated cache, the built-in cache can also be used to cache other data.
For example, to use the built-in cache to store chart data, use the following config:

```python
DATA_CACHE_CONFIG = {
"CACHE_TYPE": "SupersetMetastoreCache",
"CACHE_KEY_PREFIX": "superset_results", # make sure this string is unique to avoid collisions
"CACHE_DEFAULT_TIMEOUT": 86400, # 60 seconds * 60 minutes * 24 hours
michael-s-molina marked this conversation as resolved.
Show resolved Hide resolved
}
```

- Redis (recommended): we recommend the [redis](https://pypi.python.org/pypi/redis) Python package
- Memcached: we recommend using [pylibmc](https://pypi.org/project/pylibmc/) client library as
Expand Down
5 changes: 4 additions & 1 deletion superset/dashboards/permalink/commands/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ def run(self) -> str:
"state": self.state,
}
return CreateKeyValueCommand(
self.actor, self.resource, value, self.key_type
actor=self.actor,
resource=self.resource,
value=value,
key_type=self.key_type,
).run()
except SQLAlchemyError as ex:
logger.exception("Error running create command")
Expand Down
2 changes: 1 addition & 1 deletion superset/dashboards/permalink/commands/get.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def run(self) -> Optional[DashboardPermalinkValue]:
self.validate()
try:
command = GetKeyValueCommand(
self.resource, self.key, key_type=self.key_type
resource=self.resource, key=self.key, key_type=self.key_type
)
value: Optional[DashboardPermalinkValue] = command.run()
if value:
Expand Down
5 changes: 4 additions & 1 deletion superset/explore/permalink/commands/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ def run(self) -> str:
"state": self.state,
}
command = CreateKeyValueCommand(
self.actor, self.resource, value, self.key_type
actor=self.actor,
resource=self.resource,
value=value,
key_type=self.key_type,
)
return command.run()
except SQLAlchemyError as ex:
Expand Down
2 changes: 1 addition & 1 deletion superset/explore/permalink/commands/get.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def run(self) -> Optional[ExplorePermalinkValue]:
self.validate()
try:
value: Optional[ExplorePermalinkValue] = GetKeyValueCommand(
self.resource, self.key, key_type=self.key_type
resource=self.resource, key=self.key, key_type=self.key_type
).run()
if value:
chart_id: Optional[int] = value.get("chartId")
Expand Down
3 changes: 2 additions & 1 deletion superset/extensions.py → superset/extensions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.
import json
import os
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional

import celery
Expand Down Expand Up @@ -108,7 +109,7 @@ def init_app(self, app: Flask) -> None:
app.wsgi_app = SupersetProfiler(app.wsgi_app, self.interval) # type: ignore


APP_DIR = os.path.dirname(__file__)
APP_DIR = os.path.join(os.path.dirname(__file__), os.path.pardir)
appbuilder = AppBuilder(update_perms=False)
async_query_manager = AsyncQueryManager()
cache_manager = CacheManager()
Expand Down
117 changes: 117 additions & 0 deletions superset/extensions/metastore_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from datetime import datetime, timedelta
from hashlib import md5
from typing import Any, Dict, List, Optional
from uuid import UUID, uuid3

from flask import Flask
from flask_caching import BaseCache

from superset.key_value.exceptions import KeyValueCreateFailedError
from superset.key_value.types import KeyType

RESOURCE = "superset_metastore_cache"
KEY_TYPE: KeyType = "uuid"


class SupersetMetastoreCache(BaseCache):
def __init__(self, namespace: UUID, default_timeout: int = 300) -> None:
super().__init__(default_timeout)
self.namespace = namespace

@classmethod
def factory(
cls, app: Flask, config: Dict[str, Any], args: List[Any], kwargs: Dict[str, Any]
) -> BaseCache:
# base namespace for generating deterministic UUIDs
md5_obj = md5()
seed = config.get("CACHE_KEY_PREFIX", "")
md5_obj.update(seed.encode("utf-8"))
kwargs["namespace"] = UUID(md5_obj.hexdigest())
return cls(*args, **kwargs)

def get_key(self, key: str) -> str:
return str(uuid3(self.namespace, key))

@staticmethod
def _prune() -> None:
# pylint: disable=import-outside-toplevel
from superset.key_value.commands.delete_expired import (
DeleteExpiredKeyValueCommand,
)

DeleteExpiredKeyValueCommand(resource=RESOURCE).run()

def _get_expiry(self, timeout: Optional[int]) -> Optional[datetime]:
timeout = self._normalize_timeout(timeout)
if timeout is not None and timeout > 0:
return datetime.now() + timedelta(seconds=timeout)
return None

def set(self, key: str, value: Any, timeout: Optional[int] = None) -> bool:
# pylint: disable=import-outside-toplevel
from superset.key_value.commands.upsert import UpsertKeyValueCommand

UpsertKeyValueCommand(
resource=RESOURCE,
key_type=KEY_TYPE,
key=self.get_key(key),
value=value,
expires_on=self._get_expiry(timeout),
).run()
return True

def add(self, key: str, value: Any, timeout: Optional[int] = None) -> bool:
# pylint: disable=import-outside-toplevel
from superset.key_value.commands.create import CreateKeyValueCommand

try:
CreateKeyValueCommand(
resource=RESOURCE,
value=value,
key_type=KEY_TYPE,
key=self.get_key(key),
expires_on=self._get_expiry(timeout),
).run()
self._prune()
return True
except KeyValueCreateFailedError:
return False

def get(self, key: str) -> Any:
# pylint: disable=import-outside-toplevel
from superset.key_value.commands.get import GetKeyValueCommand

return GetKeyValueCommand(
resource=RESOURCE, key_type=KEY_TYPE, key=self.get_key(key),
).run()

def has(self, key: str) -> bool:
entry = self.get(key)
if entry:
return True
return False

def delete(self, key: str) -> Any:
# pylint: disable=import-outside-toplevel
from superset.key_value.commands.delete import DeleteKeyValueCommand

return DeleteKeyValueCommand(
resource=RESOURCE, key_type=KEY_TYPE, key=self.get_key(key),
).run()
25 changes: 21 additions & 4 deletions superset/key_value/commands/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import pickle
from datetime import datetime
from typing import Any, Optional
from uuid import UUID

from flask_appbuilder.security.sqla.models import User
from sqlalchemy.exc import SQLAlchemyError
Expand All @@ -33,18 +34,20 @@


class CreateKeyValueCommand(BaseCommand):
actor: User
actor: Optional[User]
resource: str
value: Any
key_type: KeyType
key: Optional[str]
expires_on: Optional[datetime]

def __init__(
self,
actor: User,
resource: str,
value: Any,
key_type: KeyType,
key_type: KeyType = "uuid",
actor: Optional[User] = None,
key: Optional[str] = None,
expires_on: Optional[datetime] = None,
):
"""
Expand All @@ -53,19 +56,23 @@ def __init__(
:param resource: the resource (dashboard, chart etc)
:param value: the value to persist in the key-value store
:param key_type: the type of the key to return
:param actor: the user performing the command
:param key: id of entry (autogenerated if undefined)
:param expires_on: entry expiration time
:return: the key associated with the persisted value
"""
self.resource = resource
self.actor = actor
self.value = value
self.key_type = key_type
self.key = key
self.expires_on = expires_on

def run(self) -> str:
try:
return self.create()
except SQLAlchemyError as ex:
db.session.rollback()
logger.exception("Error running create command")
raise KeyValueCreateFailedError() from ex

Expand All @@ -77,9 +84,19 @@ def create(self) -> str:
resource=self.resource,
value=pickle.dumps(self.value),
created_on=datetime.now(),
created_by_fk=None if self.actor.is_anonymous else self.actor.id,
created_by_fk=None
if self.actor is None or self.actor.is_anonymous
else self.actor.id,
expires_on=self.expires_on,
)
if self.key is not None:
try:
if self.key_type == "uuid":
entry.uuid = UUID(self.key)
else:
entry.id = int(self.key)
except ValueError as ex:
raise KeyValueCreateFailedError() from ex
db.session.add(entry)
db.session.commit()
return extract_key(entry, self.key_type)
6 changes: 3 additions & 3 deletions superset/key_value/commands/delete.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.
import logging
from typing import Optional

from flask_appbuilder.security.sqla.models import User
from sqlalchemy.exc import SQLAlchemyError
Expand All @@ -30,13 +31,12 @@


class DeleteKeyValueCommand(BaseCommand):
actor: User
key: str
key_type: KeyType
resource: str

def __init__(
self, actor: User, resource: str, key: str, key_type: KeyType = "uuid"
self, resource: str, key: str, key_type: KeyType = "uuid",
):
"""
Delete a key-value pair
Expand All @@ -47,14 +47,14 @@ def __init__(
:return: was the entry deleted or not
"""
self.resource = resource
self.actor = actor
self.key = key
self.key_type = key_type

def run(self) -> bool:
try:
return self.delete()
except SQLAlchemyError as ex:
db.session.rollback()
logger.exception("Error running delete command")
raise KeyValueDeleteFailedError() from ex

Expand Down
60 changes: 60 additions & 0 deletions superset/key_value/commands/delete_expired.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
from datetime import datetime

from sqlalchemy.exc import SQLAlchemyError

from superset import db
from superset.commands.base import BaseCommand
from superset.key_value.exceptions import KeyValueDeleteFailedError
from superset.key_value.models import KeyValueEntry

logger = logging.getLogger(__name__)


class DeleteExpiredKeyValueCommand(BaseCommand):
resource: str

def __init__(self, resource: str):
"""
Delete all expired key-value pairs

:param resource: the resource (dashboard, chart etc)
:return: was the entry deleted or not
"""
self.resource = resource

def run(self) -> None:
try:
self.delete_expired()
except SQLAlchemyError as ex:
db.session.rollback()
logger.exception("Error running delete command")
raise KeyValueDeleteFailedError() from ex

def validate(self) -> None:
pass

@staticmethod
def delete_expired() -> None:
(
db.session.query(KeyValueEntry)
.filter(KeyValueEntry.expires_on <= datetime.now())
.delete()
)
db.session.commit()
Loading