subscriber.go 465 B

123456789101112131415161718192021222324252627282930313233
  1. package observable
  2. import (
  3. "sync"
  4. )
  5. type Subscription[T any] <-chan T
  6. type Subscriber[T any] struct {
  7. buffer chan T
  8. once sync.Once
  9. }
  10. func (s *Subscriber[T]) Emit(item T) {
  11. s.buffer <- item
  12. }
  13. func (s *Subscriber[T]) Out() Subscription[T] {
  14. return s.buffer
  15. }
  16. func (s *Subscriber[T]) Close() {
  17. s.once.Do(func() {
  18. close(s.buffer)
  19. })
  20. }
  21. func newSubscriber[T any]() *Subscriber[T] {
  22. sub := &Subscriber[T]{
  23. buffer: make(chan T, 200),
  24. }
  25. return sub
  26. }