diff --git a/warehouse/metrics_mesh/models/marts/metrics/metrics_v0.sql b/warehouse/metrics_mesh/models/marts/metrics/metrics_v0.sql
index f86892e3a..9bccfa1b0 100644
--- a/warehouse/metrics_mesh/models/marts/metrics/metrics_v0.sql
+++ b/warehouse/metrics_mesh/models/marts/metrics/metrics_v0.sql
@@ -28,18 +28,26 @@ WITH unioned_metric_names AS (
   SELECT DISTINCT
     metric
   FROM unioned_metric_names
+), all_metrics_metadata AS (
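+  -- display_name/description pairs registered via MetricMetadata in metrics_factories.py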
+  SELECT
+    metric,
+    display_name,
+    description
+  FROM metrics.metrics_metadata
 ), metrics_v0_no_casting AS (
   SELECT
-    @oso_id('OSO', 'oso', metric) AS metric_id,
+    @oso_id('OSO', 'oso', t.metric) AS metric_id,
     'OSO' AS metric_source,
     'oso' AS metric_namespace,
-    metric AS metric_name,
-    metric AS display_name,
-    'TODO' AS description,
+    t.metric AS metric_name,
+    COALESCE(m.display_name, t.metric) AS display_name,
+    COALESCE(m.description, 'TODO') AS description,
     NULL AS raw_definition,
     'TODO' AS definition_ref,
     'UNKNOWN' AS aggregation_function
-  FROM all_timeseries_metric_names
+  FROM all_timeseries_metric_names t
+  LEFT JOIN all_metrics_metadata m ON t.metric = m.metric
 )
 SELECT
   metric_id::TEXT,
diff --git a/warehouse/metrics_mesh/models/metrics_factories.py b/warehouse/metrics_mesh/models/metrics_factories.py
index e349ade96..192829e21 100644
--- a/warehouse/metrics_mesh/models/metrics_factories.py
+++ b/warehouse/metrics_mesh/models/metrics_factories.py
@@ -1,6 +1,7 @@
 import os
 
 from dotenv import load_dotenv
+from metrics_tools.definition import MetricMetadata
 from metrics_tools.factory import MetricQueryDef, RollingConfig, timeseries_metrics
 
 # Annoyingly sqlmesh doesn't load things in an expected order but we want to be
@@ -36,6 +37,10 @@
         ),
         entity_types=["artifact", "project", "collection"],
         over_all_time=True,
+        metadata=MetricMetadata(
+            display_name="Stars",
+            description="Metrics related to GitHub stars",
+        ),
     ),
     "commits": MetricQueryDef(
         ref="code/commits.sql",
@@ -47,41 +52,73 @@
             slots=8,
         ),
         over_all_time=True,
+        metadata=MetricMetadata(
+            display_name="Commits",
+            description="Metrics related to GitHub commits",
+        ),
     ),
     "comments": MetricQueryDef(
         ref="code/comments.sql",
         time_aggregations=["daily", "weekly", "monthly"],
         over_all_time=True,
+        metadata=MetricMetadata(
+            display_name="Comments",
+            description="Metrics related to GitHub comments",
+        ),
     ),
     "releases": MetricQueryDef(
         ref="code/releases.sql",
         time_aggregations=["daily", "weekly", "monthly"],
         over_all_time=True,
+        metadata=MetricMetadata(
+            display_name="Releases",
+            description="Metrics related to GitHub releases",
+        ),
     ),
     "forks": MetricQueryDef(
         ref="code/forks.sql",
         time_aggregations=["daily", "weekly", "monthly"],
         over_all_time=True,
+        metadata=MetricMetadata(
+            display_name="Forks",
+            description="Metrics related to GitHub repository forks",
+        ),
     ),
     "repositories": MetricQueryDef(
         ref="code/repositories.sql",
         time_aggregations=["daily", "weekly", "monthly"],
         over_all_time=True,
+        metadata=MetricMetadata(
+            display_name="Repositories",
+            description="Metrics related to GitHub repositories",
+        ),
     ),
     "active_contracts": MetricQueryDef(
         ref="blockchain/active_contracts.sql",
         time_aggregations=["daily", "weekly", "monthly"],
         over_all_time=True,
+        metadata=MetricMetadata(
+            display_name="Active Contracts",
+            description="Metrics related to active blockchain contracts",
+        ),
     ),
     "contributors": MetricQueryDef(
         ref="code/contributors.sql",
         time_aggregations=["daily", "weekly", "monthly"],
         over_all_time=True,
+        metadata=MetricMetadata(
+            display_name="Contributors",
+            description="Metrics related to GitHub contributors",
+        ),
     ),
     "active_developers": MetricQueryDef(
         ref="code/active_developers.sql",
         time_aggregations=["daily", "weekly", "monthly"],
         over_all_time=True,
+        metadata=MetricMetadata(
+            display_name="Active Developers",
+            description="Metrics related to active GitHub developers",
+        ),
     ),
     # This defines something with a rolling option that allows you to look back
     # to some arbitrary window. So you specify the window and specify the unit.
@@ -134,6 +171,10 @@
             cron="@monthly",
             slots=32,
         ),
+        metadata=MetricMetadata(
+            display_name="Developer Classifications",
+            description="Metrics related to developer activity classifications",
+        ),
     ),
     "contributor_classifications": MetricQueryDef(
         ref="code/contributor_activity_classification.sql",
@@ -152,6 +193,10 @@
             cron="@monthly",
             slots=32,
         ),
+        metadata=MetricMetadata(
+            display_name="Contributor Classifications",
+            description="Metrics related to contributor activity classifications",
+        ),
     ),
     # Currently this query performs really poorly. We need to do some debugging on it
     # "user_retention_classifications": MetricQueryDef(
@@ -174,6 +219,10 @@
             cron="@monthly",
             slots=32,
         ),
+        metadata=MetricMetadata(
+            display_name="Change in Developer Activity",
+            description="Metrics related to change in developer activity",
+        ),
     ),
     "opened_pull_requests": MetricQueryDef(
         ref="code/prs_opened.sql",
@@ -185,6 +234,10 @@
         ),
         entity_types=["artifact", "project", "collection"],
         over_all_time=True,
+        metadata=MetricMetadata(
+            display_name="Opened Pull Requests",
+            description="Metrics related to opened GitHub pull requests",
+        ),
     ),
     "merged_pull_requests": MetricQueryDef(
         ref="code/prs_merged.sql",
@@ -196,6 +249,10 @@
         ),
         entity_types=["artifact", "project", "collection"],
         over_all_time=True,
+        metadata=MetricMetadata(
+            display_name="Merged Pull Requests",
+            description="Metrics related to merged GitHub pull requests",
+        ),
     ),
     "opened_issues": MetricQueryDef(
         ref="code/issues_opened.sql",
@@ -207,6 +264,10 @@
         ),
         entity_types=["artifact", "project", "collection"],
         over_all_time=True,
+        metadata=MetricMetadata(
+            display_name="Opened Issues",
+            description="Metrics related to opened GitHub issues",
+        ),
     ),
     "closed_issues": MetricQueryDef(
         ref="code/issues_closed.sql",
@@ -218,6 +279,10 @@
         ),
         entity_types=["artifact", "project", "collection"],
         over_all_time=True,
+        metadata=MetricMetadata(
+            display_name="Closed Issues",
+            description="Metrics related to closed GitHub issues",
+        ),
     ),
     "avg_prs_time_to_merge": MetricQueryDef(
         ref="code/prs_time_to_merge.sql",
@@ -229,6 +294,10 @@
         ),
         entity_types=["artifact", "project", "collection"],
         over_all_time=True,
+        metadata=MetricMetadata(
+            display_name="Average PR Time to Merge",
+            description="Metrics related to average GitHub PR time to merge",
+        ),
     ),
     "avg_time_to_first_response": MetricQueryDef(
         ref="code/time_to_first_response.sql",
@@ -240,6 +309,10 @@
         ),
         entity_types=["artifact", "project", "collection"],
         over_all_time=True,
+        metadata=MetricMetadata(
+            display_name="Average Time to First Response",
+            description="Metrics related to average time to first response",
+        ),
     ),
     "active_addresses_aggregation": MetricQueryDef(
         ref="blockchain/active_addresses.sql",
@@ -254,6 +327,10 @@
         ),
         time_aggregations=["daily", "monthly"],
         over_all_time=True,
+        metadata=MetricMetadata(
+            display_name="Active Addresses Aggregation",
+            description="Metrics related to active blockchain addresses",
+        ),
     ),
     "gas_fees": MetricQueryDef(
         ref="blockchain/gas_fees.sql",
@@ -265,6 +342,10 @@
         ),
         entity_types=["artifact", "project", "collection"],
         over_all_time=True,
+        metadata=MetricMetadata(
+            display_name="Gas Fees",
+            description="Metrics related to blockchain gas fees",
+        ),
     ),
     "transactions": MetricQueryDef(
         ref="blockchain/transactions.sql",
@@ -276,6 +357,10 @@
         ),
         entity_types=["artifact", "project", "collection"],
         over_all_time=True,
+        metadata=MetricMetadata(
+            display_name="Transactions",
+            description="Metrics related to blockchain transactions",
+        ),
     ),
     "contributors_lifecycle": MetricQueryDef(
         ref="code/lifecycle.sql",
@@ -294,6 +379,10 @@
             slots=32,
         ),
         entity_types=["artifact", "project", "collection"],
+        metadata=MetricMetadata(
+            display_name="Contributors Lifecycle",
+            description="Metrics related to contributor lifecycle",
+        ),
     ),
     "funding_received": MetricQueryDef(
         ref="funding/funding_received.sql",
@@ -305,6 +394,10 @@
         ),
         entity_types=["artifact", "project", "collection"],
         over_all_time=True,
+        metadata=MetricMetadata(
+            display_name="Funding Received",
+            description="Metrics related to funding received",
+        ),
     ),
     "dependencies": MetricQueryDef(
         ref="deps/dependencies.sql",
@@ -316,6 +409,10 @@
         ),
         entity_types=["artifact", "project", "collection"],
         over_all_time=True,
+        metadata=MetricMetadata(
+            display_name="Dependencies",
+            description="Metrics related to dependencies",
+        ),
     ),
     },
     default_dialect="clickhouse",
diff --git a/warehouse/metrics_tools/definition.py b/warehouse/metrics_tools/definition.py
index e60585287..34570e115 100644
--- a/warehouse/metrics_tools/definition.py
+++ b/warehouse/metrics_tools/definition.py
@@ -140,6 +140,13 @@ def assert_allowed_items_in_list[T](to_validate: t.List[T], allowed_items: t.Lis
         assert item in allowed_items, "List contains invalid items"
 
 
+@dataclass(kw_only=True)
+class MetricMetadata:
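+    """Human-readable display name and description attached to a metric definition."""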
"collection"], over_all_time=True, + metadata=MetricMetadata( + display_name="Gas Fees", + description="Metrics related to blockchain gas fees", + ), ), "transactions": MetricQueryDef( ref="blockchain/transactions.sql", @@ -276,6 +357,10 @@ ), entity_types=["artifact", "project", "collection"], over_all_time=True, + metadata=MetricMetadata( + display_name="Transactions", + description="Metrics related to blockchain transactions", + ), ), "contributors_lifecycle": MetricQueryDef( ref="code/lifecycle.sql", @@ -294,6 +379,10 @@ slots=32, ), entity_types=["artifact", "project", "collection"], + metadata=MetricMetadata( + display_name="Contributors Lifecycle", + description="Metrics related to contributor lifecycle", + ), ), "funding_received": MetricQueryDef( ref="funding/funding_received.sql", @@ -305,6 +394,10 @@ ), entity_types=["artifact", "project", "collection"], over_all_time=True, + metadata=MetricMetadata( + display_name="Funding Received", + description="Metrics related to funding received", + ), ), "dependencies": MetricQueryDef( ref="deps/dependencies.sql", @@ -316,6 +409,10 @@ ), entity_types=["artifact", "project", "collection"], over_all_time=True, + metadata=MetricMetadata( + display_name="Dependencies", + description="Metrics related to dependencies", + ), ), }, default_dialect="clickhouse", diff --git a/warehouse/metrics_tools/definition.py b/warehouse/metrics_tools/definition.py index e60585287..34570e115 100644 --- a/warehouse/metrics_tools/definition.py +++ b/warehouse/metrics_tools/definition.py @@ -140,6 +140,12 @@ def assert_allowed_items_in_list[T](to_validate: t.List[T], allowed_items: t.Lis assert item in allowed_items, "List contains invalid items" +@dataclass(kw_only=True) +class MetricMetadata: + description: str + display_name: str + + @dataclass(kw_only=True) class MetricQueryDef: # The relative path to the query in `oso_metrics` @@ -166,6 +172,8 @@ class MetricQueryDef: use_python_model: bool = True + metadata: t.Optional[MetricMetadata] = None + def raw_sql(self, queries_dir: str): return open(os.path.join(queries_dir, self.ref)).read() diff --git a/warehouse/metrics_tools/factory/constants.py b/warehouse/metrics_tools/factory/constants.py index 0bd1cbb21..184eafde3 100644 --- a/warehouse/metrics_tools/factory/constants.py +++ b/warehouse/metrics_tools/factory/constants.py @@ -33,3 +33,9 @@ "amount": exp.DataType.build("DOUBLE", dialect="duckdb"), }, } + +METRIC_METADATA_COLUMNS: t.Dict[str, exp.DataType] = { + "display_name": exp.DataType.build("STRING", dialect="duckdb"), + "description": exp.DataType.build("STRING", dialect="duckdb"), + "metric": exp.DataType.build("STRING", dialect="duckdb"), +} diff --git a/warehouse/metrics_tools/factory/factory.py b/warehouse/metrics_tools/factory/factory.py index 865e515f0..86bba1f1a 100644 --- a/warehouse/metrics_tools/factory/factory.py +++ b/warehouse/metrics_tools/factory/factory.py @@ -1,13 +1,16 @@ +import functools import inspect import logging import os import textwrap import typing as t -from dataclasses import dataclass, field +from collections import defaultdict +from dataclasses import asdict, dataclass, field from pathlib import Path from queue import PriorityQueue from metrics_tools.definition import ( + MetricMetadata, MetricQuery, PeerMetricDependencyRef, TimeseriesMetricsOptions, @@ -53,6 +56,7 @@ class MetricQueryConfig(t.TypedDict): rendered_query: exp.Expression vars: t.Dict[str, t.Any] query: MetricQuery + metadata: t.Optional[MetricMetadata] class MetricsCycle(Exception): @@ -209,6 +213,7 @@ def 
+        raw_table_metadata = {
+            key: asdict(value["metadata"])
+            for key, value in self._rendered_queries.items()
+            if value["metadata"] is not None
+        }
+
+        transformed_metadata = defaultdict(list)
+
+        for table, metadata in raw_table_metadata.items():
+            metadata_tuple = tuple(metadata.items())
+            transformed_metadata[metadata_tuple].append(table)
+
+        table_meta = [
+            {"metadata": dict(meta), "tables": tables}
+            for meta, tables in transformed_metadata.items()
+        ]
+
+        metadata_depends_on = functools.reduce(
+            lambda x, y: x.union({f"metrics.metrics_metadata_{ident}" for ident in y}),
+            transformed_metadata.values(),
+            set(),
+        )
+
+        for elem in table_meta:
+            meta = elem["metadata"]
+            tables = elem["tables"]
+
+            for ident in tables:
+                MacroOverridingModel(
+                    additional_macros=[],
+                    override_module_path=override_module_path,
+                    override_path=override_path,
+                    locals=dict(
+                        db=self.catalog,
+                        table=ident,
+                        metadata=meta,
+                    ),
+                    name=f"metrics.metrics_metadata_{ident}",
+                    is_sql=True,
+                    kind="VIEW",
+                    dialect="clickhouse",
+                    columns=constants.METRIC_METADATA_COLUMNS,
+                    enabled=self._raw_options.get("enabled", True),
+                )(map_metadata_to_metric)
+
+        MacroOverridingModel(
+            additional_macros=[],
+            depends_on=metadata_depends_on,
+            override_module_path=override_module_path,
+            override_path=override_path,
+            locals={},
+            name="metrics.metrics_metadata",
+            is_sql=True,
+            kind="VIEW",
+            dialect="clickhouse",
+            columns=constants.METRIC_METADATA_COLUMNS,
+            enabled=self._raw_options.get("enabled", True),
+        )(aggregate_metadata)
+
         logger.info("model generation complete")
 
     def generate_model_for_rendered_query(
@@ -468,7 +536,10 @@
         # Use a simple python sql model to generate the time_aggregation model
         ref = query_config["ref"]
 
-        if not ref.get("time_aggregation") or ref.get("time_aggregation") == "over_all_time":
+        if (
+            not ref.get("time_aggregation")
+            or ref.get("time_aggregation") == "over_all_time"
+        ):
             return None
 
         columns = constants.METRICS_COLUMNS_BY_ENTITY[ref["entity_type"]]
diff --git a/warehouse/metrics_tools/factory/proxy/proxies.py b/warehouse/metrics_tools/factory/proxy/proxies.py
index 4080cc20b..33a085a7e 100644
--- a/warehouse/metrics_tools/factory/proxy/proxies.py
+++ b/warehouse/metrics_tools/factory/proxy/proxies.py
@@ -109,3 +109,80 @@ def join_all_of_entity_type(
     )
     # Calculate the correct metric_id for all of the entity types
     return query
+
+
+def map_metadata_to_metric(
+    evaluator: MacroEvaluator,
+):
+    db = t.cast(str, evaluator.var("db"))
+    table = t.cast(str, evaluator.var("table"))
+    metadata = t.cast(t.Dict[str, t.Any], evaluator.var("metadata"))
+
+    description = metadata["description"]
+    display_name = metadata["display_name"]
+
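+    # Key each row as "<event_source>_<metric>" to line up with the metric
+    # names that metrics_v0 joins against (see unioned_metric_names).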
+    metrics_alias = exp.Concat(
+        expressions=[
+            exp.to_column("event_source"),
+            exp.Literal(this="_", is_string=True),
+            exp.to_column("metric"),
+        ],
+        safe=False,
+        coalesce=False,
+    ).as_("metric")
+
+    return (
+        exp.select(
+            exp.Literal(this=display_name, is_string=True).as_("display_name"),
+            exp.Literal(this=description, is_string=True).as_("description"),
+            metrics_alias,
+        )
+        .from_(sql.to_table(f"{db}.{table}"))
+        .distinct()
+    )
+
+
+def aggregate_metadata(
+    evaluator: MacroEvaluator,
+):
+    from functools import reduce
+
+    if evaluator.runtime_stage in ["loading", "creating"]:
+        return exp.select(
+            exp.Literal(this="...", is_string=True).as_("display_name"),
+            exp.Literal(this="...", is_string=True).as_("description"),
+            exp.Literal(this="...", is_string=True).as_("metric"),
+        )
+
+    model_names = [snap.name for snap in evaluator._snapshots.values()]
+    metadata_model_names = list(
+        filter(
+            lambda model_name: model_name.startswith(
+                '"oso"."metrics"."metrics_metadata_'
+            ),
+            model_names,
+        )
+    )
+
+    def make_select(table: str):
+        return exp.select(
+            exp.column("display_name"),
+            exp.column("description"),
+            exp.column("metric"),
+        ).from_(sql.to_table(f"{table}"))
+
+    selects = [make_select(model) for model in metadata_model_names]
+
+    unique_metrics = reduce(lambda acc, cur: acc.union(cur), selects)
+
+    return (
+        exp.select(
+            exp.column("display_name"),
+            exp.column("description"),
+            exp.column("metric"),
+        )
+        .from_(unique_metrics.subquery())
+        .as_("metadata")
+    )