Skip to content

Commit

Permalink
fix interlay dmp receive
Browse files Browse the repository at this point in the history
  • Loading branch information
XY-Wang committed Jan 20, 2025
1 parent bf1d453 commit a0d7b0d
Show file tree
Hide file tree
Showing 8 changed files with 379 additions and 308 deletions.
385 changes: 249 additions & 136 deletions packages/server/src/services/agents/xcm/ops/common.spec.ts

Large diffs are not rendered by default.

97 changes: 95 additions & 2 deletions packages/server/src/services/agents/xcm/ops/common.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,29 @@
import { Observable, map, mergeMap } from 'rxjs'
import { Observable, bufferCount, map, mergeMap } from 'rxjs'

import { filterNonNull } from '@/common/index.js'
import { createNetworkId, getChainId, getConsensus, isOnSameConsensus } from '@/services/config.js'
import { ApiContext, BlockEvent } from '@/services/networking/index.js'
import { HexString } from '@/services/subscriptions/types.js'
import { AnyJson, NetworkURN } from '@/services/types.js'
import { toHex } from 'polkadot-api/utils'
import {
GenericXcmInboundWithContext,
GenericXcmSent,
Leg,
XcmInbound,
XcmInboundWithContext,
XcmSent,
XcmSentWithContext,
} from '../types.js'
import { getParaIdFromJunctions, getSendersFromEvent, networkIdFromMultiLocation } from './util.js'
import {
getParaIdFromJunctions,
getSendersFromEvent,
mapAssetsTrapped,
matchEvent,
networkIdFromMultiLocation,
} from './util.js'
import { raw, versionedXcmCodec } from './xcm-format.js'
import { METHODS_XCMP_QUEUE } from './xcmp.js'

type Stop = { networkId: NetworkURN; message?: any[] }

Expand Down Expand Up @@ -161,3 +170,87 @@ export function xcmMessagesSent() {
)
}
}

/**
* Extract XCM receive events for both DMP and HRMP in parachains.
* Most parachains emit the same event, MessageQueue.Processed, for both DMP and HRMP.
* But some, like Interlay, emits a different event DmpQueue.ExecutedDownward for DMP.
*/
export function extractParachainReceive() {
return (source: Observable<BlockEvent>): Observable<XcmInboundWithContext> => {
return source.pipe(
bufferCount(2, 1),
// eslint-disable-next-line complexity
map(([maybeAssetTrapEvent, maybeXcmpEvent]) => {
if (maybeXcmpEvent === undefined) {
return null
}

const assetTrapEvent = matchEvent(maybeAssetTrapEvent, ['XcmPallet', 'PolkadotXcm'], 'AssetsTrapped')
? maybeAssetTrapEvent
: undefined
const assetsTrapped = mapAssetsTrapped(assetTrapEvent)

if (matchEvent(maybeXcmpEvent, 'XcmpQueue', METHODS_XCMP_QUEUE)) {
const xcmpQueueData = maybeXcmpEvent.value

return new GenericXcmInboundWithContext({
event: maybeXcmpEvent,
extrinsicHash: maybeXcmpEvent.extrinsic?.hash as HexString,
blockHash: maybeXcmpEvent.blockHash as HexString,
blockNumber: maybeXcmpEvent.blockNumber,
timestamp: maybeXcmpEvent.timestamp,
extrinsicPosition: maybeXcmpEvent.extrinsicPosition,
messageHash: xcmpQueueData.message_hash,
messageId: xcmpQueueData.message_id,
outcome: maybeXcmpEvent.name === 'Success' ? 'Success' : 'Fail',
error: xcmpQueueData.error,
assetsTrapped,
})
} else if (matchEvent(maybeXcmpEvent, 'MessageQueue', 'Processed')) {
const { id, success, error } = maybeXcmpEvent.value
// Received event only emits field `message_id`,
// which is actually the message hash in chains that do not yet support Topic ID.
const messageId = id
const messageHash = messageId

return new GenericXcmInboundWithContext({
event: maybeXcmpEvent,
extrinsicHash: maybeXcmpEvent.extrinsic?.hash as HexString,
blockHash: maybeXcmpEvent.blockHash as HexString,
blockNumber: maybeXcmpEvent.blockNumber,
timestamp: maybeXcmpEvent.timestamp,
messageHash,
messageId,
outcome: success ? 'Success' : 'Fail',
error,
assetsTrapped,
})
} else if (matchEvent(maybeXcmpEvent, 'DmpQueue', 'ExecutedDownward')) {
const { message_id, outcome } = maybeXcmpEvent.value

// Received event only emits field `message_id`,
// which is actually the message hash in chains that do not yet support Topic ID.
const messageId = message_id
const messageHash = messageId

return new GenericXcmInboundWithContext({
event: maybeXcmpEvent,
extrinsicHash: maybeXcmpEvent.extrinsic?.hash as HexString,
blockHash: maybeXcmpEvent.blockHash as HexString,
blockNumber: maybeXcmpEvent.blockNumber,
timestamp: maybeXcmpEvent.timestamp,
messageHash,
messageId,
outcome: outcome.type === 'Complete' ? 'Success' : 'Fail',
error: null,
assetsTrapped,
})
}

return null
}),
filterNonNull(),
)
}
}
87 changes: 2 additions & 85 deletions packages/server/src/services/agents/xcm/ops/xcmp.spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { extractEvents } from '@/common/index.js'
import { apiContext, xcmpReceive, xcmpSend } from '@/testing/xcm.js'

import { extractXcmpReceive, extractXcmpSend } from './xcmp.js'
import { apiContext, xcmpSend } from '@/testing/xcm.js'
import { extractXcmpSend } from './xcmp.js'

describe('xcmp operator', () => {
describe('extractXcmpSend', () => {
Expand Down Expand Up @@ -33,86 +32,4 @@ describe('xcmp operator', () => {
})
})
})

describe('extractXcmpReceive', () => {
it('should extract XCMP receive with outcome success', async () => {
const { successBlocks } = xcmpReceive
const calls = vi.fn()
const test$ = extractXcmpReceive()(successBlocks.pipe(extractEvents()))

await new Promise<void>((resolve) => {
test$.subscribe({
next: (msg) => {
expect(msg).toBeDefined()
expect(msg.blockNumber).toBeDefined()
expect(msg.blockHash).toBeDefined()
expect(msg.event).toBeDefined()
expect(msg.messageHash).toBeDefined()
expect(msg.timestamp).toBeDefined()
expect(msg.outcome).toBeDefined()
expect(msg.outcome).toBe('Success')
calls()
},
complete: () => {
expect(calls).toHaveBeenCalledTimes(1)
resolve()
},
})
})
})

it('should extract failed XCMP received message with error', async () => {
const { failBlocks } = xcmpReceive
const calls = vi.fn()
const test$ = extractXcmpReceive()(failBlocks.pipe(extractEvents()))

await new Promise<void>((resolve) => {
test$.subscribe({
next: (msg) => {
expect(msg).toBeDefined()
expect(msg.blockNumber).toBeDefined()
expect(msg.blockHash).toBeDefined()
expect(msg.event).toBeDefined()
expect(msg.messageHash).toBeDefined()
expect(msg.outcome).toBeDefined()
expect(msg.outcome).toBe('Fail')
expect(msg.timestamp).toBeDefined()
calls()
},
complete: () => {
expect(calls).toHaveBeenCalledTimes(1)
resolve()
},
})
})
})

it('should extract assets trapped info on XCMP received message for V4 assets', async () => {
const { trappedBlocks } = xcmpReceive
const calls = vi.fn()
const test$ = extractXcmpReceive()(trappedBlocks.pipe(extractEvents()))

await new Promise<void>((resolve) => {
test$.subscribe({
next: (msg) => {
expect(msg).toBeDefined()
expect(msg.blockNumber).toBeDefined()
expect(msg.blockHash).toBeDefined()
expect(msg.event).toBeDefined()
expect(msg.messageHash).toBeDefined()
expect(msg.outcome).toBeDefined()
expect(msg.outcome).toBe('Fail')
expect(msg.assetsTrapped).toBeDefined()
expect(msg.assetsTrapped?.assets).toBeDefined()
expect(msg.timestamp).toBeDefined()
calls()
},
complete: () => {
expect(calls).toHaveBeenCalledTimes(1)
resolve()
},
})
})
})
})
})
74 changes: 4 additions & 70 deletions packages/server/src/services/agents/xcm/ops/xcmp.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,16 @@
import { Observable, bufferCount, filter, map, mergeMap } from 'rxjs'
import { Observable, filter, map, mergeMap } from 'rxjs'

import { filterNonNull } from '@/common/index.js'
import { HexString } from '@/lib.js'
import { createNetworkId } from '@/services/config.js'
import { ApiContext, BlockEvent } from '@/services/networking/index.js'
import { NetworkURN } from '@/services/types.js'

import {
GenericXcmInboundWithContext,
GenericXcmSentWithContext,
GetOutboundHrmpMessages,
XcmInboundWithContext,
XcmSentWithContext,
} from '../types.js'
import { GenericXcmSentWithContext, GetOutboundHrmpMessages, XcmSentWithContext } from '../types.js'
import { xcmMessagesSent } from './common.js'
import { getMessageId, mapAssetsTrapped, matchEvent } from './util.js'
import { getMessageId, matchEvent } from './util.js'
import { fromXcmpFormat } from './xcm-format.js'

const METHODS_XCMP_QUEUE = ['Success', 'Fail']
export const METHODS_XCMP_QUEUE = ['Success', 'Fail']

function findOutboundHrmpMessage(
origin: NetworkURN,
Expand Down Expand Up @@ -78,62 +71,3 @@ export function extractXcmpSend(
)
}
}

export function extractXcmpReceive() {
return (source: Observable<BlockEvent>): Observable<XcmInboundWithContext> => {
return source.pipe(
bufferCount(2, 1),
// eslint-disable-next-line complexity
map(([maybeAssetTrapEvent, maybeXcmpEvent]) => {
if (maybeXcmpEvent === undefined) {
return null
}

const assetTrapEvent = matchEvent(maybeAssetTrapEvent, ['XcmPallet', 'PolkadotXcm'], 'AssetsTrapped')
? maybeAssetTrapEvent
: undefined
const assetsTrapped = mapAssetsTrapped(assetTrapEvent)

if (matchEvent(maybeXcmpEvent, 'XcmpQueue', METHODS_XCMP_QUEUE)) {
const xcmpQueueData = maybeXcmpEvent.value

return new GenericXcmInboundWithContext({
event: maybeXcmpEvent,
extrinsicHash: maybeXcmpEvent.extrinsic?.hash as HexString,
blockHash: maybeXcmpEvent.blockHash as HexString,
blockNumber: maybeXcmpEvent.blockNumber,
timestamp: maybeXcmpEvent.timestamp,
extrinsicPosition: maybeXcmpEvent.extrinsicPosition,
messageHash: xcmpQueueData.message_hash,
messageId: xcmpQueueData.message_id,
outcome: maybeXcmpEvent.name === 'Success' ? 'Success' : 'Fail',
error: xcmpQueueData.error,
assetsTrapped,
})
} else if (matchEvent(maybeXcmpEvent, 'MessageQueue', 'Processed')) {
const { id, success, error } = maybeXcmpEvent.value
// Received event only emits field `message_id`,
// which is actually the message hash in chains that do not yet support Topic ID.
const messageId = id
const messageHash = messageId

return new GenericXcmInboundWithContext({
event: maybeXcmpEvent,
extrinsicHash: maybeXcmpEvent.extrinsic?.hash as HexString,
blockHash: maybeXcmpEvent.blockHash as HexString,
blockNumber: maybeXcmpEvent.blockNumber,
timestamp: maybeXcmpEvent.timestamp,
messageHash,
messageId,
outcome: success ? 'Success' : 'Fail',
error,
assetsTrapped,
})
}

return null
}),
filterNonNull(),
)
}
}
22 changes: 22 additions & 0 deletions packages/server/src/services/agents/xcm/tracking.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,26 @@ describe('extractXcmMessageData', () => {
})
})
})

it('should extract xcm messages from dmp with topic id that does not accept topic id', async () => {
const blocks = from(testBlocksFrom('interlay/7025155.cbor'))
const calls = vi.fn()
const test$ = extractXcmMessageData(apiContext)(blocks.pipe())

new Promise<void>((resolve) => {
test$.subscribe({
next: ({ hashData }) => {
console.log(hashData)
calls()
expect(hashData).toBeDefined()
expect(hashData[0].hash).toBeDefined()
expect(hashData[0].data).toBeDefined()
},
complete: () => {
resolve()
expect(calls).toHaveBeenCalledTimes(1)
},
})
})
})
})
21 changes: 6 additions & 15 deletions packages/server/src/services/agents/xcm/tracking.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ import { SharedStreams } from '../base/shared.js'
import { AgentRuntimeContext } from '../types.js'
import { MatchingEngine } from './matching.js'
import { mapXcmInbound, mapXcmSent } from './ops/common.js'
import { extractParachainReceive } from './ops/common.js'
import { messageCriteria } from './ops/criteria.js'
import { extractDmpSendByEvent } from './ops/dmp.js'
import { extractRelayReceive } from './ops/relay.js'
import { extractUmpReceive, extractUmpSend } from './ops/ump.js'
import { getMessageId, matchExtrinsic } from './ops/util.js'
import { fromXcmpFormat, raw } from './ops/xcm-format.js'
import { extractXcmpReceive, extractXcmpSend } from './ops/xcmp.js'
import { extractXcmpSend } from './ops/xcmp.js'
import { TelemetryXcmEventEmitter } from './telemetry/events.js'
import { xcmAgentMetrics, xcmMatchingEngineMetrics } from './telemetry/metrics.js'
import {
Expand Down Expand Up @@ -171,8 +172,8 @@ export class XcmTracker {
.subscribe(inboundObserver),
})
} else {
// VMP DMP
this.#log.info('[%s] %s subscribe inbound DMP', this.#id, chainId)
// VMP + HRMP
this.#log.info('[%s] %s subscribe inbound DMP + HRMP / XCMP', this.#id, chainId)

const messageHashBlocks$ = this.#ingress.getContext(chainId).pipe(
switchMap((context) =>
Expand All @@ -188,21 +189,11 @@ export class XcmTracker {
),
)

// DMP and HRMP receive matches the same event
// subs.push({
// chainId,
// sub: messageHashBlocks$
// .pipe(extractEvents(), extractDmpReceive(), mapXcmInbound(chainId))
// .subscribe(inboundObserver),
// })

// Inbound HRMP / XCMP transport
this.#log.info('[%s] %s subscribe inbound HRMP', this.#id, chainId)

// Extract both DMP and HRMP receive
subs.push({
chainId,
sub: messageHashBlocks$
.pipe(extractEvents(), extractXcmpReceive(), mapXcmInbound(chainId))
.pipe(extractEvents(), extractParachainReceive(), mapXcmInbound(chainId))
.subscribe(inboundObserver),
})
}
Expand Down
Binary file not shown.
1 change: 1 addition & 0 deletions packages/server/src/testing/tools/download.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export const networks = {
moonbeam: 'wss://moonbeam.ibp.network',
astar: 'wss://rpc.astar.network',
bifrost: 'wss://bifrost-polkadot.ibp.network',
interlay: 'wss://interlay-rpc.dwellir.com',
} as Record<string, string>

async function download([name, ws, height]: [string, string, string]) {
Expand Down

0 comments on commit a0d7b0d

Please sign in to comment.