queue.go 1.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. package queue
  2. import (
  3. "sync"
  4. "github.com/samber/lo"
  5. )
  6. // Queue is a simple concurrent safe queue
  7. type Queue[T any] struct {
  8. items []T
  9. lock sync.RWMutex
  10. }
  11. // Put add the item to the queue.
  12. func (q *Queue[T]) Put(items ...T) {
  13. if len(items) == 0 {
  14. return
  15. }
  16. q.lock.Lock()
  17. q.items = append(q.items, items...)
  18. q.lock.Unlock()
  19. }
  20. // Pop returns the head of items.
  21. func (q *Queue[T]) Pop() T {
  22. if len(q.items) == 0 {
  23. return lo.Empty[T]()
  24. }
  25. q.lock.Lock()
  26. head := q.items[0]
  27. q.items = q.items[1:]
  28. q.lock.Unlock()
  29. return head
  30. }
  31. // Last returns the last of item.
  32. func (q *Queue[T]) Last() T {
  33. if len(q.items) == 0 {
  34. return lo.Empty[T]()
  35. }
  36. q.lock.RLock()
  37. last := q.items[len(q.items)-1]
  38. q.lock.RUnlock()
  39. return last
  40. }
  41. // Copy get the copy of queue.
  42. func (q *Queue[T]) Copy() []T {
  43. items := []T{}
  44. q.lock.RLock()
  45. items = append(items, q.items...)
  46. q.lock.RUnlock()
  47. return items
  48. }
  49. // Len returns the number of items in this queue.
  50. func (q *Queue[T]) Len() int64 {
  51. q.lock.RLock()
  52. defer q.lock.RUnlock()
  53. return int64(len(q.items))
  54. }
  55. // New is a constructor for a new concurrent safe queue.
  56. func New[T any](hint int64) *Queue[T] {
  57. return &Queue[T]{
  58. items: make([]T, 0, hint),
  59. }
  60. }