Skip to content

Commit

Permalink
data: migrate int_events__dependencies to sqlmesh (#2927)
Browse files Browse the repository at this point in the history
* This includes porting the deps.dev macros from dbt to sqlmesh syntax
  • Loading branch information
ryscheng authored Feb 1, 2025
1 parent 93b96a3 commit de38fb6
Show file tree
Hide file tree
Showing 4 changed files with 313 additions and 2 deletions.
184 changes: 184 additions & 0 deletions warehouse/metrics_mesh/macros/deps_dev/deps_dev_artifact_details.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@

from sqlglot import expressions as exp
from sqlmesh import macro
from sqlmesh.core.macros import MacroEvaluator


@macro()
def deps_parse_namespace(evaluator: MacroEvaluator, event_source: exp.Expression, artifact_name: exp.Expression):
    """
    Build a SQL CASE expression that derives an artifact's namespace.

    Arguments:
    - evaluator: The sqlmesh macro evaluator (not used by this macro).
    - event_source: Expression holding the package system of the artifact.
    - artifact_name: Expression holding the full artifact name.

    Branches, evaluated in this order:
    - 'NPM' and the name contains '/': part 2 of splitting on '@' the text
      that precedes the first '/'.
    - 'GO' and the name contains '/': part 2 of the name split on '/'.
    - 'MAVEN': part 1 of the name split on ':'.
    - 'NUGET' and the name contains '.': part 1 of the name split on '.'.
    - otherwise: the artifact name itself.

    Returns the sqlglot CASE expression.
    """

    def source_is(system: str) -> exp.EQ:
        # event_source = '<system>'
        return exp.EQ(this=event_source, expression=exp.Literal.string(system))

    def name_contains(separator: str) -> exp.GT:
        # STRPOS(artifact_name, '<separator>') > 0
        position = exp.StrPosition(
            this=artifact_name,
            substr=exp.Literal.string(separator),
        )
        return exp.GT(this=position, expression=exp.Literal.number(0))

    def piece(value: exp.Expression, separator: str, index: int) -> exp.SplitPart:
        # SPLIT_PART(value, '<separator>', index)
        return exp.SplitPart(
            this=value,
            delimiter=exp.Literal.string(separator),
            part_index=exp.Literal.number(index),
        )

    # The branches are built in the same order they appear in the CASE.
    npm_branch = exp.If(
        this=exp.And(this=source_is('NPM'), expression=name_contains('/')),
        true=piece(piece(artifact_name, '/', 1), '@', 2),
    )
    go_branch = exp.If(
        this=exp.And(this=source_is('GO'), expression=name_contains('/')),
        true=piece(artifact_name, '/', 2),
    )
    maven_branch = exp.If(
        this=source_is('MAVEN'),
        true=piece(artifact_name, ':', 1),
    )
    nuget_branch = exp.If(
        this=exp.And(this=source_is('NUGET'), expression=name_contains('.')),
        true=piece(artifact_name, '.', 1),
    )

    return exp.Case(
        ifs=[npm_branch, go_branch, maven_branch, nuget_branch],
        default=artifact_name,
    )

@macro()
def deps_parse_name(evaluator: MacroEvaluator, event_source: exp.Expression, artifact_name: exp.Expression):
    """
    Build a SQL CASE expression that derives an artifact's short name.

    Arguments:
    - evaluator: The sqlmesh macro evaluator (not used by this macro).
    - event_source: Expression holding the package system of the artifact.
    - artifact_name: Expression holding the full artifact name.

    Branches, evaluated in this order:
    - 'NPM' and the name contains '/': part 2 of the name split on '/'.
    - 'GO' and the name contains '/': part 3 of the name split on '/'.
    - 'MAVEN': part 2 of the name split on ':'.
    - 'NUGET' and the name contains '.': the name with its leading
      "<text without dots>." prefix removed by a regex replace.
    - otherwise: the artifact name itself.

    Returns the sqlglot CASE expression.
    """

    def _when(system: str, separator=None) -> exp.Expression:
        # event_source = '<system>' [AND STRPOS(artifact_name, '<separator>') > 0]
        matches_system = exp.EQ(
            this=event_source,
            expression=exp.Literal.string(system),
        )
        if separator is None:
            return matches_system
        has_separator = exp.GT(
            this=exp.StrPosition(
                this=artifact_name,
                substr=exp.Literal.string(separator),
            ),
            expression=exp.Literal.number(0),
        )
        return exp.And(this=matches_system, expression=has_separator)

    def _nth(separator: str, index: int) -> exp.SplitPart:
        # SPLIT_PART(artifact_name, '<separator>', index)
        return exp.SplitPart(
            this=artifact_name,
            delimiter=exp.Literal.string(separator),
            part_index=exp.Literal.number(index),
        )

    branches = []
    branches.append(exp.If(this=_when('NPM', '/'), true=_nth('/', 2)))
    branches.append(exp.If(this=_when('GO', '/'), true=_nth('/', 3)))
    branches.append(exp.If(this=_when('MAVEN'), true=_nth(':', 2)))
    branches.append(
        exp.If(
            this=_when('NUGET', '.'),
            # Drop everything up to and including the first '.'.
            true=exp.RegexpReplace(
                this=artifact_name,
                expression=exp.Literal.string(r'^[^.]+\.'),
                replacement=exp.Literal.string(''),
            ),
        )
    )

    return exp.Case(ifs=branches, default=artifact_name)
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ from (
union all
select * from metrics.int_events__github
union all
- select * from @oso_source('bigquery.oso.int_events__dependencies')
+ select * from metrics.int_events__dependencies
union all
select * from @oso_source('bigquery.oso.int_events__open_collective')
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
-- Dependency change events derived from deps.dev dependency snapshots.
-- Emits one row per (time, event_type, event_source, from_artifact_id,
-- to_artifact_id), as declared in the model grain below.
-- The model is incremental on the `time` column, runs on a daily cron starting
-- from 2015-01-01, and processes batches of 90 intervals one at a time.
-- NOTE(review): no @start_*/@end_* predicate on the time column appears in
-- this query; confirm whether reading the full source for every batch is
-- intended (the LAG below needs earlier snapshots).
MODEL (
name metrics.int_events__dependencies,
dialect trino,
kind INCREMENTAL_BY_TIME_RANGE (
time_column time,
batch_size 90,
batch_concurrency 1
),
start '2015-01-01',
cron '@daily',
partitioned_by (DAY("time"), "event_type"),
grain (time, event_type, event_source, from_artifact_id, to_artifact_id)
);

-- Constant event_source value stamped on every row produced by this model.
@DEF(event_source_name, 'DEPS_DEV');

-- Names of all known artifacts whose artifact_source is 'NPM'; used to
-- restrict which dependency targets are kept.
with artifacts as (
select artifact_name
from metrics.int_all_artifacts
where artifact_source = 'NPM'
),

-- One row per dependency edge per snapshot, limited to MinimumDepth = 1
-- (presumably direct dependencies; confirm against the deps.dev schema) and
-- to dependency names present in `artifacts`.
-- NOTE(review): the membership test matches on name only; Dependency.System
-- is not constrained to NPM here. Confirm this is intended.
-- previous_to_artifact_name is the dependency name from the preceding snapshot
-- of the same (System, Name, Dependency.Name, Version, Dependency.Version)
-- tuple, or NULL for the first snapshot of that tuple.
snapshots as (
select
SnapshotAt as time,
System as from_artifact_type,
Name as from_artifact_name,
Version as from_artifact_version,
Dependency.Name as to_artifact_name,
Dependency.System as to_artifact_type,
Dependency.Version as to_artifact_version,
LAG(Dependency.Name) over (
partition by System, Name, Dependency.Name, Version, Dependency.Version
order by SnapshotAt
) as previous_to_artifact_name
from @oso_source('bigquery.oso.stg_deps_dev__dependencies')
where
MinimumDepth = 1
and Dependency.Name in (select artifact_name from artifacts)
),

-- Classifies each snapshot row as an event and splits the raw artifact names
-- into name / namespace using the deps_parse_* macros.
-- NOTE(review): the LAG window is partitioned by Dependency.Name, so a
-- non-NULL previous_to_artifact_name always equals to_artifact_name; the
-- 'REMOVE_DEPENDENCY' branch therefore looks unreachable. Confirm.
intermediate as (
select
time,
case
when previous_to_artifact_name is null then 'ADD_DEPENDENCY'
when
to_artifact_name is not null and to_artifact_name <> previous_to_artifact_name
then 'REMOVE_DEPENDENCY'
else 'NO_CHANGE'
end as event_type,
@event_source_name as event_source,
@deps_parse_name(to_artifact_type, to_artifact_name) as to_artifact_name,
@deps_parse_namespace(to_artifact_type, to_artifact_name) as to_artifact_namespace,
to_artifact_type,
@deps_parse_name(from_artifact_type, from_artifact_name) as from_artifact_name,
@deps_parse_namespace(from_artifact_type, from_artifact_name) as from_artifact_namespace,
from_artifact_type,
1.0 as amount
from snapshots
),

-- Drops 'NO_CHANGE' rows and derives artifact and artifact-source identifiers
-- with @oso_id from (event_source, namespace, name) and (event_source, type).
artifact_ids as (
select
time,
event_type,
event_source,
@oso_id(event_source, to_artifact_namespace, to_artifact_name) as to_artifact_id,
to_artifact_name,
to_artifact_namespace,
to_artifact_type,
@oso_id(event_source, to_artifact_type) as to_artifact_source_id,
@oso_id(event_source, from_artifact_namespace, from_artifact_name) as from_artifact_id,
from_artifact_name,
from_artifact_namespace,
from_artifact_type,
@oso_id(event_source, from_artifact_type) as from_artifact_source_id,
amount
from intermediate
where event_type <> 'NO_CHANGE'
),

-- Adds event_source_id, an @oso_id over the source, time, both artifact ids
-- and types, and the event type.
changes as (
select
time,
event_type,
event_source,
to_artifact_id,
to_artifact_name,
to_artifact_namespace,
to_artifact_type,
to_artifact_source_id,
from_artifact_id,
from_artifact_name,
from_artifact_namespace,
from_artifact_type,
from_artifact_source_id,
amount,
@oso_id(
event_source,
time,
to_artifact_id,
to_artifact_type,
from_artifact_id,
from_artifact_type,
event_type
) as event_source_id
from artifact_ids
)

-- Final projection: upper-cases event_type, event_source and the artifact
-- types, lower-cases names, namespaces and source ids, and casts
-- event_source_id and amount to explicit types.
select
time,
to_artifact_id,
from_artifact_id,
UPPER(event_type) as event_type,
CAST(event_source_id as STRING) as event_source_id,
UPPER(event_source) as event_source,
LOWER(to_artifact_name) as to_artifact_name,
LOWER(to_artifact_namespace) as to_artifact_namespace,
UPPER(to_artifact_type) as to_artifact_type,
LOWER(to_artifact_source_id) as to_artifact_source_id,
LOWER(from_artifact_name) as from_artifact_name,
LOWER(from_artifact_namespace) as from_artifact_namespace,
UPPER(from_artifact_type) as from_artifact_type,
LOWER(from_artifact_source_id) as from_artifact_source_id,
CAST(amount as DOUBLE) as amount
from changes
2 changes: 1 addition & 1 deletion warehouse/metrics_tools/local/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@
"opensource-observer.oso_playground.int_deployers": "bigquery.oso.int_deployers",
"opensource-observer.oso_playground.int_deployers_by_project": "bigquery.oso.int_deployers_by_project",
"opensource-observer.oso_playground.int_events__blockchain": "bigquery.oso.int_events__blockchain",
"opensource-observer.oso_playground.int_events__dependencies": "bigquery.oso.int_events__dependencies",
"opensource-observer.oso_playground.int_events__open_collective": "bigquery.oso.int_events__open_collective",
"opensource-observer.oso_playground.int_first_time_addresses": "bigquery.oso.int_first_time_addresses",
"opensource-observer.oso_playground.int_factories": "bigquery.oso.int_factories",
"opensource-observer.oso_playground.int_proxies": "bigquery.oso.int_proxies",
"opensource-observer.oso_playground.int_superchain_potential_bots": "bigquery.oso.int_superchain_potential_bots",
"opensource-observer.oso_playground.stg_deps_dev__dependencies": "bigquery.oso.stg_deps_dev__dependencies",
"opensource-observer.oso_playground.stg_deps_dev__packages": "bigquery.oso.stg_deps_dev__packages",
"opensource-observer.oso_playground.stg_farcaster__addresses": "bigquery.oso.stg_farcaster__addresses",
"opensource-observer.oso_playground.stg_farcaster__profiles": "bigquery.oso.stg_farcaster__profiles",
Expand Down

0 comments on commit de38fb6

Please sign in to comment.