Skip to content

Commit

Permalink
fix asset update externalIds
Browse files Browse the repository at this point in the history
  • Loading branch information
XY-Wang committed Nov 26, 2024
1 parent 31322b3 commit 3c04708
Showing 1 changed file with 53 additions and 20 deletions.
73 changes: 53 additions & 20 deletions packages/server/src/services/agents/steward/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ const CHAIN_INFO_LEVEL_PREFIX = 'agent:steward:chains'

const STORAGE_PAGE_LEN = 100

const START_DELAY = 30_000 // 5m
const START_DELAY = 30_000 // 30s
const SCHED_RATE = 43_200_000 // 12h

const ASSET_PALLET_EVENTS = [
Expand Down Expand Up @@ -160,7 +160,15 @@ export class DataSteward implements Agent, Queryable {
blockEvent.module === 'Assets' && ASSET_PALLET_EVENTS.includes(blockEvent.name),
),
)
.subscribe(async ({ name, value: { asset_id } }) => {
.subscribe(async ({ name, value: { asset_id }, blockNumber }) => {
this.#log.info(
'[agent:%s] asset update (event=%s, chainId=%s, assetId=%s, block=%s)',
this.id,
name,
chainId,
asset_id,
blockNumber,
)
if (name === 'Destroyed') {
await this.#removeAsset(chainId, asset_id)
} else {
Expand Down Expand Up @@ -216,32 +224,57 @@ export class DataSteward implements Agent, Queryable {
}

async #updateAsset(chainId: NetworkURN, assetId: AssetId) {
const context = await firstValueFrom(this.#ingress.getContext(chainId))
const mappings = mappers[chainId](context)
const { mapEntry } = mappings[0]

const codec = context.storageCodec('Assets', 'Asset')
const key = codec.enc(assetId) as HexString
this.#ingress
.getStorage(chainId, key)
.pipe(
mapEntry(key, this.#ingress),
map((x) => asSerializable<AssetMetadata>(x)),
)
.pipe(
map((asset) => ({
asset,
chainId,
})),
try {
const context = await firstValueFrom(this.#ingress.getContext(chainId))
const mappings = mappers[chainId](context)
const { mapEntry } = mappings[0]

const codec = context.storageCodec('Assets', 'Asset')
const key = codec.enc(assetId) as HexString
const asset = await firstValueFrom(
this.#ingress.getStorage(chainId, key).pipe(
mapEntry(key, this.#ingress),
map((x) => asSerializable<AssetMetadata>(x)),
),
)
.subscribe(this.#storeAssetMetadata())

const assetKey = assetMetadataKey(chainId, asset.id)
const current = await this.#dbAssets.get(assetKey)
await this.#dbAssets.put(assetKey, {
...asset,
externalIds: current.externalIds,
sourceId: current.sourceId,
})
} catch (error) {
this.#log.error(error, '[agent:%s] on update asset (chainId=%s, assetId=%s)', this.id, chainId, assetId)
}
}

async #removeAsset(chainId: NetworkURN, assetId: AssetId) {
try {
const key = assetMetadataKey(chainId, assetId)
await this.#dbAssets.del(key)
this.#log.info('[agent:%s] delete asset (chainId=%s, assetId=%s)', this.id, chainId, assetId)
// remove asset from external ID mappings
for await (const [assetKey, asset] of this.#dbAssets.iterator()) {
const updatedIds = [...asset.externalIds]
const externalIdIndex = updatedIds.findIndex(
(extId) => extId.chainId === chainId && extId.id === assetId,
)
if (externalIdIndex > -1) {
updatedIds.splice(externalIdIndex, 1)
await this.#dbAssets.put(assetKey, {
...asset,
externalIds: updatedIds,
})
this.#log.info(
'[agent:%s] update external IDs (chainId=%s, assetId=%s)',
this.id,
asset.chainId,
asset.id,
)
}
}
} catch (error) {
this.#log.error(
error,
Expand Down

0 comments on commit 3c04708

Please sign in to comment.