From 80df572b182854df8181586af0003034b1160f78 Mon Sep 17 00:00:00 2001 From: Skyxim Date: Mon, 11 Jul 2022 21:30:34 +0800 Subject: [PATCH] refactor: Unified provider loading resources --- adapter/provider/parser.go | 5 +- adapter/provider/provider.go | 23 +- .../resource}/fetcher.go | 75 +++--- .../resource}/vehicle.go | 2 +- rules/provider/fetcher.go | 219 ------------------ rules/provider/parse.go | 6 +- rules/provider/provider.go | 26 ++- 7 files changed, 71 insertions(+), 285 deletions(-) rename {adapter/provider => component/resource}/fetcher.go (67%) rename {adapter/provider => component/resource}/vehicle.go (98%) delete mode 100644 rules/provider/fetcher.go diff --git a/adapter/provider/parser.go b/adapter/provider/parser.go index 887ac5cd..e3d91cfe 100644 --- a/adapter/provider/parser.go +++ b/adapter/provider/parser.go @@ -3,6 +3,7 @@ package provider import ( "errors" "fmt" + "github.com/Dreamacro/clash/component/resource" "time" "github.com/Dreamacro/clash/common/structure" @@ -51,9 +52,9 @@ func ParseProxyProvider(name string, mapping map[string]any) (types.ProxyProvide var vehicle types.Vehicle switch schema.Type { case "file": - vehicle = NewFileVehicle(path) + vehicle = resource.NewFileVehicle(path) case "http": - vehicle = NewHTTPVehicle(schema.URL, path) + vehicle = resource.NewHTTPVehicle(schema.URL, path) default: return nil, fmt.Errorf("%w: %s", errVehicleType, schema.Type) } diff --git a/adapter/provider/provider.go b/adapter/provider/provider.go index a38718a9..4cab2517 100644 --- a/adapter/provider/provider.go +++ b/adapter/provider/provider.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "github.com/Dreamacro/clash/common/convert" + "github.com/Dreamacro/clash/component/resource" "github.com/dlclark/regexp2" "math" "runtime" @@ -31,7 +32,7 @@ type ProxySetProvider struct { } type proxySetProvider struct { - *fetcher[[]C.Proxy] + *resource.Fetcher[[]C.Proxy] proxies []C.Proxy healthCheck *HealthCheck version uint @@ -43,7 +44,7 @@ func (pp *proxySetProvider) MarshalJSON() ([]byte, error) { "type": pp.Type().String(), "vehicleType": pp.VehicleType().String(), "proxies": pp.Proxies(), - "updatedAt": pp.updatedAt, + "updatedAt": pp.UpdatedAt, }) } @@ -52,7 +53,7 @@ func (pp *proxySetProvider) Version() uint { } func (pp *proxySetProvider) Name() string { - return pp.name + return pp.Fetcher.Name() } func (pp *proxySetProvider) HealthCheck() { @@ -60,19 +61,19 @@ func (pp *proxySetProvider) HealthCheck() { } func (pp *proxySetProvider) Update() error { - elm, same, err := pp.fetcher.Update() + elm, same, err := pp.Fetcher.Update() if err == nil && !same { - pp.onUpdate(elm) + pp.OnUpdate(elm) } return err } func (pp *proxySetProvider) Initial() error { - elm, err := pp.fetcher.Initial() + elm, err := pp.Fetcher.Initial() if err != nil { return err } - pp.onUpdate(elm) + pp.OnUpdate(elm) return nil } @@ -98,7 +99,7 @@ func (pp *proxySetProvider) setProxies(proxies []C.Proxy) { func stopProxyProvider(pd *ProxySetProvider) { pd.healthCheck.close() - _ = pd.fetcher.Destroy() + _ = pd.Fetcher.Destroy() } func NewProxySetProvider(name string, interval time.Duration, filter string, vehicle types.Vehicle, hc *HealthCheck) (*ProxySetProvider, error) { @@ -116,8 +117,8 @@ func NewProxySetProvider(name string, interval time.Duration, filter string, veh healthCheck: hc, } - fetcher := newFetcher[[]C.Proxy](name, interval, vehicle, proxiesParseAndFilter(filter, filterReg), proxiesOnUpdate(pd)) - pd.fetcher = fetcher + fetcher := resource.NewFetcher[[]C.Proxy](name, interval, vehicle, proxiesParseAndFilter(filter, filterReg), proxiesOnUpdate(pd)) + pd.Fetcher = fetcher wrapper := &ProxySetProvider{pd} runtime.SetFinalizer(wrapper, stopProxyProvider) @@ -216,7 +217,7 @@ func proxiesOnUpdate(pd *proxySetProvider) func([]C.Proxy) { } } -func proxiesParseAndFilter(filter string, filterReg *regexp2.Regexp) parser[[]C.Proxy] { +func proxiesParseAndFilter(filter string, filterReg *regexp2.Regexp) resource.Parser[[]C.Proxy] { return func(buf []byte) ([]C.Proxy, error) { schema := &ProxySchema{} diff --git a/adapter/provider/fetcher.go b/component/resource/fetcher.go similarity index 67% rename from adapter/provider/fetcher.go rename to component/resource/fetcher.go index 07fb1237..529e01b7 100644 --- a/adapter/provider/fetcher.go +++ b/component/resource/fetcher.go @@ -1,4 +1,4 @@ -package provider +package resource import ( "bytes" @@ -16,29 +16,30 @@ var ( dirMode os.FileMode = 0o755 ) -type parser[V any] func([]byte) (V, error) +type Parser[V any] func([]byte) (V, error) -type fetcher[V any] struct { - name string - vehicle types.Vehicle - updatedAt *time.Time - ticker *time.Ticker - done chan struct{} - hash [16]byte - parser parser[V] - interval time.Duration - onUpdate func(V) +type Fetcher[V any] struct { + resourceType string + name string + vehicle types.Vehicle + UpdatedAt *time.Time + ticker *time.Ticker + done chan struct{} + hash [16]byte + parser Parser[V] + interval time.Duration + OnUpdate func(V) } -func (f *fetcher[V]) Name() string { +func (f *Fetcher[V]) Name() string { return f.name } -func (f *fetcher[V]) VehicleType() types.VehicleType { +func (f *Fetcher[V]) VehicleType() types.VehicleType { return f.vehicle.Type() } -func (f *fetcher[V]) Initial() (V, error) { +func (f *Fetcher[V]) Initial() (V, error) { var ( buf []byte err error @@ -49,7 +50,7 @@ func (f *fetcher[V]) Initial() (V, error) { if stat, fErr := os.Stat(f.vehicle.Path()); fErr == nil { buf, err = os.ReadFile(f.vehicle.Path()) modTime := stat.ModTime() - f.updatedAt = &modTime + f.UpdatedAt = &modTime isLocal = true if f.interval != 0 && modTime.Add(f.interval).Before(time.Now()) { log.Infoln("[Provider] %s not updated for a long time, force refresh", f.Name()) @@ -63,11 +64,11 @@ func (f *fetcher[V]) Initial() (V, error) { return getZero[V](), err } - var proxies V + var contents V if forceUpdate { var forceBuf []byte if forceBuf, err = f.vehicle.Read(); err == nil { - if proxies, err = f.parser(forceBuf); err == nil { + if contents, err = f.parser(forceBuf); err == nil { isLocal = false buf = forceBuf } @@ -75,7 +76,7 @@ func (f *fetcher[V]) Initial() (V, error) { } if err != nil || !forceUpdate { - proxies, err = f.parser(buf) + contents, err = f.parser(buf) } if err != nil { @@ -89,7 +90,7 @@ func (f *fetcher[V]) Initial() (V, error) { return getZero[V](), err } - proxies, err = f.parser(buf) + contents, err = f.parser(buf) if err != nil { return getZero[V](), err } @@ -105,15 +106,15 @@ func (f *fetcher[V]) Initial() (V, error) { f.hash = md5.Sum(buf) - // pull proxies automatically + // pull contents automatically if f.ticker != nil { go f.pullLoop() } - return proxies, nil + return contents, nil } -func (f *fetcher[V]) Update() (V, bool, error) { +func (f *Fetcher[V]) Update() (V, bool, error) { buf, err := f.vehicle.Read() if err != nil { return getZero[V](), false, err @@ -122,12 +123,12 @@ func (f *fetcher[V]) Update() (V, bool, error) { now := time.Now() hash := md5.Sum(buf) if bytes.Equal(f.hash[:], hash[:]) { - f.updatedAt = &now - os.Chtimes(f.vehicle.Path(), now, now) + f.UpdatedAt = &now + _ = os.Chtimes(f.vehicle.Path(), now, now) return getZero[V](), true, nil } - proxies, err := f.parser(buf) + contents, err := f.parser(buf) if err != nil { return getZero[V](), false, err } @@ -138,20 +139,20 @@ func (f *fetcher[V]) Update() (V, bool, error) { } } - f.updatedAt = &now + f.UpdatedAt = &now f.hash = hash - return proxies, false, nil + return contents, false, nil } -func (f *fetcher[V]) Destroy() error { +func (f *Fetcher[V]) Destroy() error { if f.ticker != nil { f.done <- struct{}{} } return nil } -func (f *fetcher[V]) pullLoop() { +func (f *Fetcher[V]) pullLoop() { for { select { case <-f.ticker.C: @@ -162,13 +163,13 @@ func (f *fetcher[V]) pullLoop() { } if same { - log.Debugln("[Provider] %s's proxies doesn't change", f.Name()) + log.Debugln("[Provider] %s's content doesn't change", f.Name()) continue } - log.Infoln("[Provider] %s's proxies update", f.Name()) - if f.onUpdate != nil { - f.onUpdate(elm) + log.Infoln("[Provider] %s's content update", f.Name()) + if f.OnUpdate != nil { + f.OnUpdate(elm) } case <-f.done: f.ticker.Stop() @@ -189,19 +190,19 @@ func safeWrite(path string, buf []byte) error { return os.WriteFile(path, buf, fileMode) } -func newFetcher[V any](name string, interval time.Duration, vehicle types.Vehicle, parser parser[V], onUpdate func(V)) *fetcher[V] { +func NewFetcher[V any](name string, interval time.Duration, vehicle types.Vehicle, parser Parser[V], onUpdate func(V)) *Fetcher[V] { var ticker *time.Ticker if interval != 0 { ticker = time.NewTicker(interval) } - return &fetcher[V]{ + return &Fetcher[V]{ name: name, ticker: ticker, vehicle: vehicle, parser: parser, done: make(chan struct{}, 1), - onUpdate: onUpdate, + OnUpdate: onUpdate, interval: interval, } } diff --git a/adapter/provider/vehicle.go b/component/resource/vehicle.go similarity index 98% rename from adapter/provider/vehicle.go rename to component/resource/vehicle.go index 1eb5df83..c6e92e52 100644 --- a/adapter/provider/vehicle.go +++ b/component/resource/vehicle.go @@ -1,4 +1,4 @@ -package provider +package resource import ( "context" diff --git a/rules/provider/fetcher.go b/rules/provider/fetcher.go deleted file mode 100644 index b542a8e2..00000000 --- a/rules/provider/fetcher.go +++ /dev/null @@ -1,219 +0,0 @@ -package provider - -import ( - "bytes" - "crypto/md5" - P "github.com/Dreamacro/clash/constant/provider" - "github.com/Dreamacro/clash/log" - "io/ioutil" - "os" - "path/filepath" - "time" -) - -var ( - fileMode os.FileMode = 0666 - dirMode os.FileMode = 0755 -) - -type parser = func([]byte) (interface{}, error) - -type fetcher struct { - name string - vehicle P.Vehicle - updatedAt *time.Time - ticker *time.Ticker - done chan struct{} - hash [16]byte - parser parser - onUpdate func(interface{}) error - interval time.Duration -} - -func (f *fetcher) Name() string { - return f.name -} - -func (f *fetcher) VehicleType() P.VehicleType { - return f.vehicle.Type() -} - -func (f *fetcher) Initial() (interface{}, error) { - var ( - buf []byte - hasLocal bool - err error - forceUpdate bool - ) - - defer func() { - if f.ticker != nil { - go f.pullLoop() - } - }() - - if stat, fErr := os.Stat(f.vehicle.Path()); fErr == nil { - buf, err = ioutil.ReadFile(f.vehicle.Path()) - modTime := stat.ModTime() - f.updatedAt = &modTime - hasLocal = true - if f.interval != 0 && modTime.Add(f.interval).Before(time.Now()) { - forceUpdate = true - log.Infoln("[Provider] %s not updated for a long time, force refresh", f.Name()) - } - } else { - buf, err = f.vehicle.Read() - } - - if err != nil { - return nil, err - } - - var rules interface{} - if forceUpdate { - var forceBuf []byte - if forceBuf, err = f.vehicle.Read(); err == nil { - if rules, err = f.parser(forceBuf); err == nil { - hasLocal = false - buf = forceBuf - } - } - } - - if err != nil || !forceUpdate { - rules, err = f.parser(buf) - } - - if err != nil { - if !hasLocal { - return nil, err - } - - buf, err = f.vehicle.Read() - if err != nil { - return nil, err - } - - rules, err = f.parser(buf) - if err != nil { - return nil, err - } - - hasLocal = false - } - - if f.vehicle.Type() != P.File && !hasLocal { - if err := safeWrite(f.vehicle.Path(), buf); err != nil { - return nil, err - } - } - - f.hash = md5.Sum(buf) - - return rules, nil -} - -func (f *fetcher) Update() (interface{}, bool, error) { - buf, err := f.vehicle.Read() - if err != nil { - return nil, false, err - } - - now := time.Now() - hash := md5.Sum(buf) - if bytes.Equal(f.hash[:], hash[:]) { - f.updatedAt = &now - os.Chtimes(f.vehicle.Path(), now, now) - return nil, true, nil - } - - rules, err := f.parser(buf) - if err != nil { - return nil, false, err - } - - if f.vehicle.Type() != P.File { - if err := safeWrite(f.vehicle.Path(), buf); err != nil { - return nil, false, err - } - } - - f.updatedAt = &now - f.hash = hash - - return rules, false, nil -} - -func (f *fetcher) Destroy() error { - if f.ticker != nil { - f.done <- struct{}{} - } - return nil -} - -func newFetcher(name string, interval time.Duration, vehicle P.Vehicle, parser parser, onUpdate func(interface{}) error) *fetcher { - var ticker *time.Ticker - if interval != 0 { - ticker = time.NewTicker(interval) - } - - return &fetcher{ - name: name, - ticker: ticker, - vehicle: vehicle, - parser: parser, - done: make(chan struct{}, 1), - onUpdate: onUpdate, - interval: interval, - } -} - -func safeWrite(path string, buf []byte) error { - dir := filepath.Dir(path) - - if _, err := os.Stat(dir); os.IsNotExist(err) { - if err := os.MkdirAll(dir, dirMode); err != nil { - return err - } - } - - return ioutil.WriteFile(path, buf, fileMode) -} - -func (f *fetcher) pullLoop() { - for { - select { - case <-f.ticker.C: - same, err := f.update() - if same || err != nil { - continue - } - case <-f.done: - f.ticker.Stop() - return - } - } -} - -func (f *fetcher) update() (same bool, err error) { - elm, same, err := f.Update() - if err != nil { - log.Warnln("[Provider] %s pull error: %s", f.Name(), err.Error()) - return - } - - if same { - log.Debugln("[Provider] %s's rules doesn't change", f.Name()) - return - } - - log.Infoln("[Provider] %s's rules update", f.Name()) - if f.onUpdate != nil { - err := f.onUpdate(elm) - if err != nil { - log.Infoln("[Provider] %s update failed", f.Name()) - } - } - - return -} diff --git a/rules/provider/parse.go b/rules/provider/parse.go index de366802..80311af0 100644 --- a/rules/provider/parse.go +++ b/rules/provider/parse.go @@ -2,8 +2,8 @@ package provider import ( "fmt" - "github.com/Dreamacro/clash/adapter/provider" "github.com/Dreamacro/clash/common/structure" + "github.com/Dreamacro/clash/component/resource" C "github.com/Dreamacro/clash/constant" P "github.com/Dreamacro/clash/constant/provider" "time" @@ -40,9 +40,9 @@ func ParseRuleProvider(name string, mapping map[string]interface{}, parse func(t var vehicle P.Vehicle switch schema.Type { case "file": - vehicle = provider.NewFileVehicle(path) + vehicle = resource.NewFileVehicle(path) case "http": - vehicle = provider.NewHTTPVehicle(schema.URL, path) + vehicle = resource.NewHTTPVehicle(schema.URL, path) default: return nil, fmt.Errorf("unsupported vehicle type: %s", schema.Type) } diff --git a/rules/provider/provider.go b/rules/provider/provider.go index 32f57b5b..ce96c04f 100644 --- a/rules/provider/provider.go +++ b/rules/provider/provider.go @@ -2,6 +2,7 @@ package provider import ( "encoding/json" + "github.com/Dreamacro/clash/component/resource" C "github.com/Dreamacro/clash/constant" P "github.com/Dreamacro/clash/constant/provider" "gopkg.in/yaml.v3" @@ -14,7 +15,7 @@ var ( ) type ruleSetProvider struct { - *fetcher + *resource.Fetcher[any] behavior P.RuleType strategy ruleStrategy } @@ -54,18 +55,20 @@ func (rp *ruleSetProvider) Type() P.ProviderType { } func (rp *ruleSetProvider) Initial() error { - elm, err := rp.fetcher.Initial() + elm, err := rp.Fetcher.Initial() if err != nil { return err } - return rp.fetcher.onUpdate(elm) + rp.OnUpdate(elm) + return nil } func (rp *ruleSetProvider) Update() error { - elm, same, err := rp.fetcher.Update() + elm, same, err := rp.Fetcher.Update() if err == nil && !same { - return rp.fetcher.onUpdate(elm) + rp.OnUpdate(elm) + return nil } return err @@ -94,7 +97,7 @@ func (rp *ruleSetProvider) MarshalJSON() ([]byte, error) { "name": rp.Name(), "ruleCount": rp.strategy.Count(), "type": rp.Type().String(), - "updatedAt": rp.updatedAt, + "updatedAt": rp.UpdatedAt, "vehicleType": rp.VehicleType().String(), }) } @@ -105,21 +108,20 @@ func NewRuleSetProvider(name string, behavior P.RuleType, interval time.Duration behavior: behavior, } - onUpdate := func(elm interface{}) error { + onUpdate := func(elm interface{}) { rulesRaw := elm.([]string) rp.strategy.OnUpdate(rulesRaw) - return nil } - fetcher := newFetcher(name, interval, vehicle, rulesParse, onUpdate) - rp.fetcher = fetcher + fetcher := resource.NewFetcher(name, interval, vehicle, rulesParse, onUpdate) + rp.Fetcher = fetcher rp.strategy = newStrategy(behavior, parse) wrapper := &RuleSetProvider{ rp, } - final := func(provider *RuleSetProvider) { rp.fetcher.Destroy() } + final := func(provider *RuleSetProvider) { _ = rp.Fetcher.Destroy() } runtime.SetFinalizer(wrapper, final) return wrapper } @@ -140,7 +142,7 @@ func newStrategy(behavior P.RuleType, parse func(tp, payload, target string, par } } -func rulesParse(buf []byte) (interface{}, error) { +func rulesParse(buf []byte) (any, error) { rulePayload := RulePayload{} err := yaml.Unmarshal(buf, &rulePayload) if err != nil {