diff --git a/packages/server/src/services/agents/xcm/ops/common.spec.ts b/packages/server/src/services/agents/xcm/ops/common.spec.ts index 7c0a0f2e..ad4540b1 100644 --- a/packages/server/src/services/agents/xcm/ops/common.spec.ts +++ b/packages/server/src/services/agents/xcm/ops/common.spec.ts @@ -1,117 +1,240 @@ import { from, of } from 'rxjs' -import { apiContext } from '@/testing/xcm.js' +import { extractEvents } from '@/common/index.js' +import { testBlocksFrom } from '@/testing/blocks.js' +import { apiContext, xcmpReceive } from '@/testing/xcm.js' import { GenericXcmSentWithContext } from '../types.js' -import { mapXcmSent } from './common.js' +import { extractParachainReceive, mapXcmSent } from './common.js' import { getMessageId } from './util.js' import { asVersionedXcm, fromXcmpFormat } from './xcm-format.js' -describe('extract waypoints operator', () => { - describe('mapXcmSent', () => { - it('should extract stops for a V2 XCM message without hops', async () => { - const calls = vi.fn() - const moon5531424 = - '0002100004000000001700004b3471bb156b050a13000000001700004b3471bb156b05010300286bee0d010004000101001e08eb75720cb63fbfcbe7237c6d9b7cf6b4953518da6b38731d5bc65b9ffa32021000040000000017206d278c7e297945030a130000000017206d278c7e29794503010300286bee0d010004000101000257fd81d0a71b094c2c8d3e6c93a9b01a31a43d38408bb2c4c2b49a4c58eb01' - const buf = new Uint8Array(Buffer.from(moon5531424, 'hex')) - const xcms = fromXcmpFormat(buf, apiContext) - const test$ = mapXcmSent( - apiContext, - 'urn:ocn:local:2004', - )( - from( - xcms.map( - (x) => - new GenericXcmSentWithContext({ - event: {}, - sender: { signer: { id: 'xyz', publicKey: '0x01' }, extraSigners: [] }, - blockHash: '0x01', - blockNumber: '32', - extrinsicPosition: 4, - recipient: 'urn:ocn:local:2104', - messageDataBuffer: buf, - messageHash: x.hash, - messageId: getMessageId(x), - instructions: { - bytes: x.data, - json: x.instructions, - }, - }), +describe('common xcm operators', () => { + describe('extract waypoints operator', () => { + describe('mapXcmSent', () => { + it('should extract stops for a V2 XCM message without hops', async () => { + const calls = vi.fn() + const moon5531424 = + '0002100004000000001700004b3471bb156b050a13000000001700004b3471bb156b05010300286bee0d010004000101001e08eb75720cb63fbfcbe7237c6d9b7cf6b4953518da6b38731d5bc65b9ffa32021000040000000017206d278c7e297945030a130000000017206d278c7e29794503010300286bee0d010004000101000257fd81d0a71b094c2c8d3e6c93a9b01a31a43d38408bb2c4c2b49a4c58eb01' + const buf = new Uint8Array(Buffer.from(moon5531424, 'hex')) + const xcms = fromXcmpFormat(buf, apiContext) + const test$ = mapXcmSent( + apiContext, + 'urn:ocn:local:2004', + )( + from( + xcms.map( + (x) => + new GenericXcmSentWithContext({ + event: {}, + sender: { signer: { id: 'xyz', publicKey: '0x01' }, extraSigners: [] }, + blockHash: '0x01', + blockNumber: '32', + extrinsicPosition: 4, + recipient: 'urn:ocn:local:2104', + messageDataBuffer: buf, + messageHash: x.hash, + messageId: getMessageId(x), + instructions: { + bytes: x.data, + json: x.instructions, + }, + }), + ), + ), + ) + + await new Promise((resolve) => { + test$.subscribe({ + next: (msg) => { + expect(msg).toBeDefined() + expect(msg.waypoint.chainId).toBe('urn:ocn:local:2004') + expect(msg.legs.length).toBe(1) + expect(msg.legs[0]).toEqual({ + from: 'urn:ocn:local:2004', + to: 'urn:ocn:local:2104', + relay: 'urn:ocn:local:0', + type: 'hrmp', + }) + expect(msg.destination.chainId).toBe('urn:ocn:local:2104') + calls() + }, + complete: () => { + expect(calls).toHaveBeenCalledTimes(2) + resolve() + }, + }) + }) + }) + + it('should extract stops for a XCM message hopping with InitiateReserveWithdraw', async () => { + const calls = vi.fn() + const polka19505060 = + '0310000400010300a10f043205011f000700f2052a011300010300a10f043205011f000700f2052a010010010204010100a10f0813000002043205011f0002093d00000d0102040001010081bd2c1d40052682633fb3e67eff151b535284d1d1a9633613af14006656f42b2c8e75728b841da22d8337ff5fadd1264f13addcdee755b01ce1a3afb9ef629b9a' + const buf = new Uint8Array(Buffer.from(polka19505060, 'hex')) + const xcm = asVersionedXcm(buf, apiContext) + const test$ = mapXcmSent( + apiContext, + 'urn:ocn:local:0', + )( + of( + new GenericXcmSentWithContext({ + event: {}, + sender: { signer: { id: 'xyz', publicKey: '0x01' }, extraSigners: [] }, + blockHash: '0x01', + blockNumber: '32', + extrinsicPosition: 4, + recipient: 'urn:ocn:local:2034', + messageDataBuffer: buf, + messageHash: xcm.hash, + messageId: getMessageId(xcm), + instructions: { + bytes: xcm.data, + json: xcm.instructions, + }, + }), ), - ), - ) + ) + + await new Promise((resolve) => { + test$.subscribe({ + next: (msg) => { + expect(msg).toBeDefined() + expect(msg.waypoint.chainId).toBe('urn:ocn:local:0') + expect(msg.legs.length).toBe(2) + expect(msg.legs[0]).toEqual({ + from: 'urn:ocn:local:0', + to: 'urn:ocn:local:2034', + type: 'hop', + }) + expect(msg.legs[1]).toEqual({ + from: 'urn:ocn:local:2034', + to: 'urn:ocn:local:1000', + relay: 'urn:ocn:local:0', + partialMessage: + '0x030813000002043205011f0002093d00000d0102040001010081bd2c1d40052682633fb3e67eff151b535284d1d1a9633613af14006656f42b', + type: 'hrmp', + }) + expect(msg.destination.chainId).toBe('urn:ocn:local:1000') + calls() + }, + complete: () => { + expect(calls).toHaveBeenCalledTimes(1) + resolve() + }, + }) + }) + }) + + it('should extract stops for a XCM message hopping with DepositReserveAsset', async () => { + const calls = vi.fn() + const heiko5389341 = + '0003100004000000000f251850c822be030a13000000000f120c286411df01000e010204010100411f081300010100511f000f120c286411df01000d01020400010100842745b99b8042d28a7c677d9469332bfc24aa5266c7ec57c43c7af125a0c16c' + const buf = new Uint8Array(Buffer.from(heiko5389341, 'hex')) + const xcms = fromXcmpFormat(buf, apiContext) + const test$ = mapXcmSent( + apiContext, + 'urn:ocn:local:2085', + )( + from( + xcms.map( + (x) => + new GenericXcmSentWithContext({ + event: {}, + sender: { signer: { id: 'xyz', publicKey: '0x01' }, extraSigners: [] }, + blockHash: '0x01', + blockNumber: '32', + extrinsicPosition: 4, + recipient: 'urn:ocn:local:2004', + messageDataBuffer: buf, + messageHash: x.hash, + messageId: getMessageId(x), + instructions: { + bytes: x.data, + json: x.instructions, + }, + }), + ), + ), + ) + + await new Promise((resolve) => { + test$.subscribe({ + next: (msg) => { + expect(msg).toBeDefined() + expect(msg.waypoint.chainId).toBe('urn:ocn:local:2085') + + expect(msg.legs.length).toBe(2) + expect(msg.legs[0]).toEqual({ + from: 'urn:ocn:local:2085', + to: 'urn:ocn:local:2004', + relay: 'urn:ocn:local:0', + type: 'hop', + }) + expect(msg.legs[1]).toEqual({ + from: 'urn:ocn:local:2004', + to: 'urn:ocn:local:2000', + relay: 'urn:ocn:local:0', + partialMessage: + '0x03081300010100511f000f120c286411df01000d01020400010100842745b99b8042d28a7c677d9469332bfc24aa5266c7ec57c43c7af125a0c16c', + type: 'hrmp', + }) + + expect(msg.destination.chainId).toBe('urn:ocn:local:2000') + calls() + }, + complete: () => { + expect(calls).toHaveBeenCalledTimes(1) + resolve() + }, + }) + }) + }) + }) + }) + + describe('extractParachainReceive', () => { + it('should extract XCMP receive with outcome success', async () => { + const { successBlocks } = xcmpReceive + const calls = vi.fn() + const test$ = extractParachainReceive()(successBlocks.pipe(extractEvents())) await new Promise((resolve) => { test$.subscribe({ next: (msg) => { expect(msg).toBeDefined() - expect(msg.waypoint.chainId).toBe('urn:ocn:local:2004') - expect(msg.legs.length).toBe(1) - expect(msg.legs[0]).toEqual({ - from: 'urn:ocn:local:2004', - to: 'urn:ocn:local:2104', - relay: 'urn:ocn:local:0', - type: 'hrmp', - }) - expect(msg.destination.chainId).toBe('urn:ocn:local:2104') + expect(msg.blockNumber).toBeDefined() + expect(msg.blockHash).toBeDefined() + expect(msg.event).toBeDefined() + expect(msg.messageHash).toBeDefined() + expect(msg.timestamp).toBeDefined() + expect(msg.outcome).toBeDefined() + expect(msg.outcome).toBe('Success') calls() }, complete: () => { - expect(calls).toHaveBeenCalledTimes(2) + expect(calls).toHaveBeenCalledTimes(1) resolve() }, }) }) }) - it('should extract stops for a XCM message hopping with InitiateReserveWithdraw', async () => { + it('should extract failed XCMP received message with error', async () => { + const { failBlocks } = xcmpReceive const calls = vi.fn() - const polka19505060 = - '0310000400010300a10f043205011f000700f2052a011300010300a10f043205011f000700f2052a010010010204010100a10f0813000002043205011f0002093d00000d0102040001010081bd2c1d40052682633fb3e67eff151b535284d1d1a9633613af14006656f42b2c8e75728b841da22d8337ff5fadd1264f13addcdee755b01ce1a3afb9ef629b9a' - const buf = new Uint8Array(Buffer.from(polka19505060, 'hex')) - const xcm = asVersionedXcm(buf, apiContext) - const test$ = mapXcmSent( - apiContext, - 'urn:ocn:local:0', - )( - of( - new GenericXcmSentWithContext({ - event: {}, - sender: { signer: { id: 'xyz', publicKey: '0x01' }, extraSigners: [] }, - blockHash: '0x01', - blockNumber: '32', - extrinsicPosition: 4, - recipient: 'urn:ocn:local:2034', - messageDataBuffer: buf, - messageHash: xcm.hash, - messageId: getMessageId(xcm), - instructions: { - bytes: xcm.data, - json: xcm.instructions, - }, - }), - ), - ) + const test$ = extractParachainReceive()(failBlocks.pipe(extractEvents())) await new Promise((resolve) => { test$.subscribe({ next: (msg) => { expect(msg).toBeDefined() - expect(msg.waypoint.chainId).toBe('urn:ocn:local:0') - expect(msg.legs.length).toBe(2) - expect(msg.legs[0]).toEqual({ - from: 'urn:ocn:local:0', - to: 'urn:ocn:local:2034', - type: 'hop', - }) - expect(msg.legs[1]).toEqual({ - from: 'urn:ocn:local:2034', - to: 'urn:ocn:local:1000', - relay: 'urn:ocn:local:0', - partialMessage: - '0x030813000002043205011f0002093d00000d0102040001010081bd2c1d40052682633fb3e67eff151b535284d1d1a9633613af14006656f42b', - type: 'hrmp', - }) - expect(msg.destination.chainId).toBe('urn:ocn:local:1000') + expect(msg.blockNumber).toBeDefined() + expect(msg.blockHash).toBeDefined() + expect(msg.event).toBeDefined() + expect(msg.messageHash).toBeDefined() + expect(msg.outcome).toBeDefined() + expect(msg.outcome).toBe('Fail') + expect(msg.timestamp).toBeDefined() calls() }, complete: () => { @@ -122,61 +245,24 @@ describe('extract waypoints operator', () => { }) }) - it('should extract stops for a XCM message hopping with DepositReserveAsset', async () => { + it('should extract assets trapped info on XCMP received message for V4 assets', async () => { + const { trappedBlocks } = xcmpReceive const calls = vi.fn() - const heiko5389341 = - '0003100004000000000f251850c822be030a13000000000f120c286411df01000e010204010100411f081300010100511f000f120c286411df01000d01020400010100842745b99b8042d28a7c677d9469332bfc24aa5266c7ec57c43c7af125a0c16c' - const buf = new Uint8Array(Buffer.from(heiko5389341, 'hex')) - const xcms = fromXcmpFormat(buf, apiContext) - const test$ = mapXcmSent( - apiContext, - 'urn:ocn:local:2085', - )( - from( - xcms.map( - (x) => - new GenericXcmSentWithContext({ - event: {}, - sender: { signer: { id: 'xyz', publicKey: '0x01' }, extraSigners: [] }, - blockHash: '0x01', - blockNumber: '32', - extrinsicPosition: 4, - recipient: 'urn:ocn:local:2004', - messageDataBuffer: buf, - messageHash: x.hash, - messageId: getMessageId(x), - instructions: { - bytes: x.data, - json: x.instructions, - }, - }), - ), - ), - ) + const test$ = extractParachainReceive()(trappedBlocks.pipe(extractEvents())) await new Promise((resolve) => { test$.subscribe({ next: (msg) => { expect(msg).toBeDefined() - expect(msg.waypoint.chainId).toBe('urn:ocn:local:2085') - - expect(msg.legs.length).toBe(2) - expect(msg.legs[0]).toEqual({ - from: 'urn:ocn:local:2085', - to: 'urn:ocn:local:2004', - relay: 'urn:ocn:local:0', - type: 'hop', - }) - expect(msg.legs[1]).toEqual({ - from: 'urn:ocn:local:2004', - to: 'urn:ocn:local:2000', - relay: 'urn:ocn:local:0', - partialMessage: - '0x03081300010100511f000f120c286411df01000d01020400010100842745b99b8042d28a7c677d9469332bfc24aa5266c7ec57c43c7af125a0c16c', - type: 'hrmp', - }) - - expect(msg.destination.chainId).toBe('urn:ocn:local:2000') + expect(msg.blockNumber).toBeDefined() + expect(msg.blockHash).toBeDefined() + expect(msg.event).toBeDefined() + expect(msg.messageHash).toBeDefined() + expect(msg.outcome).toBeDefined() + expect(msg.outcome).toBe('Fail') + expect(msg.assetsTrapped).toBeDefined() + expect(msg.assetsTrapped?.assets).toBeDefined() + expect(msg.timestamp).toBeDefined() calls() }, complete: () => { @@ -186,5 +272,32 @@ describe('extract waypoints operator', () => { }) }) }) + + it('should extract dmpQueue.ExecutedDownward events', async () => { + const blocks = from(testBlocksFrom('interlay/7025155.cbor')) + const calls = vi.fn() + const test$ = extractParachainReceive()(blocks.pipe(extractEvents())) + + await new Promise((resolve) => { + test$.subscribe({ + next: (msg) => { + calls() + expect(msg).toBeDefined() + expect(msg.blockNumber).toBeDefined() + expect(msg.blockHash).toBeDefined() + expect(msg.event).toBeDefined() + expect(msg.messageHash).toBeDefined() + expect(msg.outcome).toBeDefined() + expect(msg.outcome).toBe('Success') + expect(msg.assetsTrapped).toBeUndefined() + expect(msg.timestamp).toBeDefined() + }, + complete: () => { + resolve() + expect(calls).toHaveBeenCalledTimes(1) + }, + }) + }) + }) }) }) diff --git a/packages/server/src/services/agents/xcm/ops/common.ts b/packages/server/src/services/agents/xcm/ops/common.ts index d7c2c414..c1e88455 100644 --- a/packages/server/src/services/agents/xcm/ops/common.ts +++ b/packages/server/src/services/agents/xcm/ops/common.ts @@ -1,11 +1,13 @@ -import { Observable, map, mergeMap } from 'rxjs' +import { Observable, bufferCount, map, mergeMap } from 'rxjs' +import { filterNonNull } from '@/common/index.js' import { createNetworkId, getChainId, getConsensus, isOnSameConsensus } from '@/services/config.js' import { ApiContext, BlockEvent } from '@/services/networking/index.js' import { HexString } from '@/services/subscriptions/types.js' import { AnyJson, NetworkURN } from '@/services/types.js' import { toHex } from 'polkadot-api/utils' import { + GenericXcmInboundWithContext, GenericXcmSent, Leg, XcmInbound, @@ -13,8 +15,15 @@ import { XcmSent, XcmSentWithContext, } from '../types.js' -import { getParaIdFromJunctions, getSendersFromEvent, networkIdFromMultiLocation } from './util.js' +import { + getParaIdFromJunctions, + getSendersFromEvent, + mapAssetsTrapped, + matchEvent, + networkIdFromMultiLocation, +} from './util.js' import { raw, versionedXcmCodec } from './xcm-format.js' +import { METHODS_XCMP_QUEUE } from './xcmp.js' type Stop = { networkId: NetworkURN; message?: any[] } @@ -161,3 +170,87 @@ export function xcmMessagesSent() { ) } } + +/** + * Extract XCM receive events for both DMP and HRMP in parachains. + * Most parachains emit the same event, MessageQueue.Processed, for both DMP and HRMP. + * But some, like Interlay, emits a different event DmpQueue.ExecutedDownward for DMP. + */ +export function extractParachainReceive() { + return (source: Observable): Observable => { + return source.pipe( + bufferCount(2, 1), + // eslint-disable-next-line complexity + map(([maybeAssetTrapEvent, maybeXcmpEvent]) => { + if (maybeXcmpEvent === undefined) { + return null + } + + const assetTrapEvent = matchEvent(maybeAssetTrapEvent, ['XcmPallet', 'PolkadotXcm'], 'AssetsTrapped') + ? maybeAssetTrapEvent + : undefined + const assetsTrapped = mapAssetsTrapped(assetTrapEvent) + + if (matchEvent(maybeXcmpEvent, 'XcmpQueue', METHODS_XCMP_QUEUE)) { + const xcmpQueueData = maybeXcmpEvent.value + + return new GenericXcmInboundWithContext({ + event: maybeXcmpEvent, + extrinsicHash: maybeXcmpEvent.extrinsic?.hash as HexString, + blockHash: maybeXcmpEvent.blockHash as HexString, + blockNumber: maybeXcmpEvent.blockNumber, + timestamp: maybeXcmpEvent.timestamp, + extrinsicPosition: maybeXcmpEvent.extrinsicPosition, + messageHash: xcmpQueueData.message_hash, + messageId: xcmpQueueData.message_id, + outcome: maybeXcmpEvent.name === 'Success' ? 'Success' : 'Fail', + error: xcmpQueueData.error, + assetsTrapped, + }) + } else if (matchEvent(maybeXcmpEvent, 'MessageQueue', 'Processed')) { + const { id, success, error } = maybeXcmpEvent.value + // Received event only emits field `message_id`, + // which is actually the message hash in chains that do not yet support Topic ID. + const messageId = id + const messageHash = messageId + + return new GenericXcmInboundWithContext({ + event: maybeXcmpEvent, + extrinsicHash: maybeXcmpEvent.extrinsic?.hash as HexString, + blockHash: maybeXcmpEvent.blockHash as HexString, + blockNumber: maybeXcmpEvent.blockNumber, + timestamp: maybeXcmpEvent.timestamp, + messageHash, + messageId, + outcome: success ? 'Success' : 'Fail', + error, + assetsTrapped, + }) + } else if (matchEvent(maybeXcmpEvent, 'DmpQueue', 'ExecutedDownward')) { + const { message_id, outcome } = maybeXcmpEvent.value + + // Received event only emits field `message_id`, + // which is actually the message hash in chains that do not yet support Topic ID. + const messageId = message_id + const messageHash = messageId + + return new GenericXcmInboundWithContext({ + event: maybeXcmpEvent, + extrinsicHash: maybeXcmpEvent.extrinsic?.hash as HexString, + blockHash: maybeXcmpEvent.blockHash as HexString, + blockNumber: maybeXcmpEvent.blockNumber, + timestamp: maybeXcmpEvent.timestamp, + messageHash, + messageId, + outcome: outcome.type === 'Complete' ? 'Success' : 'Fail', + error: null, + assetsTrapped, + }) + } + + return null + }), + filterNonNull(), + ) + } +} diff --git a/packages/server/src/services/agents/xcm/ops/xcmp.spec.ts b/packages/server/src/services/agents/xcm/ops/xcmp.spec.ts index ba7ee9d4..78c6c512 100644 --- a/packages/server/src/services/agents/xcm/ops/xcmp.spec.ts +++ b/packages/server/src/services/agents/xcm/ops/xcmp.spec.ts @@ -1,7 +1,6 @@ import { extractEvents } from '@/common/index.js' -import { apiContext, xcmpReceive, xcmpSend } from '@/testing/xcm.js' - -import { extractXcmpReceive, extractXcmpSend } from './xcmp.js' +import { apiContext, xcmpSend } from '@/testing/xcm.js' +import { extractXcmpSend } from './xcmp.js' describe('xcmp operator', () => { describe('extractXcmpSend', () => { @@ -33,86 +32,4 @@ describe('xcmp operator', () => { }) }) }) - - describe('extractXcmpReceive', () => { - it('should extract XCMP receive with outcome success', async () => { - const { successBlocks } = xcmpReceive - const calls = vi.fn() - const test$ = extractXcmpReceive()(successBlocks.pipe(extractEvents())) - - await new Promise((resolve) => { - test$.subscribe({ - next: (msg) => { - expect(msg).toBeDefined() - expect(msg.blockNumber).toBeDefined() - expect(msg.blockHash).toBeDefined() - expect(msg.event).toBeDefined() - expect(msg.messageHash).toBeDefined() - expect(msg.timestamp).toBeDefined() - expect(msg.outcome).toBeDefined() - expect(msg.outcome).toBe('Success') - calls() - }, - complete: () => { - expect(calls).toHaveBeenCalledTimes(1) - resolve() - }, - }) - }) - }) - - it('should extract failed XCMP received message with error', async () => { - const { failBlocks } = xcmpReceive - const calls = vi.fn() - const test$ = extractXcmpReceive()(failBlocks.pipe(extractEvents())) - - await new Promise((resolve) => { - test$.subscribe({ - next: (msg) => { - expect(msg).toBeDefined() - expect(msg.blockNumber).toBeDefined() - expect(msg.blockHash).toBeDefined() - expect(msg.event).toBeDefined() - expect(msg.messageHash).toBeDefined() - expect(msg.outcome).toBeDefined() - expect(msg.outcome).toBe('Fail') - expect(msg.timestamp).toBeDefined() - calls() - }, - complete: () => { - expect(calls).toHaveBeenCalledTimes(1) - resolve() - }, - }) - }) - }) - - it('should extract assets trapped info on XCMP received message for V4 assets', async () => { - const { trappedBlocks } = xcmpReceive - const calls = vi.fn() - const test$ = extractXcmpReceive()(trappedBlocks.pipe(extractEvents())) - - await new Promise((resolve) => { - test$.subscribe({ - next: (msg) => { - expect(msg).toBeDefined() - expect(msg.blockNumber).toBeDefined() - expect(msg.blockHash).toBeDefined() - expect(msg.event).toBeDefined() - expect(msg.messageHash).toBeDefined() - expect(msg.outcome).toBeDefined() - expect(msg.outcome).toBe('Fail') - expect(msg.assetsTrapped).toBeDefined() - expect(msg.assetsTrapped?.assets).toBeDefined() - expect(msg.timestamp).toBeDefined() - calls() - }, - complete: () => { - expect(calls).toHaveBeenCalledTimes(1) - resolve() - }, - }) - }) - }) - }) }) diff --git a/packages/server/src/services/agents/xcm/ops/xcmp.ts b/packages/server/src/services/agents/xcm/ops/xcmp.ts index c75b1d03..877960d5 100644 --- a/packages/server/src/services/agents/xcm/ops/xcmp.ts +++ b/packages/server/src/services/agents/xcm/ops/xcmp.ts @@ -1,23 +1,16 @@ -import { Observable, bufferCount, filter, map, mergeMap } from 'rxjs' +import { Observable, filter, map, mergeMap } from 'rxjs' import { filterNonNull } from '@/common/index.js' -import { HexString } from '@/lib.js' import { createNetworkId } from '@/services/config.js' import { ApiContext, BlockEvent } from '@/services/networking/index.js' import { NetworkURN } from '@/services/types.js' -import { - GenericXcmInboundWithContext, - GenericXcmSentWithContext, - GetOutboundHrmpMessages, - XcmInboundWithContext, - XcmSentWithContext, -} from '../types.js' +import { GenericXcmSentWithContext, GetOutboundHrmpMessages, XcmSentWithContext } from '../types.js' import { xcmMessagesSent } from './common.js' -import { getMessageId, mapAssetsTrapped, matchEvent } from './util.js' +import { getMessageId, matchEvent } from './util.js' import { fromXcmpFormat } from './xcm-format.js' -const METHODS_XCMP_QUEUE = ['Success', 'Fail'] +export const METHODS_XCMP_QUEUE = ['Success', 'Fail'] function findOutboundHrmpMessage( origin: NetworkURN, @@ -78,62 +71,3 @@ export function extractXcmpSend( ) } } - -export function extractXcmpReceive() { - return (source: Observable): Observable => { - return source.pipe( - bufferCount(2, 1), - // eslint-disable-next-line complexity - map(([maybeAssetTrapEvent, maybeXcmpEvent]) => { - if (maybeXcmpEvent === undefined) { - return null - } - - const assetTrapEvent = matchEvent(maybeAssetTrapEvent, ['XcmPallet', 'PolkadotXcm'], 'AssetsTrapped') - ? maybeAssetTrapEvent - : undefined - const assetsTrapped = mapAssetsTrapped(assetTrapEvent) - - if (matchEvent(maybeXcmpEvent, 'XcmpQueue', METHODS_XCMP_QUEUE)) { - const xcmpQueueData = maybeXcmpEvent.value - - return new GenericXcmInboundWithContext({ - event: maybeXcmpEvent, - extrinsicHash: maybeXcmpEvent.extrinsic?.hash as HexString, - blockHash: maybeXcmpEvent.blockHash as HexString, - blockNumber: maybeXcmpEvent.blockNumber, - timestamp: maybeXcmpEvent.timestamp, - extrinsicPosition: maybeXcmpEvent.extrinsicPosition, - messageHash: xcmpQueueData.message_hash, - messageId: xcmpQueueData.message_id, - outcome: maybeXcmpEvent.name === 'Success' ? 'Success' : 'Fail', - error: xcmpQueueData.error, - assetsTrapped, - }) - } else if (matchEvent(maybeXcmpEvent, 'MessageQueue', 'Processed')) { - const { id, success, error } = maybeXcmpEvent.value - // Received event only emits field `message_id`, - // which is actually the message hash in chains that do not yet support Topic ID. - const messageId = id - const messageHash = messageId - - return new GenericXcmInboundWithContext({ - event: maybeXcmpEvent, - extrinsicHash: maybeXcmpEvent.extrinsic?.hash as HexString, - blockHash: maybeXcmpEvent.blockHash as HexString, - blockNumber: maybeXcmpEvent.blockNumber, - timestamp: maybeXcmpEvent.timestamp, - messageHash, - messageId, - outcome: success ? 'Success' : 'Fail', - error, - assetsTrapped, - }) - } - - return null - }), - filterNonNull(), - ) - } -} diff --git a/packages/server/src/services/agents/xcm/tracking.spec.ts b/packages/server/src/services/agents/xcm/tracking.spec.ts index 583c4fdf..81a875b2 100644 --- a/packages/server/src/services/agents/xcm/tracking.spec.ts +++ b/packages/server/src/services/agents/xcm/tracking.spec.ts @@ -68,4 +68,26 @@ describe('extractXcmMessageData', () => { }) }) }) + + it('should extract xcm messages from dmp with topic id that does not accept topic id', async () => { + const blocks = from(testBlocksFrom('interlay/7025155.cbor')) + const calls = vi.fn() + const test$ = extractXcmMessageData(apiContext)(blocks.pipe()) + + new Promise((resolve) => { + test$.subscribe({ + next: ({ hashData }) => { + console.log(hashData) + calls() + expect(hashData).toBeDefined() + expect(hashData[0].hash).toBeDefined() + expect(hashData[0].data).toBeDefined() + }, + complete: () => { + resolve() + expect(calls).toHaveBeenCalledTimes(1) + }, + }) + }) + }) }) diff --git a/packages/server/src/services/agents/xcm/tracking.ts b/packages/server/src/services/agents/xcm/tracking.ts index 01f2c829..0ea87062 100644 --- a/packages/server/src/services/agents/xcm/tracking.ts +++ b/packages/server/src/services/agents/xcm/tracking.ts @@ -13,13 +13,14 @@ import { SharedStreams } from '../base/shared.js' import { AgentRuntimeContext } from '../types.js' import { MatchingEngine } from './matching.js' import { mapXcmInbound, mapXcmSent } from './ops/common.js' +import { extractParachainReceive } from './ops/common.js' import { messageCriteria } from './ops/criteria.js' import { extractDmpSendByEvent } from './ops/dmp.js' import { extractRelayReceive } from './ops/relay.js' import { extractUmpReceive, extractUmpSend } from './ops/ump.js' import { getMessageId, matchExtrinsic } from './ops/util.js' import { fromXcmpFormat, raw } from './ops/xcm-format.js' -import { extractXcmpReceive, extractXcmpSend } from './ops/xcmp.js' +import { extractXcmpSend } from './ops/xcmp.js' import { TelemetryXcmEventEmitter } from './telemetry/events.js' import { xcmAgentMetrics, xcmMatchingEngineMetrics } from './telemetry/metrics.js' import { @@ -171,8 +172,8 @@ export class XcmTracker { .subscribe(inboundObserver), }) } else { - // VMP DMP - this.#log.info('[%s] %s subscribe inbound DMP', this.#id, chainId) + // VMP + HRMP + this.#log.info('[%s] %s subscribe inbound DMP + HRMP / XCMP', this.#id, chainId) const messageHashBlocks$ = this.#ingress.getContext(chainId).pipe( switchMap((context) => @@ -188,21 +189,11 @@ export class XcmTracker { ), ) - // DMP and HRMP receive matches the same event - // subs.push({ - // chainId, - // sub: messageHashBlocks$ - // .pipe(extractEvents(), extractDmpReceive(), mapXcmInbound(chainId)) - // .subscribe(inboundObserver), - // }) - - // Inbound HRMP / XCMP transport - this.#log.info('[%s] %s subscribe inbound HRMP', this.#id, chainId) - + // Extract both DMP and HRMP receive subs.push({ chainId, sub: messageHashBlocks$ - .pipe(extractEvents(), extractXcmpReceive(), mapXcmInbound(chainId)) + .pipe(extractEvents(), extractParachainReceive(), mapXcmInbound(chainId)) .subscribe(inboundObserver), }) } diff --git a/packages/server/src/testing/__data__/blocks/interlay/7025155.cbor b/packages/server/src/testing/__data__/blocks/interlay/7025155.cbor new file mode 100644 index 00000000..4aef7a7f Binary files /dev/null and b/packages/server/src/testing/__data__/blocks/interlay/7025155.cbor differ diff --git a/packages/server/src/testing/tools/download.ts b/packages/server/src/testing/tools/download.ts index 4896c78e..35744df5 100644 --- a/packages/server/src/testing/tools/download.ts +++ b/packages/server/src/testing/tools/download.ts @@ -17,6 +17,7 @@ export const networks = { moonbeam: 'wss://moonbeam.ibp.network', astar: 'wss://rpc.astar.network', bifrost: 'wss://bifrost-polkadot.ibp.network', + interlay: 'wss://interlay-rpc.dwellir.com', } as Record async function download([name, ws, height]: [string, string, string]) {