1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465 |
- package observable
- import (
- "errors"
- "sync"
- )
- type Observable[T any] struct {
- iterable Iterable[T]
- listener map[Subscription[T]]*Subscriber[T]
- mux sync.Mutex
- done bool
- }
- func (o *Observable[T]) process() {
- for item := range o.iterable {
- o.mux.Lock()
- for _, sub := range o.listener {
- sub.Emit(item)
- }
- o.mux.Unlock()
- }
- o.close()
- }
- func (o *Observable[T]) close() {
- o.mux.Lock()
- defer o.mux.Unlock()
- o.done = true
- for _, sub := range o.listener {
- sub.Close()
- }
- }
- func (o *Observable[T]) Subscribe() (Subscription[T], error) {
- o.mux.Lock()
- defer o.mux.Unlock()
- if o.done {
- return nil, errors.New("observable is closed")
- }
- subscriber := newSubscriber[T]()
- o.listener[subscriber.Out()] = subscriber
- return subscriber.Out(), nil
- }
- func (o *Observable[T]) UnSubscribe(sub Subscription[T]) {
- o.mux.Lock()
- defer o.mux.Unlock()
- subscriber, exist := o.listener[sub]
- if !exist {
- return
- }
- delete(o.listener, sub)
- subscriber.Close()
- }
- func NewObservable[T any](iter Iterable[T]) *Observable[T] {
- observable := &Observable[T]{
- iterable: iter,
- listener: map[Subscription[T]]*Subscriber[T]{},
- }
- go observable.process()
- return observable
- }
|