observable.go 1.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. package observable
  2. import (
  3. "errors"
  4. "sync"
  5. )
  6. type Observable[T any] struct {
  7. iterable Iterable[T]
  8. listener map[Subscription[T]]*Subscriber[T]
  9. mux sync.Mutex
  10. done bool
  11. }
  12. func (o *Observable[T]) process() {
  13. for item := range o.iterable {
  14. o.mux.Lock()
  15. for _, sub := range o.listener {
  16. sub.Emit(item)
  17. }
  18. o.mux.Unlock()
  19. }
  20. o.close()
  21. }
  22. func (o *Observable[T]) close() {
  23. o.mux.Lock()
  24. defer o.mux.Unlock()
  25. o.done = true
  26. for _, sub := range o.listener {
  27. sub.Close()
  28. }
  29. }
  30. func (o *Observable[T]) Subscribe() (Subscription[T], error) {
  31. o.mux.Lock()
  32. defer o.mux.Unlock()
  33. if o.done {
  34. return nil, errors.New("observable is closed")
  35. }
  36. subscriber := newSubscriber[T]()
  37. o.listener[subscriber.Out()] = subscriber
  38. return subscriber.Out(), nil
  39. }
  40. func (o *Observable[T]) UnSubscribe(sub Subscription[T]) {
  41. o.mux.Lock()
  42. defer o.mux.Unlock()
  43. subscriber, exist := o.listener[sub]
  44. if !exist {
  45. return
  46. }
  47. delete(o.listener, sub)
  48. subscriber.Close()
  49. }
  50. func NewObservable[T any](iter Iterable[T]) *Observable[T] {
  51. observable := &Observable[T]{
  52. iterable: iter,
  53. listener: map[Subscription[T]]*Subscriber[T]{},
  54. }
  55. go observable.process()
  56. return observable
  57. }