fetcher.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. package resource
  2. import (
  3. "bytes"
  4. "crypto/md5"
  5. "os"
  6. "path/filepath"
  7. "time"
  8. types "github.com/metacubex/mihomo/constant/provider"
  9. "github.com/metacubex/mihomo/log"
  10. "github.com/sagernet/fswatch"
  11. "github.com/samber/lo"
  12. )
  13. var (
  14. fileMode os.FileMode = 0o666
  15. dirMode os.FileMode = 0o755
  16. )
  17. type Parser[V any] func([]byte) (V, error)
  18. type Fetcher[V any] struct {
  19. resourceType string
  20. name string
  21. vehicle types.Vehicle
  22. UpdatedAt time.Time
  23. done chan struct{}
  24. hash [16]byte
  25. parser Parser[V]
  26. interval time.Duration
  27. OnUpdate func(V)
  28. watcher *fswatch.Watcher
  29. }
  30. func (f *Fetcher[V]) Name() string {
  31. return f.name
  32. }
  33. func (f *Fetcher[V]) Vehicle() types.Vehicle {
  34. return f.vehicle
  35. }
  36. func (f *Fetcher[V]) VehicleType() types.VehicleType {
  37. return f.vehicle.Type()
  38. }
  39. func (f *Fetcher[V]) Initial() (V, error) {
  40. var (
  41. buf []byte
  42. err error
  43. isLocal bool
  44. forceUpdate bool
  45. )
  46. if stat, fErr := os.Stat(f.vehicle.Path()); fErr == nil {
  47. buf, err = os.ReadFile(f.vehicle.Path())
  48. modTime := stat.ModTime()
  49. f.UpdatedAt = modTime
  50. isLocal = true
  51. if f.interval != 0 && modTime.Add(f.interval).Before(time.Now()) {
  52. log.Warnln("[Provider] %s not updated for a long time, force refresh", f.Name())
  53. forceUpdate = true
  54. }
  55. } else {
  56. buf, err = f.vehicle.Read()
  57. f.UpdatedAt = time.Now()
  58. }
  59. if err != nil {
  60. return lo.Empty[V](), err
  61. }
  62. var contents V
  63. if forceUpdate {
  64. var forceBuf []byte
  65. if forceBuf, err = f.vehicle.Read(); err == nil {
  66. if contents, err = f.parser(forceBuf); err == nil {
  67. isLocal = false
  68. buf = forceBuf
  69. }
  70. }
  71. }
  72. if err != nil || !forceUpdate {
  73. contents, err = f.parser(buf)
  74. }
  75. if err != nil {
  76. if !isLocal {
  77. return lo.Empty[V](), err
  78. }
  79. // parse local file error, fallback to remote
  80. buf, err = f.vehicle.Read()
  81. if err != nil {
  82. return lo.Empty[V](), err
  83. }
  84. contents, err = f.parser(buf)
  85. if err != nil {
  86. return lo.Empty[V](), err
  87. }
  88. isLocal = false
  89. }
  90. if f.vehicle.Type() != types.File && !isLocal {
  91. if err := safeWrite(f.vehicle.Path(), buf); err != nil {
  92. return lo.Empty[V](), err
  93. }
  94. }
  95. f.hash = md5.Sum(buf)
  96. // pull contents automatically
  97. if f.vehicle.Type() == types.File {
  98. f.watcher, err = fswatch.NewWatcher(fswatch.Options{
  99. Path: []string{f.vehicle.Path()},
  100. Direct: true,
  101. Callback: f.update,
  102. })
  103. if err != nil {
  104. return lo.Empty[V](), err
  105. }
  106. err = f.watcher.Start()
  107. if err != nil {
  108. return lo.Empty[V](), err
  109. }
  110. } else if f.interval > 0 {
  111. go f.pullLoop()
  112. }
  113. return contents, nil
  114. }
  115. func (f *Fetcher[V]) Update() (V, bool, error) {
  116. buf, err := f.vehicle.Read()
  117. if err != nil {
  118. return lo.Empty[V](), false, err
  119. }
  120. now := time.Now()
  121. hash := md5.Sum(buf)
  122. if bytes.Equal(f.hash[:], hash[:]) {
  123. f.UpdatedAt = now
  124. _ = os.Chtimes(f.vehicle.Path(), now, now)
  125. return lo.Empty[V](), true, nil
  126. }
  127. contents, err := f.parser(buf)
  128. if err != nil {
  129. return lo.Empty[V](), false, err
  130. }
  131. if f.vehicle.Type() != types.File {
  132. if err := safeWrite(f.vehicle.Path(), buf); err != nil {
  133. return lo.Empty[V](), false, err
  134. }
  135. }
  136. f.UpdatedAt = now
  137. f.hash = hash
  138. return contents, false, nil
  139. }
  140. func (f *Fetcher[V]) Destroy() error {
  141. if f.interval > 0 {
  142. f.done <- struct{}{}
  143. }
  144. if f.watcher != nil {
  145. _ = f.watcher.Close()
  146. }
  147. return nil
  148. }
  149. func (f *Fetcher[V]) pullLoop() {
  150. initialInterval := f.interval - time.Since(f.UpdatedAt)
  151. if initialInterval > f.interval {
  152. initialInterval = f.interval
  153. }
  154. timer := time.NewTimer(initialInterval)
  155. defer timer.Stop()
  156. for {
  157. select {
  158. case <-timer.C:
  159. timer.Reset(f.interval)
  160. f.update(f.vehicle.Path())
  161. case <-f.done:
  162. return
  163. }
  164. }
  165. }
  166. func (f *Fetcher[V]) update(path string) {
  167. elm, same, err := f.Update()
  168. if err != nil {
  169. log.Errorln("[Provider] %s pull error: %s", f.Name(), err.Error())
  170. return
  171. }
  172. if same {
  173. log.Debugln("[Provider] %s's content doesn't change", f.Name())
  174. return
  175. }
  176. log.Infoln("[Provider] %s's content update", f.Name())
  177. if f.OnUpdate != nil {
  178. f.OnUpdate(elm)
  179. }
  180. }
  181. func safeWrite(path string, buf []byte) error {
  182. dir := filepath.Dir(path)
  183. if _, err := os.Stat(dir); os.IsNotExist(err) {
  184. if err := os.MkdirAll(dir, dirMode); err != nil {
  185. return err
  186. }
  187. }
  188. return os.WriteFile(path, buf, fileMode)
  189. }
  190. func NewFetcher[V any](name string, interval time.Duration, vehicle types.Vehicle, parser Parser[V], onUpdate func(V)) *Fetcher[V] {
  191. return &Fetcher[V]{
  192. name: name,
  193. vehicle: vehicle,
  194. parser: parser,
  195. done: make(chan struct{}, 8),
  196. OnUpdate: onUpdate,
  197. interval: interval,
  198. }
  199. }