picker.go 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. package picker
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. )
  7. // Picker provides synchronization, and Context cancelation
  8. // for groups of goroutines working on subtasks of a common task.
  9. // Inspired by errGroup
  10. type Picker[T any] struct {
  11. ctx context.Context
  12. cancel func()
  13. wg sync.WaitGroup
  14. once sync.Once
  15. errOnce sync.Once
  16. result T
  17. err error
  18. }
  19. func newPicker[T any](ctx context.Context, cancel func()) *Picker[T] {
  20. return &Picker[T]{
  21. ctx: ctx,
  22. cancel: cancel,
  23. }
  24. }
  25. // WithContext returns a new Picker and an associated Context derived from ctx.
  26. // and cancel when first element return.
  27. func WithContext[T any](ctx context.Context) (*Picker[T], context.Context) {
  28. ctx, cancel := context.WithCancel(ctx)
  29. return newPicker[T](ctx, cancel), ctx
  30. }
  31. // WithTimeout returns a new Picker and an associated Context derived from ctx with timeout.
  32. func WithTimeout[T any](ctx context.Context, timeout time.Duration) (*Picker[T], context.Context) {
  33. ctx, cancel := context.WithTimeout(ctx, timeout)
  34. return newPicker[T](ctx, cancel), ctx
  35. }
  36. // Wait blocks until all function calls from the Go method have returned,
  37. // then returns the first nil error result (if any) from them.
  38. func (p *Picker[T]) Wait() T {
  39. p.wg.Wait()
  40. if p.cancel != nil {
  41. p.cancel()
  42. p.cancel = nil
  43. }
  44. return p.result
  45. }
  46. // Error return the first error (if all success return nil)
  47. func (p *Picker[T]) Error() error {
  48. return p.err
  49. }
  50. // Go calls the given function in a new goroutine.
  51. // The first call to return a nil error cancels the group; its result will be returned by Wait.
  52. func (p *Picker[T]) Go(f func() (T, error)) {
  53. p.wg.Add(1)
  54. go func() {
  55. defer p.wg.Done()
  56. if ret, err := f(); err == nil {
  57. p.once.Do(func() {
  58. p.result = ret
  59. if p.cancel != nil {
  60. p.cancel()
  61. p.cancel = nil
  62. }
  63. })
  64. } else {
  65. p.errOnce.Do(func() {
  66. p.err = err
  67. })
  68. }
  69. }()
  70. }
  71. // Close cancels the picker context and releases resources associated with it.
  72. // If Wait has been called, then there is no need to call Close.
  73. func (p *Picker[T]) Close() error {
  74. if p.cancel != nil {
  75. p.cancel()
  76. p.cancel = nil
  77. }
  78. return nil
  79. }