observable_test.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. package observable
  2. import (
  3. "sync"
  4. "testing"
  5. "time"
  6. "github.com/metacubex/mihomo/common/atomic"
  7. "github.com/stretchr/testify/assert"
  8. )
  9. func iterator[T any](item []T) chan T {
  10. ch := make(chan T)
  11. go func() {
  12. time.Sleep(100 * time.Millisecond)
  13. for _, elm := range item {
  14. ch <- elm
  15. }
  16. close(ch)
  17. }()
  18. return ch
  19. }
  20. func TestObservable(t *testing.T) {
  21. iter := iterator[int]([]int{1, 2, 3, 4, 5})
  22. src := NewObservable[int](iter)
  23. data, err := src.Subscribe()
  24. assert.Nil(t, err)
  25. count := 0
  26. for range data {
  27. count++
  28. }
  29. assert.Equal(t, count, 5)
  30. }
  31. func TestObservable_MultiSubscribe(t *testing.T) {
  32. iter := iterator[int]([]int{1, 2, 3, 4, 5})
  33. src := NewObservable[int](iter)
  34. ch1, _ := src.Subscribe()
  35. ch2, _ := src.Subscribe()
  36. count := atomic.NewInt32(0)
  37. var wg sync.WaitGroup
  38. wg.Add(2)
  39. waitCh := func(ch <-chan int) {
  40. for range ch {
  41. count.Add(1)
  42. }
  43. wg.Done()
  44. }
  45. go waitCh(ch1)
  46. go waitCh(ch2)
  47. wg.Wait()
  48. assert.Equal(t, int32(10), count.Load())
  49. }
  50. func TestObservable_UnSubscribe(t *testing.T) {
  51. iter := iterator[int]([]int{1, 2, 3, 4, 5})
  52. src := NewObservable[int](iter)
  53. data, err := src.Subscribe()
  54. assert.Nil(t, err)
  55. src.UnSubscribe(data)
  56. _, open := <-data
  57. assert.False(t, open)
  58. }
  59. func TestObservable_SubscribeClosedSource(t *testing.T) {
  60. iter := iterator[int]([]int{1})
  61. src := NewObservable[int](iter)
  62. data, _ := src.Subscribe()
  63. <-data
  64. _, closed := src.Subscribe()
  65. assert.NotNil(t, closed)
  66. }
  67. func TestObservable_UnSubscribeWithNotExistSubscription(t *testing.T) {
  68. sub := Subscription[int](make(chan int))
  69. iter := iterator[int]([]int{1})
  70. src := NewObservable[int](iter)
  71. src.UnSubscribe(sub)
  72. }
  73. func TestObservable_SubscribeGoroutineLeak(t *testing.T) {
  74. iter := iterator[int]([]int{1, 2, 3, 4, 5})
  75. src := NewObservable[int](iter)
  76. max := 100
  77. var list []Subscription[int]
  78. for i := 0; i < max; i++ {
  79. ch, _ := src.Subscribe()
  80. list = append(list, ch)
  81. }
  82. var wg sync.WaitGroup
  83. wg.Add(max)
  84. waitCh := func(ch <-chan int) {
  85. for range ch {
  86. }
  87. wg.Done()
  88. }
  89. for _, ch := range list {
  90. go waitCh(ch)
  91. }
  92. wg.Wait()
  93. for _, sub := range list {
  94. _, more := <-sub
  95. assert.False(t, more)
  96. }
  97. _, more := <-list[0]
  98. assert.False(t, more)
  99. }
  100. func Benchmark_Observable_1000(b *testing.B) {
  101. ch := make(chan int)
  102. o := NewObservable[int](ch)
  103. num := 1000
  104. subs := []Subscription[int]{}
  105. for i := 0; i < num; i++ {
  106. sub, _ := o.Subscribe()
  107. subs = append(subs, sub)
  108. }
  109. wg := sync.WaitGroup{}
  110. wg.Add(num)
  111. b.ResetTimer()
  112. for _, sub := range subs {
  113. go func(s Subscription[int]) {
  114. for range s {
  115. }
  116. wg.Done()
  117. }(sub)
  118. }
  119. for i := 0; i < b.N; i++ {
  120. ch <- i
  121. }
  122. close(ch)
  123. wg.Wait()
  124. }