Enterprise web applications usually have data-grids with filters, and in certain situations you also need to poll for fresh data at a specified interval. Recently, I have been working on a project with such a requirement. The first implementation was imperative: it unsubscribed from observables manually and used setInterval. Almost every week there was a bug that originated from this polling logic, so I decided to fully refactor the code into a declarative, RxJS-based solution. It was a good opportunity to write marble tests as well.
I decided to create a blog post series about certain scenarios that can be a challenge to implement. This is the first part of the series.
If you would like to jump straight into the implementation, you can check out my ng-reusables git repository.
The Requirements
The user stories look something like the following:
As a user, I'd like to be able to use the filter inputs to provide filter parameters to the search action.
When I click on the search button, and the request takes too long, I'd like to be able to stop the request. In this case, keep the already fetched data in the data-grid.
I would also like to have a select, in which I can choose an interval value. When I trigger a search, it searches by polling at the provided interval. During polling, if I change the filters the next poll will use those filters.
As a user, I'd like the search button to display 'Refresh' when there are no changes to the form.
So let's think it through and gather the problems that we need to solve.
- We need a Search button which changes its text and behaviour to Stop and Refresh based on the current state.
- We need a polling interval select near the search button. When we change it during polling, the interval should change accordingly.
- Searching and Polling should only start when the search button gets interacted with. (clicking or pressing enter)
- Whenever we search, and the response takes too long to arrive, we need the ability to stop the request.
- We need to be able to cache the previous value in case an error occurs.
- After a successful search, the search button should display Refresh. When the filter form changes in any way, display Search.
The action streams and their triggers
First, let's set up our template, so we can visualise it. We have two inputs in this example, a username and an email input. Then we add a select element and bind it to the pollInterval form control. All the inputs are inside a form element, which is bound to the searchForm FormGroup.
We also have a button, which gets displayed when there is a value in the searchState$ observable. This observable is going to have a default value, so the button always gets displayed. We also bind a click handler to the button. It calls triggerSearchAction() with the current state every time it gets clicked.
<form [formGroup]="searchForm">
  <label for="username">Username</label>
  <input id="username" type="text" name="username" formControlName="username" />
  <br>
  <label for="email">email</label>
  <input id="email" type="text" name="email" formControlName="email" />
  <br>
  <div *ngIf="searchForm.get('email').hasError('email')"> Invalid e-mail address.</div>
  <br>
  <select name="pollInterval" formControlName="pollInterval">
    <option value="0">No polling</option>
    <option value="5000">5 sec</option>
    <option value="10000">10 sec</option>
  </select>
  <button *ngIf="searchState$ | async as state" (click)="triggerSearchAction(state)">
    <ng-container *ngIf="state === 'IDLE' && searchForm.dirty">Search</ng-container>
    <ng-container *ngIf="state === 'IDLE' && !searchForm.dirty">Refresh</ng-container>
    <ng-container *ngIf="state === 'SEARCH'">Stop</ng-container>
  </button>
</form>
<your-data-grid [data]="searchResults$ | async"> </your-data-grid>
We also display the different button texts in separate ng-containers. This way our button stays accessible. Whenever the state changes, only the button's text changes. The button does not get removed from the DOM, so it does not lose focus, which is rather important for keyboard-only users. Finally, we have a data-grid component, and we bind the searchResults$ observable to it as a data source.
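The three ng-container conditions in the template can be read as one small decision table. The sketch below restates them as a pure function so the mapping is easy to check; buttonLabel is a hypothetical helper used only for illustration and is not part of the component.

```typescript
type SearchToggleStates = 'IDLE' | 'SEARCH';

// Hypothetical helper (not in the article's component): it mirrors the three
// *ngIf conditions on the button's ng-containers as a single pure function.
function buttonLabel(state: SearchToggleStates, formDirty: boolean): string {
  if (state === 'SEARCH') {
    // A request or a polling session is in flight, so the button stops it.
    return 'Stop';
  }
  // Idle: a dirty form means new filters, a pristine form means a re-run.
  return formDirty ? 'Search' : 'Refresh';
}

// One label per template branch.
const labels: string[] = [
  buttonLabel('IDLE', true),
  buttonLabel('IDLE', false),
  buttonLabel('SEARCH', true),
];
```

Keeping the decision in the template, as the article does, avoids an extra method call on every change detection run; the function form is only meant to make the three cases explicit.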
In our component, we set up the searchForm with validators on the provided controls. In the class constructor, we also call markAsDirty() on it. This way, while the searchState$ observable emits IDLE, our button displays Search. Our searchStateSub is a BehaviorSubject with an initial value of IDLE. When the searchForm is not dirty, the button displays Refresh instead. The searchAction$ observable sets the search state based on the provided action. The triggerSearchAction() method decides which action to emit, based on the current state.
import { ChangeDetectionStrategy, Component } from '@angular/core'
import { FormBuilder, Validators } from '@angular/forms'
import { BehaviorSubject, Observable, Subject, of } from 'rxjs'
import { catchError, tap } from 'rxjs/operators'
// User and UserService come from the application itself.

type SearchTriggers = 'START' | 'STOP'
type SearchToggleStates = 'IDLE' | 'SEARCH'

interface SearchFormValue {
  username: string
  email: string
  pollInterval: number
}

@Component({
  selector: 'search-polling-example',
  // An external template file is referenced with templateUrl, not template.
  templateUrl: './search-polling-example.component.html',
  changeDetection: ChangeDetectionStrategy.OnPush,
})
export class SearchPollingExampleComponent {
  readonly searchForm = this.formBuilder.group({
    username: ['', []],
    email: ['', [Validators.email]],
    pollInterval: [0, []],
  })

  private searchActionSub = new Subject<SearchTriggers>();
  private searchStateSub = new BehaviorSubject<SearchToggleStates>('IDLE');

  readonly searchState$: Observable<SearchToggleStates> = this.searchStateSub.asObservable();

  // Every action also updates the search state: START -> SEARCH, STOP -> IDLE.
  private readonly searchAction$: Observable<SearchTriggers> = this.searchActionSub.asObservable().pipe(
    tap((action: SearchTriggers) => {
      this.searchStateSub.next(action === 'START' ? 'SEARCH' : 'IDLE');
    })
  );

  readonly searchResults$: Observable<User[]> = this.searchAction$
    // We need to implement the search and polling mechanism here using the 'fetchData' method.
    .pipe()

  constructor(private formBuilder: FormBuilder, private userService: UserService) {
    // Start dirty so the button initially displays 'Search'.
    this.searchForm.markAsDirty()
  }

  triggerSearchAction(state: SearchToggleStates): void {
    this.searchActionSub.next(state === 'IDLE' ? 'START' : 'STOP')
  }

  fetchData(formValue: SearchFormValue): Observable<User[] | null> {
    return this.userService.fetchUsers(formValue)
      .pipe(
        // Swallow the error and emit null, so the stream stays alive.
        catchError((error) => {
          console.error(error)
          return of(null)
        })
      )
  }
}
We also implemented the fetchData method. It accepts our searchForm's values and fetches the data using the UserService. If the service returns an error, we catch it and return a null value. This becomes important later when we need to fall back to a previously emitted value.
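The toggle between triggers and states in the component can be traced without Angular or RxJS. The following is a minimal sketch that restates triggerSearchAction() and the tap inside searchAction$ as pure functions; triggerFor, stateAfter and simulateClicks are hypothetical names introduced only for this illustration. It deliberately ignores the other way back to IDLE, namely a finished request, which the operator handles later in the article.

```typescript
type SearchTriggers = 'START' | 'STOP';
type SearchToggleStates = 'IDLE' | 'SEARCH';

// Mirrors triggerSearchAction(): a click starts a search when idle
// and stops it otherwise.
function triggerFor(state: SearchToggleStates): SearchTriggers {
  return state === 'IDLE' ? 'START' : 'STOP';
}

// Mirrors the tap() in searchAction$: every action updates the state.
function stateAfter(action: SearchTriggers): SearchToggleStates {
  return action === 'START' ? 'SEARCH' : 'IDLE';
}

// Replays a number of button clicks and records the state after each one.
function simulateClicks(
  clicks: number,
  initial: SearchToggleStates = 'IDLE'
): SearchToggleStates[] {
  const states: SearchToggleStates[] = [initial];
  for (let i = 0; i < clicks; i++) {
    const current = states[states.length - 1];
    states.push(stateAfter(triggerFor(current)));
  }
  return states;
}

// Three clicks: start, stop, start again.
const history = simulateClicks(3);
```

Because each click derives the action from the state the template currently holds, the button can never send two START events in a row by clicking alone.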
Planning the logic
With the initial logic set up, we need to implement the polling and the circuit-breaking logic. We do this in our searchResults$ observable pipe with some operator chaining. Since this logic is going to surface in other parts of the application as well, it is best to create a custom RxJS operator for it. Before we jump right into coding, let's summarise our needs and think ahead.
This is going to be a higher-order operator, and it needs our searchForm and the data-fetching method. It also needs to be able to set the search state. So our function needs at least 3 input parameters, and it needs to return a function.
type SearchTriggers = 'START' | 'STOP'
type SearchToggleStates = 'IDLE' | 'SEARCH'

export function searchPolling<T, F extends { pollInterval: number }>(
  searchForm: FormGroup,
  searchStateSub: Subject<SearchToggleStates> | ReplaySubject<SearchToggleStates> | BehaviorSubject<SearchToggleStates>,
  fetchData: (formValue: F) => Observable<T[] | null>
): (source$: Observable<SearchTriggers>) => Observable<T[]> {
  return (source$: Observable<SearchTriggers>) =>
    source$
      // Logic implementation comes here
      .pipe()
}
Our custom observable operator method should:
- emit an empty array as a default value.
- trigger a search when the event emitted by the source is START, and set the search state.
- trigger a polling search when the provided FormGroup's pollInterval property is not 0.
- stop a running polling search when the FormGroup's pollInterval property gets set to 0.
- unsubscribe from fetching the data if it takes too long to arrive.
- emit the previously emitted value if a null value gets emitted, either from the data retrieval function or when stopping a long request.
- throw an error if it is used in an unintended way.
The implementation
Default value and handling misuse
These are solid test cases. Since we are creating an operator method, we can write our tests using rxjs-marbles. Let's start with the two easy tests: that our operator method does not get misused, and that it emits an empty array as a default value. We use a hot observable stream as our source because the source emits values independently of our operator: it is an action stream which we trigger by passing START and STOP into it. The expectation is a cold observable because the values come from our operator.
describe(`SearchPolling Operator`, () => {
  let MOCK_FORM: FormGroup
  let MOCK_SUBJECT: ReplaySubject<any>

  beforeEach(() => {
    MOCK_FORM = new FormGroup({
      pollInterval: new FormControl(0),
    })
    MOCK_SUBJECT = new ReplaySubject(1)
  })

  it(`emits an empty array as a default value`, marbles((m) => {
    const source$ = m.hot(   '----')
    const expected$ = m.cold('x---', { x: [] })
    m.expect(
      source$.pipe(searchPolling(MOCK_FORM, MOCK_SUBJECT, (formValue: any) => of(['test'])))
    ).toBeObservable(expected$)
  }));

  it(`should throw an error if it is misused`, marbles((m) => {
    // The message must match the one thrown by the operator, including the colon.
    const expectedError = new Error(
      `Search Polling can only accept 'START' and 'STOP' events, but you provided: 'WHATEVER'`
    )
    const source$ = m.hot(   '-a--', { a: 'WHATEVER' })
    const expected$ = m.cold('x#--', { x: [] }, expectedError)
    m.expect(
      source$.pipe(searchPolling(MOCK_FORM, MOCK_SUBJECT, (formValue: any) => of(['test'])))
    ).toBeObservable(expected$)
  }));
});
In our first test, we can see that if the action source never emits a value, the result still has an empty array as a value. In the second test, we start by emitting a WHATEVER value, and in our output we expect an error. So let's update our operator to make these tests pass.
export function searchPolling<T, F extends { pollInterval: number }>(/* ... */) {
  return (source$: Observable<SearchTriggers>) =>
    source$.pipe(
      switchMap<SearchTriggers, Observable<T[]>>((event: SearchTriggers) => {
        if (event === 'STOP') {
          // stop logic comes here
        } else if (event === 'START') {
          // search and search polling logic comes here
        } else {
          return throwError(
            new Error(`Search Polling can only accept 'START' and 'STOP' events, but you provided: '${event}'`)
          )
        }
      }),
      startWith<T[], T[]>([])
    )
}
We use the switchMap operator to convert our event stream into a data stream. This is important because switchMap unsubscribes from the previous inner stream whenever a new event arrives. We set up the context for the STOP and START events. Inside the else branch, we throw an error. This way, other developers who use this operator get a clear error instead of silent misbehaviour caused by typos such as SOTP. Then we add the startWith operator with an empty array. This does exactly what we need: it starts the data stream with an empty array.
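The guard in the else branch exists because values can reach the operator at runtime that the SearchTriggers type would reject at compile time, for example through an any-typed source. As a minimal sketch of the same check outside of RxJS, the two functions below validate an unknown value and throw with the article's error message; isSearchTrigger and assertSearchTrigger are hypothetical helpers, not part of the operator.

```typescript
type SearchTriggers = 'START' | 'STOP';

// Type guard: narrows an unknown value to the two accepted events.
function isSearchTrigger(event: unknown): event is SearchTriggers {
  return event === 'START' || event === 'STOP';
}

// Returns the event unchanged when it is valid, otherwise throws
// with the same message the operator uses.
function assertSearchTrigger(event: unknown): SearchTriggers {
  if (!isSearchTrigger(event)) {
    throw new Error(
      `Search Polling can only accept 'START' and 'STOP' events, but you provided: '${String(event)}'`
    );
  }
  return event;
}

// A typo such as 'SOTP' is caught immediately.
let caughtMessage = '';
try {
  assertSearchTrigger('SOTP');
} catch (error) {
  caughtMessage = (error as Error).message;
}
```

Inside the operator, the article returns throwError(...) instead of throwing, which delivers the same failure through the observable's error channel.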
Simple search
Let's implement the single search and the stop mechanism now. We need to display the previous values in our data-grid when we stop a search. When the service returns an error, our data-fetch method catches it and returns null. We also need to set the search state back to IDLE. Updating our button text requires us to call markAsPristine() on our search form as well.
// in our test file
it(`should trigger search when the event emitted by the source is 'START' and set the search state to 'IDLE'.`, marbles(m => {
  const source$ = m.hot(     '--a-', { a: 'START' });
  const searchState$ = m.hot('--s-', { s: 'IDLE' });
  const expected$ = m.cold(  'x-y-', { x: [], y: ['test'] });
  m.expect(
    source$.pipe(searchPolling(MOCK_FORM, MOCK_SUBJECT, (formValue: any) => of(['test'])))
  ).toBeObservable(expected$);
  m.expect(
    MOCK_SUBJECT.asObservable()
      .pipe(tap(_ => expect(MOCK_FORM.dirty).toBe(false)))
  ).toBeObservable(searchState$);
}));
This tests our simple search logic, so let's implement the necessary code to make the test pass. In our else if block, we call the fetchData method with the search form's value. We also pipe a tap operator onto it, which sets the search state to IDLE. A second tap operator, placed just before our startWith operator, calls the markAsPristine() method. It sets the search form to pristine, or 'not dirty'.
export function searchPolling<T, F extends { pollInterval: number }>(/* ... */) {
  return (source$: Observable<SearchTriggers>) =>
    source$.pipe(
      switchMap<SearchTriggers, Observable<T[]>>((event: SearchTriggers) => {
        if (event === 'STOP') {
          // stop logic comes here
        } else if (event === 'START') {
          const formData: F = searchForm.getRawValue();
          if (formData.pollInterval === 0) {
            // Single search: once the data arrives, the search is over.
            return fetchData(formData).pipe(tap((_) => searchStateSub.next('IDLE')))
          } else {
            // polling logic implementation comes here.
          }
        } else {
          return throwError(
            new Error(`Search Polling can only accept 'START' and 'STOP' events, but you provided: '${event}'`)
          )
        }
      }),
      // Any emitted result means the form's filters have been applied.
      tap(_ => searchForm.markAsPristine()),
      startWith<T[], T[]>([])
    )
}
Fetching the data and setting the form and search states was pretty straightforward. However, we still need to implement our circuit-breaking and caching logic. Let's write our tests first.
Stop search and cache the previous value
In the first test, we simulate a slow internet connection. For that, we use the delay operator on our mocked fetchData method. In the second test, we need a more complex method for fetching the data. The mockFetchData method uses the test's scope to increase a counter. When the counter reaches 2, it returns a null value. This simulates a network error on the third search request. The value that gets emitted must be the previous value.
// in our test file
it(`should unsubscribe from fetching the data if it takes too long to arrive and return the previously emitted value.`, marbles(m => {
  const source$ = m.hot(   '--ab', { a: 'START', b: 'STOP' });
  const expected$ = m.cold('x--x', { x: [] });
  m.expect(
    source$.pipe(
      searchPolling(
        MOCK_FORM,
        MOCK_SUBJECT,
        (formValue: any) => of(['test']).pipe(delay(200)) // simulate slow response.
      )
    )
  ).toBeObservable(expected$);
}));

it(`should emit the previous successful value, when the data fetch encounters an error and a null value is emitted`, marbles(m => {
  let counter = 0;
  const mockData = ['firstEmmit', 'secondEmmit', 'thirdEmmit']
  const mockFetchData = (formValue: any) => {
    if (counter === 2) {
      return of(null); // simulate caught error
    } else {
      const returnValue = mockData[counter]
      counter++
      return of([returnValue])
    }
  };
  const source$ = m.hot(   '--a-a-a', { a: 'START' });
  const expected$ = m.cold('x-y-z-z', { x: [], y: ['firstEmmit'], z: ['secondEmmit'] });
  m.expect(
    // Three separate arguments, not one parenthesised comma expression.
    source$.pipe(searchPolling(MOCK_FORM, MOCK_SUBJECT, mockFetchData))
  ).toBeObservable(expected$);
}));
We've set the search state to IDLE in a tap operator; however, we also need to set it whenever we stop a search, and it is going to be needed when we search with polling. First, we extract it into a function scoped inside our operator method. Then we emit a simple null value, piped through that function, when a STOP event arrives. Since we are inside a switchMap operator, the STOP event itself unsubscribes from the previous stream: a request that takes too long is dropped, and the null value is emitted in its place.
export function searchPolling<T, F extends { pollInterval: number }>(/* ... */) {
  function setSearchStateToIdle<V>(): (s$: Observable<V>) => Observable<V> {
    return (s$) => s$.pipe(tap((_) => searchStateSub.next('IDLE')))
  }

  return (source$: Observable<SearchTriggers>) =>
    source$.pipe(
      switchMap<SearchTriggers, Observable<T[]>>((event: SearchTriggers) => {
        if (event === 'STOP') {
          return of<T[]>(null).pipe(setSearchStateToIdle())
        } else if (event === 'START') {
          const formData: F = searchForm.getRawValue()
          if (formData.pollInterval === 0) {
            return fetchData(formData).pipe(setSearchStateToIdle())
          } else {
            // polling logic implementation comes here.
          }
        } else {
          return throwError(
            new Error(`Search Polling can only accept 'START' and 'STOP' events, but you provided: '${event}'`)
          )
        }
      }),
      tap((_) => searchForm.markAsPristine()),
      startWith<T[], T[]>([]),
      scan((previous, next) => next || previous)
    )
}
The first emission is always going to be an empty array. The scan operator keeps the last value it returned, so we can simply check the latest emitted value against it. If the latest value is null, we give back the previous value. In the worst-case scenario, the previous value is the initial empty array. This also means the previous search results stay in memory.
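The fallback rule passed to scan is an ordinary reducer, so it can be tried on a plain list of emissions without RxJS. This is a minimal sketch under that assumption; keepPrevious and runningValues are hypothetical names, and the sample data is made up for the illustration.

```typescript
// The same rule as scan((previous, next) => next || previous):
// a null emission falls back to the last known value.
function keepPrevious<T>(previous: T[], next: T[] | null): T[] {
  return next || previous;
}

// Applies the reducer step by step and records every intermediate result,
// which is what scan emits (unlike reduce, which only yields the last one).
function runningValues<T>(seed: T[], emissions: Array<T[] | null>): T[][] {
  const results: T[][] = [seed];
  let current = seed;
  for (const emission of emissions) {
    current = keepPrevious(current, emission);
    results.push(current);
  }
  return results;
}

// Two successful searches followed by a failed one (null).
const displayed = runningValues<string>([], [['first'], ['second'], null]);
// displayed is [[], ['first'], ['second'], ['second']]
```

Note that an empty array is truthy in JavaScript, so a search that legitimately returns no rows still replaces the previous results; only null triggers the fallback.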
Polling
Now we only need to implement the polling mechanism. When polling, changes in the search form should be taken into account. Whenever the pollInterval is set to zero, the polling should stop with the last request. When this occurs, the operator should unsubscribe from the search form's valueChanges observable. The search state is set to IDLE upon emitting the last search results.
// in our test file
it(`should trigger polling search when the provided FormGroup's pollInterval property is not 0`, marbles((m) => {
  MOCK_FORM.setValue({
    pollInterval: 2,
  });
  const source$ = m.hot(   '--a------b', { a: 'START', b: 'STOP' })
  const expected$ = m.cold('x-y-y-y-yy', { x: [], y: ['test'] })
  m.expect(
    source$.pipe(
      searchPolling(MOCK_FORM, MOCK_SUBJECT, (formValue: any) => of(['test']))
    )
  ).toBeObservable(expected$)
}));

it(`should stop a running polling search when the FormGroup's pollInterval property gets set to 0`, marbles((m) => {
  MOCK_FORM.setValue({
    pollInterval: 2,
  });
  const source$ = m.hot(     '--a-------', { a: 'START' })
  const searchState$ = m.hot('------s--', { s: 'IDLE' })
  const expected$ = m.cold(  'x-y-y-y---', { x: [], y: ['test'] })
  m.expect(
    source$.pipe(
      searchPolling(MOCK_FORM, MOCK_SUBJECT, (formValue: any) => of(['test'])),
      // simulate setting the formControl on the third emit.
      scan((previous, current, index) => {
        if (index === 2) {
          MOCK_FORM.setValue({ pollInterval: 0 })
          MOCK_FORM.updateValueAndValidity()
        }
        return current
      })
    )
  ).toBeObservable(expected$)
  m.expect(MOCK_SUBJECT.asObservable()).toBeObservable(searchState$)
}));
We finally finish the implementation with the polling logic. To keep the code readable, we implement it in a separate function. In this function, we subscribe to searchForm.valueChanges. We use the startWith operator again, to emit the initialFormValue parameter. Then we use the switchMap operator to implement the polling logic. If the pollInterval is 0, we must switch to a simple observable which emits once. We can imagine how problematic it would be if 0 was passed to a timer as the period between emissions. This can only occur when a polling search is already in progress. We stop the polling here and set the search state to IDLE.
export function searchPolling<T, F extends { pollInterval: number }>(/* ... */) {
  // ...
  function usePolling(initialFormValue: F): Observable<T[]> {
    return searchForm.valueChanges.pipe(
      startWith<F, F>(initialFormValue),
      switchMap((formValue: F) => {
        const pollSource$: Observable<number> =
          formValue.pollInterval === 0
            ? of(0).pipe(setSearchStateToIdle())
            : timer(0, formValue.pollInterval)
        return pollSource$.pipe(map<number, F>((_) => formValue))
      }),
      switchMap(fetchData),
      takeWhile((_) => searchForm.getRawValue().pollInterval !== 0)
    )
  }
  // ...
}
When the pollInterval is greater than 0, we set a timer. Our pollSource$ observable would emit only numbers, so we map it back to the formValue. We use this to fetch the data in the next switchMap operator. Notice that we don't set the search state to IDLE here, because we need to display the text Stop on our button. Lastly, we unsubscribe from the valueChanges observable using the takeWhile operator. The takeWhile operator completes a stream when the provided predicate returns a falsy value. This prevents form value changes from triggering simple searches after the polling has been stopped by setting the pollInterval to 0.
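To get a feel for when requests fire, the schedule of the poll source can be modelled with plain numbers. The sketch below is only a model of the two branches, not RxJS itself: it assumes timer(0, period) ticks at 0, period, 2 × period and so on, and that the of(0) branch ticks exactly once. pollTicks is a hypothetical helper introduced for this illustration.

```typescript
// Virtual times at which the poll source would tick within [0, until].
function pollTicks(period: number, until: number): number[] {
  if (period <= 0) {
    // Mirrors the of(0) branch: one final emission and no repetition.
    return [0];
  }
  const ticks: number[] = [];
  // Mirrors timer(0, period): first tick immediately, then every period.
  for (let t = 0; t <= until; t += period) {
    ticks.push(t);
  }
  return ticks;
}

// With the test's interval of 2, a request fires every second frame.
const polling = pollTicks(2, 6); // [0, 2, 4, 6]
// With the interval set to 0, exactly one last request fires.
const stopped = pollTicks(0, 6); // [0]
```

The times are relative to the moment the form value arrives, because switchMap restarts the poll source for every new form value.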
Putting it all together, the complete operator looks like this:
import { FormGroup } from '@angular/forms'
import { BehaviorSubject, Observable, ReplaySubject, Subject, of, throwError, timer } from 'rxjs'
import { map, scan, startWith, switchMap, takeWhile, tap } from 'rxjs/operators'

type SearchTriggers = 'START' | 'STOP'
type SearchToggleStates = 'IDLE' | 'SEARCH'

export function searchPolling<T, F extends { pollInterval: number }>(
  searchForm: FormGroup,
  searchStateSub: Subject<SearchToggleStates> | ReplaySubject<SearchToggleStates> | BehaviorSubject<SearchToggleStates>,
  fetchData: (formValue: F) => Observable<T[] | null>
): (source$: Observable<SearchTriggers>) => Observable<T[]> {
  // Sets the search state back to IDLE on every emission of the piped stream.
  function setSearchStateToIdle<V>(): (s$: Observable<V>) => Observable<V> {
    return (s$) => s$.pipe(tap((_) => searchStateSub.next('IDLE')))
  }

  // Polls with the latest form value until pollInterval is set to 0.
  function usePolling(initialFormValue: F): Observable<T[]> {
    return searchForm.valueChanges.pipe(
      startWith<F, F>(initialFormValue),
      switchMap((formValue: F) => {
        const pollSource$: Observable<number> =
          formValue.pollInterval === 0
            ? of(0).pipe(setSearchStateToIdle())
            : timer(0, formValue.pollInterval)
        return pollSource$.pipe(map<number, F>((_) => formValue))
      }),
      switchMap(fetchData),
      takeWhile((_) => searchForm.getRawValue().pollInterval !== 0)
    )
  }

  return (source$: Observable<SearchTriggers>) =>
    source$.pipe(
      switchMap<SearchTriggers, Observable<T[]>>((event: SearchTriggers) => {
        if (event === 'STOP') {
          // Drops the running request (via switchMap) and emits null instead.
          return of<T[]>(null).pipe(setSearchStateToIdle())
        } else if (event === 'START') {
          const formData: F = searchForm.getRawValue()
          return formData.pollInterval === 0
            ? fetchData(formData).pipe(setSearchStateToIdle())
            : usePolling(formData)
        } else {
          return throwError(
            new Error(`Search Polling can only accept 'START' and 'STOP' events, but you provided: '${event}'`)
          )
        }
      }),
      tap((_) => searchForm.markAsPristine()),
      startWith<T[], T[]>([]),
      // A null emission falls back to the previously emitted value.
      scan((previous, next) => next || previous)
    )
}
And inside our component, we can update the searchResults$ observable.
export class SearchPollingExampleComponent {
  //...
  readonly searchResults$: Observable<User[]> = this.searchAction$.pipe(
    searchPolling<User, SearchFormValue>(this.searchForm, this.searchStateSub, this.fetchData.bind(this))
  )
  //...
}
Conclusion
We have implemented rather complex logic declaratively, using RxJS. I hope this coding walkthrough helps you understand how to use some RxJS operators. If this article helped you in any way, please give me a shout on Twitter or share it.