operators/count.js

  1. import { Observable } from '../Observable';
  2. import { filter } from './filter';
  3. /**
  4. * Counts the number of values coming into the stream on complete
  5. *
  6. * @memberof operators
  7. *
  8. * @param {Observable} source$
  9. * @param {Function} [countCallback] a function to use to filter out values that you do not wish to count
  10. * @returns {Observable}
  11. */
  12. export const count = function (source$, countCallback = (value) => true) {
  13. return new Observable(function ({next, error, complete }) {
  14. let count = 0;
  15. const subscription = filter(source$, countCallback).subscribe({
  16. next () {
  17. count++;
  18. },
  19. error,
  20. complete () {
  21. next(count);
  22. complete();
  23. }
  24. });
  25. return () => subscription.unsubscribe();
  26. });
  27. };
  28. Observable.count = count;
  29. Observable.prototype.count = function (countCallback) {
  30. return count(this, countCallback);
  31. };