-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathoverlay.ts
405 lines (359 loc) · 14.3 KB
/
overlay.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
/**
* An overlay layer which allows persisting entities in a memory cache,
* retrieving them (based on id or relations) and performing updates
* on them, before "flushing" the final state into the database.
*/
import { Store } from '@subsquid/typeorm-store'
import AsyncLock from 'async-lock'
import _, { isObject } from 'lodash'
import { EntityManager, FindOptionsWhere, In, Not, Repository } from 'typeorm'
import { Logger } from '../logger'
import { NextEntityId } from '../model/NextEntityId'
import { criticalError, idStringFromNumber } from './misc'
/**
 * A stub which can represent any entity type.
 * The only property the overlay relies on is the string `id`.
 */
export type AnyEntity = { id: string }
/**
 * Shape of a class that can be instantiated with `new` to produce an `E`
 * and that exposes its `name` as a string.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export type Constructor<E> = { new (...args: any[]): E; name: string }
/**
 * State of an entity which is persisted in the cache (memory) layer.
 * It determines what happens to the entity when the cache is flushed.
 */
enum CachedEntityState {
  UpToDate, // the entity is up to date with the stored (database) state
  ToBeSaved, // the entity has been updated in the cache and is scheduled for saving
  ToBeRemoved, // the entity is scheduled for removal
}
/**
 * Represents a single cached entity.
 * `entity` is optional because an entry may consist of a state only
 * (e.g. an id that has merely been scheduled for removal).
 */
type Cached<E> = {
  entity?: E
  state: CachedEntityState
}
/**
 * Represents a "flat" entity of given type.
 * "Flat" means the entity is stripped of any references to other entities:
 * every property whose type is an entity (optionally `null` / `undefined`)
 * or an array of entities is omitted.
 * Only the ids of related entities are preserved,
 * provided that the entity is the owning-side of given relation.
 */
export type Flat<E> = Omit<
  E,
  {
    // Map each key to itself if it holds a single related entity...
    [K in keyof E]: E[K] extends AnyEntity | null | undefined
      ? K
      : // ...or a collection of related entities; otherwise drop it from the union
      E[K] extends AnyEntity[]
      ? K
      : never
  }[keyof E]
>
/**
 * Union of entity keys that store the ids of related entities
 * (provided the entity is the owning-side of given relation).
 * For each string key `K` of `E`, `${K}Id` is included when it is itself a key of `E`.
 */
type OwnedRelations<E> = {
  [K in keyof E & string]: `${K}Id` extends keyof E ? `${K}Id` : never
}[keyof E & string]
/**
 * A flat entity with the `...Id` keys of owned relations removed as well,
 * i.e. only the non-relational fields remain.
 */
export type FlatRelationless<E> = Omit<Flat<E>, OwnedRelations<E>>
/**
 * The entity repository overlay.
 *
 * Keeps an in-memory cache of entities of a single type on top of a TypeORM
 * `Repository`. Reads prefer the cached version of an entity, writes only
 * touch the cache, and `executeScheduledUpdates` / `executeScheduledRemovals`
 * push the accumulated changes to the database.
 */
export class RepositoryOverlay<E extends AnyEntity = AnyEntity> {
  // All currently cached entities of given type
  private cached: Map<string, Cached<E>> = new Map()

  // Name of the entity class
  public readonly entityName: string

  // Locking construct to prevent multiple asynchronous reads of same
  // "uncached" entity from overlay, which could lead to one operation
  // rewriting the cached state of other operation (i.e. lost updates)
  private asyncLock: AsyncLock = new AsyncLock({ maxPending: Number.MAX_SAFE_INTEGER })

  /**
   * @param EntityClass constructor used to instantiate entities of this type
   * @param repository underlying TypeORM repository
   * @param nextId numeric value from which the next entity id will be derived
   */
  public constructor(
    public readonly EntityClass: Constructor<E>,
    private repository: Repository<E>,
    private nextId: number
  ) {
    this.entityName = this.repository.metadata.name
  }

  // Number of entries currently held in the cache (including ones scheduled for removal)
  public cacheSize() {
    return this.cached.size
  }

  // Get id of the new entity of given type and increment `this.nextId`
  getNewEntityId(): string {
    return idStringFromNumber(this.nextId++)
  }

  // Get current value of `this.nextId` number
  getNextIdNumber(): number {
    return this.nextId
  }

  // Overwrite the current value of `this.nextId`
  setNextEntityId(nextId: number) {
    this.nextId = nextId
  }

  // Prevents inserting strings that contain null character into the postgresql table
  // (as this would cause an error)
  private normalizeString(s: string) {
    // eslint-disable-next-line no-control-regex
    return s.replace(/\u0000/g, '')
  }

  // Makes use of `normalizeString` to prevent inserting null character into the postgresql table.
  // Mutates `e` in place. Only string values and nested objects exposing an `isTypeOf`
  // property are visited; other nested values (e.g. arrays) are left untouched.
  private normalizeInput(e: Record<string, unknown>) {
    for (const [k, v] of Object.entries(e)) {
      if (isObject(v) && `isTypeOf` in v) {
        this.normalizeInput(v as Record<string, unknown>)
      }
      if (typeof v === 'string') {
        e[k] = this.normalizeString(v)
      }
    }
  }

  // Returns a proxy to given cached entity.
  // The proxy allows updating the entity state from `CachedEntityState.UpToDate`
  // to `CachedEntityState.ToBeSaved` in case any of the entity properties have
  // been updated (`set`)
  private asCachedProxy(e: E): E {
    return new Proxy(e, {
      set: (obj, prop, value, receiver) => {
        const cached = this.cached.get(e.id)
        if (cached?.state === CachedEntityState.UpToDate) {
          cached.state = CachedEntityState.ToBeSaved
        }
        return Reflect.set(obj, prop, value, receiver)
      },
    })
  }

  // Schedules provided entities for later removal.
  // Accepts either entities or plain ids; any cached copy of the entity is dropped.
  remove(...items: (E | Flat<E> | string)[]): this {
    items.forEach((entityOrId) => {
      const entityId = typeof entityOrId === 'string' ? entityOrId : (entityOrId as E).id
      Logger.get().debug(`Scheduling ${this.entityName}:${entityId} for removal`)
      this.cached.set(entityId, { state: CachedEntityState.ToBeRemoved })
    })
    return this
  }

  // Checks if an entity matches a simple where condition
  // (i.e. the specified non-relational fields have exactly matching values)
  matches(
    entity: FlatRelationless<E>,
    where: { [K in keyof FlatRelationless<E>]?: E[K] }
  ): boolean {
    return Object.entries(where).every(
      ([key, value]) => entity[key as keyof FlatRelationless<E>] === value
    )
  }

  // Retrieves a (flat) entity by any non-relational field(s).
  // The cache is searched first; only if nothing matches is the database queried.
  async getOneBy(where: { [K in keyof FlatRelationless<E>]?: E[K] }): Promise<Flat<E> | undefined> {
    // Lock complete table
    return this.asyncLock.acquire(`${this.entityName}`, async () => {
      const allCached = Array.from(this.cached.values())
      const cachedFound = allCached.find(
        (e) =>
          e.state !== CachedEntityState.ToBeRemoved && e.entity && this.matches(e.entity, where)
      )
      if (cachedFound) {
        return cachedFound.entity
      }
      const stored = await this.repository.findBy(where as FindOptionsWhere<E>)
      for (const storedEntity of stored) {
        // See if we have a cached version of this entity. If yes - prioritize the cached one!
        const cached = this.cached.get(storedEntity.id)
        if (cached?.state === CachedEntityState.ToBeRemoved) {
          continue
        }
        if (cached?.entity && this.matches(cached.entity, where)) {
          return cached.entity
        }
        if (!cached && this.matches(storedEntity, where)) {
          return this.cache(storedEntity)
        }
      }
      return undefined
    })
  }

  // Retrieves a (flat) entity by id.
  // Cached version of the entity has a priority.
  // If not found in cache - the entity is retrieved from the database and then cached (if existing).
  async getById(id: string): Promise<Flat<E> | undefined> {
    // Lock single entity by given ID
    return this.asyncLock.acquire(`${this.entityName}-${id}`, async () => {
      const cached = this.cached.get(id)
      if (cached?.state === CachedEntityState.ToBeRemoved) {
        return undefined
      }
      if (cached?.entity) {
        return cached.entity
      }
      const stored = await this.repository.findOneBy({ id } as FindOptionsWhere<E>)
      // The per-id lock does not exclude table-level readers (`getOneBy`,
      // `getManyByRelation`) nor the unlocked `new` / `remove`, so the entity
      // may have been cached or scheduled for removal while the query was
      // in flight. The cache stays authoritative - never overwrite it.
      const cachedMeanwhile = this.cached.get(id)
      if (cachedMeanwhile?.state === CachedEntityState.ToBeRemoved) {
        return undefined
      }
      if (cachedMeanwhile?.entity) {
        return cachedMeanwhile.entity
      }
      if (stored) {
        // Update cache if entity found
        return this.cache(stored)
      }
      return undefined
    })
  }

  // Get all entities of given type that satisfy given relation condition (e[relation] = id).
  // This is achieved by inspecting both cached and stored state.
  // The entity must be the owning-side of the relation.
  async getManyByRelation(relation: OwnedRelations<E>, id: string): Promise<Flat<E>[]> {
    // Lock complete table
    return this.asyncLock.acquire(`${this.entityName}`, async () => {
      const cachedIds = Array.from(this.cached.keys())
      // Get all stored child entities where child.parent_id = parent.id and child.id is NOT IN(cachedIds)
      const storedChildren = await this.repository.findBy({
        id: Not(In(cachedIds)),
        [relation]: id,
      } as FindOptionsWhere<E>)
      // Get all managed child entities (excluding those scheduled for removal) where child.parent_id = parent.id.
      // The cache is inspected only AFTER the query has resolved, so that entries created
      // or modified while the query was in flight are taken into account.
      const cachedChildren = Array.from(this.cached.values())
        .filter(
          (e) => e.state !== CachedEntityState.ToBeRemoved && e.entity && e.entity[relation] === id
        )
        .flatMap((e) => (e.entity ? [e.entity] : []))
      // Cache loaded entities, skipping any id that got cached during the query:
      // its cached state (possibly ToBeSaved / ToBeRemoved) must not be overwritten
      // and, if it matches, it is already part of `cachedChildren`.
      const storedChildrenCached = storedChildren
        .filter((c) => !this.cached.has(c.id))
        .map((c) => this.cache(c))
      // Return concatenated result
      return cachedChildren.concat(storedChildrenCached)
    })
  }

  // Get a single entity of given type that satisfies given relation condition.
  // Reports a critical error in case there is more than 1 entity that satisfies the condition.
  async getOneByRelation(relation: OwnedRelations<E>, id: string): Promise<Flat<E> | undefined> {
    const related = await this.getManyByRelation(relation, id)
    if (related.length > 1) {
      criticalError(
        `Expected one entity related through ${this.entityName}.${relation}=${id}. ` +
          `Got ${related.length}.`
      )
    }
    return related[0]
  }

  // Same as getById, but fails if no entity found.
  // NOTE(review): relies on `criticalError` never returning - confirm in './misc'.
  async getByIdOrFail(id: string): Promise<Flat<E>> {
    const result = await this.getById(id)
    if (!result) {
      criticalError(`${this.entityName} entity not found by id ${id}`)
    }
    return result
  }

  // Same as getOneByRelation, but fails if no entity found.
  async getOneByRelationOrFail(relation: OwnedRelations<E>, id: string): Promise<Flat<E>> {
    const result = await this.getOneByRelation(relation, id)
    if (!result) {
      criticalError(
        `Expected to find entity by ${this.entityName}.${relation}=${id}. ` + `None found.`
      )
    }
    return result
  }

  // Creates a new entity of given type and schedules it for insertion.
  new(entityFields: Partial<Flat<E>>): Flat<E> {
    const entity = new this.EntityClass(entityFields)
    // normalize the input (remove UTF-8 null characters)
    this.normalizeInput(entity)
    // Creation is not logged when the TESTING env variable is 'true' or '1'
    if (process.env.TESTING !== 'true' && process.env.TESTING !== '1') {
      Logger.get().debug(`Creating new ${this.entityName}: ${entity.id}`)
    }
    // Entities with the same id will override existing ones (!)
    this.cached.set(entity.id, { entity, state: CachedEntityState.ToBeSaved })
    return entity
  }

  // Caches given entity without scheduling it for update.
  // Returns a proxy which updates entity state to "CachedEntityState.ToBeSaved"
  // in case any of the entity properties are modified (`set`)
  private cache(entity: E): E {
    const proxy = this.asCachedProxy(entity)
    this.cached.set(entity.id, {
      entity: proxy,
      state: CachedEntityState.UpToDate,
    })
    return proxy
  }

  // Returns all entities scheduled for saving
  private getAllToBeSaved(): E[] {
    return [...this.cached.values()]
      .filter(({ state }) => state === CachedEntityState.ToBeSaved)
      .flatMap(({ entity }) => (entity ? [entity] : []))
  }

  // Returns ids of all entities scheduled for removal
  private getAllIdsToBeRemoved(): string[] {
    return [...this.cached.entries()]
      .filter(([, { state }]) => state === CachedEntityState.ToBeRemoved)
      .map(([id]) => id)
  }

  // Execute all scheduled entity inserts/updates
  async executeScheduledUpdates(): Promise<void> {
    const logger = Logger.get()
    const toBeSaved = this.getAllToBeSaved()
    if (toBeSaved.length) {
      if (process.env.TESTING !== 'true' && process.env.TESTING !== '1') {
        logger.info(`Saving ${toBeSaved.length} ${this.entityName} entities...`)
        logger.debug(
          `Ids of ${this.entityName} entities to save: ${toBeSaved.map((e) => e.id).join(', ')}`
        )
      }
      await this.repository.save(toBeSaved)
    }
  }

  // Execute all scheduled entity removals
  async executeScheduledRemovals(): Promise<void> {
    const logger = Logger.get()
    // Only ids are kept for removed entities, so id-only instances are created for the removal
    const toBeRemoved = this.getAllIdsToBeRemoved().map((id) => new this.EntityClass({ id }))
    if (toBeRemoved.length) {
      logger.info(`Removing ${toBeRemoved.length} ${this.entityName} entities...`)
      logger.debug(
        `Ids of ${this.entityName} entities to remove: ${toBeRemoved.map((e) => e.id).join(', ')}`
      )
      // A deep copy is handed to the repository, so `toBeRemoved` itself is never mutated by it
      await this.repository.remove(_.cloneDeep(toBeRemoved))
    }
  }

  // Drops all cached entries (scheduled changes included)
  cleanCache(): void {
    this.cached.clear()
  }
}
/**
 * The entity manager overlay.
 *
 * Hands out one `RepositoryOverlay` per entity class and flushes the state
 * accumulated in all of them into the database in a single step.
 */
export class EntityManagerOverlay {
  // Repository overlays created so far, keyed by entity class name
  private repositories: Map<string, RepositoryOverlay> = new Map()

  constructor(
    private em: EntityManager,
    private nextEntityIds: NextEntityId[],
    private afterDbUpdate: (em: EntityManager) => Promise<void>
  ) {}

  // Builds an overlay on top of the entity manager used by the provided store.
  public static async create(store: Store, afterDbUpdate: (em: EntityManager) => Promise<void>) {
    // FIXME: This is a little hacky, but we really need to access the underlying EntityManager
    const storeInternals = store as unknown as { em: () => Promise<EntityManager> }
    const em = await storeInternals.em()
    // Add "admin" schema to search path in order to be able to access "hidden" entities
    await em.query('SET search_path TO admin,public')
    // Load the persisted id counters, so new ids continue where the previous run stopped
    const nextEntityIds = await em.find(NextEntityId, {})
    return new EntityManagerOverlay(em, nextEntityIds, afterDbUpdate)
  }

  // Sum of cache sizes of all repository overlays created so far
  public totalCacheSize() {
    let total = 0
    for (const overlay of this.repositories.values()) {
      total += overlay.cacheSize()
    }
    return total
  }

  // Exposes the underlying entity manager
  public getEm() {
    return this.em
  }

  // Create an entity repository overlay or load already cached one
  public getRepository<E extends AnyEntity>(entityClass: Constructor<E>): RepositoryOverlay<E> {
    const existing = this.repositories.get(entityClass.name)
    if (existing) {
      // FIXME: Ideally `any` should be avoided, but it's tricky to achieve that here
      return existing as RepositoryOverlay<any>
    }

    const typeormRepository = this.em.getRepository(entityClass)
    // Continue from the persisted id counter of this entity type; start from 1 otherwise
    const persistedCounter = this.nextEntityIds.find(
      ({ entityName }) => entityName === entityClass.name
    )
    const overlay = new RepositoryOverlay(
      entityClass,
      typeormRepository,
      persistedCounter?.nextId || 1
    )
    this.repositories.set(entityClass.name, overlay as RepositoryOverlay<any>)
    return overlay
  }

  // Update database - "flush" the cached state
  async updateDatabase() {
    // Per repository: save first, then remove, then drop the cache.
    // All repositories are started together and awaited as a group.
    const flushOne = async (overlay: RepositoryOverlay): Promise<void> => {
      await overlay.executeScheduledUpdates()
      await overlay.executeScheduledRemovals()
      overlay.cleanCache()
    }
    const pendingFlushes = [...this.repositories.values()].map((overlay) => flushOne(overlay))
    await Promise.all(pendingFlushes)

    // Persist the current id counter of every repository overlay
    const counters: NextEntityId[] = []
    for (const overlay of this.repositories.values()) {
      counters.push(
        new NextEntityId({
          entityName: overlay.entityName,
          nextId: overlay.getNextIdNumber(),
        })
      )
    }
    await this.em.save(counters)

    await this.afterDbUpdate(this.em)
  }
}