Skip to content

Commit

Permalink
basic support for AH asset dynamic updates
Browse files Browse the repository at this point in the history
  • Loading branch information
mfornos committed Nov 21, 2024
1 parent 6cf8162 commit ee47698
Showing 1 changed file with 86 additions and 11 deletions.
97 changes: 86 additions & 11 deletions packages/server/src/services/agents/steward/agent.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,27 @@
import { AbstractSublevel } from 'abstract-level'
import { EMPTY, expand, map, merge, mergeAll, mergeMap, reduce, switchMap } from 'rxjs'
import {
EMPTY,
Observer,
Subscription,
expand,
filter,
firstValueFrom,
map,
merge,
mergeAll,
mergeMap,
reduce,
switchMap,
} from 'rxjs'
import { z } from 'zod'

import { Twox64Concat, Twox128 } from '@polkadot-api/substrate-bindings'
import { mergeUint8 } from '@polkadot-api/utils'

import { IngressConsumer, NetworkInfo } from '@/services/ingress/index.js'
import { Scheduled, Scheduler } from '@/services/persistence/level/scheduler.js'
import { LevelDB, Logger, NetworkURN } from '@/services/types.js'

import { asSerializable, stringToUa8 } from '@/common/util.js'
import { asSerializable } from '@/common/util.js'
import { HexString } from '@/lib.js'

import {
Agent,
AgentMetadata,
Expand All @@ -21,6 +32,7 @@ import {
getAgentCapabilities,
} from '../types.js'

import { SharedStreams } from '../base/shared.js'
import { mappers } from './mappers.js'
import { Queries } from './queries/index.js'
import { $StewardQueryArgs, AssetIds, AssetMapper, AssetMetadata, StewardQueryArgs } from './types.js'
Expand All @@ -36,6 +48,18 @@ const STORAGE_PAGE_LEN = 100
const START_DELAY = 30_000 // 5m
const SCHED_RATE = 43_200_000 // 12h

const ASSET_PALLET_EVENTS = [
'Created',
'Issued',
'MetadataCleared',
'MetadataSet',
'OwnerChanged',
'TeamCheanged',
'AssetStatusChanged',
'AssetFrozen',
'AssetThawed',
]

/**
* The Data Steward agent.
*
Expand All @@ -52,6 +76,7 @@ export class DataSteward implements Agent, Queryable {
readonly #dbChains: LevelDB

readonly #queries: Queries
readonly #rxSubs: Subscription[] = []

constructor(ctx: AgentRuntimeContext) {
this.#log = ctx.log
Expand Down Expand Up @@ -90,7 +115,9 @@ export class DataSteward implements Agent, Queryable {
}

stop() {
//
for (const sub of this.#rxSubs) {
sub.unsubscribe()
}
}

async start() {
Expand All @@ -104,6 +131,29 @@ export class DataSteward implements Agent, Queryable {
}, START_DELAY)
timeout.unref()
}

// TODO generalise for other networks and pallets, similar to mappers but for updates
const chainsToWatch: NetworkURN[] = ['urn:ocn:polkadot:1000', 'urn:ocn:kusama:1000', 'urn:ocn:paseo:1000']
const streams = SharedStreams.instance(this.#ingress)

for (const chainId of chainsToWatch) {
if (this.#ingress.isNetworkDefined(chainId)) {
this.#log.info('[agent:%s] watching for asset updates %s', this.id, chainId)
this.#rxSubs.push(
streams
.blockEvents(chainId)
.pipe(
filter(
(blockEvent) =>
blockEvent.module === 'Assets' && ASSET_PALLET_EVENTS.includes(blockEvent.name),
),
)
.subscribe(async ({ value: { asset_id } }) => {
await this.#updateAsset(chainId, asset_id)
}),
)
}
}
}

collectTelemetry() {
Expand Down Expand Up @@ -145,16 +195,41 @@ export class DataSteward implements Agent, Queryable {
}
}

const allAssetMapsObs = merge(allAssetMaps).pipe(mergeAll())
allAssetMapsObs.subscribe(this.#storeAssetMetadata())
}

async #updateAsset(chainId: NetworkURN, assetId: any) {
const context = await firstValueFrom(this.#ingress.getContext(chainId))
const mappings = mappers[chainId](context)
for (const { mapEntry } of mappings) {
const codec = context.storageCodec('Assets', 'Asset')
const key = codec.enc(assetId) as HexString
this.#ingress
.getStorage(chainId, key)
.pipe(
mapEntry(key, this.#ingress),
map((x) => asSerializable<AssetMetadata>(x)),
)
.pipe(
map((asset) => ({
asset,
chainId,
})),
)
.subscribe(this.#storeAssetMetadata())
}
}

#storeAssetMetadata() {
const tmpMapExternalIds: Record<string, AssetIds[]> = {}
const tmpMapSourceIds: Record<string, string> = {}

const allAssetMapsObs = merge(allAssetMaps).pipe(mergeAll())

allAssetMapsObs.subscribe({
return {
next: async ({ chainId, asset }) => {
const assetKey = assetMetadataKey(chainId, asset.id)

const multilocation = asset.multiLocation

if (multilocation) {
const resolvedId = await this.#queries.resolveAssetIdFromLocation(
chainId,
Expand Down Expand Up @@ -218,7 +293,7 @@ export class DataSteward implements Agent, Queryable {
this.#log.info('[agent:%s] END synchronizing registered chains', this.id)
},
error: (e) => this.#log.error(e, '[agent:%s] on metadata sync', this.id),
})
} as Observer<{ asset: AssetMetadata; chainId: NetworkURN }>
}

#putChainProps(chainId: NetworkURN) {
Expand Down

0 comments on commit ee47698

Please sign in to comment.