Implementing async.queue using rxjs
There is an example of queue using rxjs:
Rx.Observable.from(['foo', 'bar', 'baz', 'bay', 'bax', 'bar', 'cat'])
.do(x => console.log((new Date).toLocaleTimeString() + " " + x))
// grouping by 2
.bufferCount(2)
// concat received results
.concatMap((data) => {
return Rx.Observable.defer(() => {
// assuming long operation here, e.g. downloading,
// we can use merge here if we want to do operation per item
return Rx.Observable.create((observer)=>{
setTimeout(function () {
observer.next(data);
observer.complete();
}, 2000);
})
});
})
.subscribe(
result => console.log((new Date).toLocaleTimeString() + " finished " + result),
error => console.error(error),
() => console.log('done')
);
// Console ouput
// rxjs.html:9 Console was cleared
// rxjs.html:11 9:56:36 PM foo
// rxjs.html:11 9:56:36 PM bar
// rxjs.html:11 9:56:36 PM baz
// rxjs.html:11 9:56:36 PM bay
// rxjs.html:11 9:56:36 PM bax
// rxjs.html:11 9:56:36 PM bar
// rxjs.html:11 9:56:36 PM cat
// rxjs.html:28 9:56:38 PM finished foo,bar
// rxjs.html:28 9:56:40 PM finished baz,bay
// rxjs.html:28 9:56:42 PM finished bax,bar
// rxjs.html:28 9:56:44 PM finished cat
// rxjs.html:30 done
Used links: