Skip to content

Commit

Permalink
Merge pull request #20 from WorldMaker/artisinal-subscribers
Browse files Browse the repository at this point in the history
Ensure all of our subscribers are artisanal with proper closures
  • Loading branch information
WorldMaker authored Feb 17, 2022
2 parents 8b95eae + 672a27c commit 44e6974
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 17 deletions.
2 changes: 1 addition & 1 deletion projects/angular-pharkas/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "angular-pharkas",
"version": "0.5.0",
"version": "0.5.1",
"peerDependencies": {
"@angular/common": "^11.1.1",
"@angular/core": "^11.1.1"
Expand Down
45 changes: 29 additions & 16 deletions projects/angular-pharkas/src/pharkas.component.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,17 @@ interface PharkasComponentState<T> extends Observable<T> {
[pharkas]: boolean
}

function bindSubject<T>(observable: Observable<T>, subject: Subject<T>) {
// Zone's monkey patching of RxJS breaks the obvious binding (observable.subscribe(subject))
// as RxJS doesn't think it "safe" so we need to do this the hard way to support migrating
// out of Zone apps
return observable.subscribe({
next: (value: T) => subject.next(value),
error: (err: any) => subject.error(err),
complete: () => subject.complete(),
})
}

/**
* Pharkas Base Component
*
Expand Down Expand Up @@ -97,7 +108,7 @@ export class PharkasComponent<TViewModel> implements OnInit, OnDestroy {
}
bound = true
if (isObservable(value)) {
this[subscription].add(value.subscribe(subject))
this[subscription].add(bindSubject(value, subject))
} else {
subject.next(value)
}
Expand Down Expand Up @@ -195,7 +206,7 @@ export class PharkasComponent<TViewModel> implements OnInit, OnDestroy {
throw new Error(`${name} is already bound`)
}
const subject = new BehaviorSubject(defaultValue)
this[subscription].add(observable.subscribe(subject))
this[subscription].add(bindSubject(observable, subject))
this[props].set(name, {
type: 'display',
name,
Expand All @@ -222,7 +233,7 @@ export class PharkasComponent<TViewModel> implements OnInit, OnDestroy {
throw new Error(`${name} is already bound`)
}
const subject = new BehaviorSubject(defaultValue)
this[subscription].add(observable.subscribe(subject))
this[subscription].add(bindSubject(observable, subject))
this[props].set(name, {
type: 'display',
name,
Expand All @@ -243,7 +254,7 @@ export class PharkasComponent<TViewModel> implements OnInit, OnDestroy {
eventEmitter: EventEmitter<T>,
observable: Observable<T>
) {
this[subscription].add(observable.subscribe(eventEmitter))
this[subscription].add(bindSubject(observable, eventEmitter))
}

//#region *** Callbacks ***
Expand Down Expand Up @@ -382,7 +393,7 @@ export class PharkasComponent<TViewModel> implements OnInit, OnDestroy {
}
const subject = this[state].get(localState)
if (subject) {
this[subscription].add(observable.subscribe(subject))
this[subscription].add(bindSubject(observable, subject))
localState[pharkas] = true
} else {
throw new Error('Unknown local component state')
Expand Down Expand Up @@ -410,9 +421,10 @@ export class PharkasComponent<TViewModel> implements OnInit, OnDestroy {
const subject = this[state].get(localState) as BehaviorSubject<T>
if (subject) {
this[subscription].add(
observable
.pipe(map((value) => reducer(subject.value, value)))
.subscribe(subject)
bindSubject(
observable.pipe(map((value) => reducer(subject.value, value))),
subject
)
)
localState[pharkas] = true
} else {
Expand Down Expand Up @@ -442,12 +454,13 @@ export class PharkasComponent<TViewModel> implements OnInit, OnDestroy {
const subject = this[state].get(localState) as BehaviorSubject<T>
if (subject) {
this[subscription].add(
from(observables)
.pipe(
bindSubject(
from(observables).pipe(
mergeAll(),
map((value) => reducer(subject.value, value))
)
.subscribe(subject)
),
subject
)
)
localState[pharkas] = true
} else {
Expand Down Expand Up @@ -512,7 +525,7 @@ export class PharkasComponent<TViewModel> implements OnInit, OnDestroy {
//#endregion

ngOnInit(): void {
const subjects: Subject<unknown>[] = []
const observables: Observable<unknown>[] = []
for (const prop of this[props].values()) {
if (prop.type === 'display') {
if (prop.immediate) {
Expand All @@ -522,12 +535,12 @@ export class PharkasComponent<TViewModel> implements OnInit, OnDestroy {
})
)
} else {
subjects.push(prop.subject)
observables.push(prop.subject.asObservable())
}
}
}
if (subjects.length) {
const displayObservable = merge(...subjects).pipe(
if (observables.length) {
const displayObservable = merge(...observables).pipe(
throttleTime(0, animationFrameScheduler),
share()
)
Expand Down

0 comments on commit 44e6974

Please sign in to comment.