clash/adapter/outboundgroup/loadbalance.go

241 lines
5.2 KiB
Go
Raw Normal View History

2019-12-08 04:17:24 +00:00
package outboundgroup
2019-02-15 06:25:20 +00:00
import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"math/rand"
	"net"
	"sync"
	"time"

	"github.com/Dreamacro/clash/adapter/outbound"
	"github.com/Dreamacro/clash/common/murmur3"
	"github.com/Dreamacro/clash/component/dialer"
	C "github.com/Dreamacro/clash/constant"
	"github.com/Dreamacro/clash/constant/provider"

	"golang.org/x/net/publicsuffix"
)
// strategyFn chooses one proxy out of proxies for the connection described by
// metadata. LoadBalance.Unwrap calls it on every selection.
type strategyFn = func(proxies []C.Proxy, metadata *C.Metadata) C.Proxy
2019-02-15 06:25:20 +00:00
// LoadBalance is a proxy group that forwards each dial to a single member
// proxy picked by its strategy function.
type LoadBalance struct {
	*GroupBase
	// disableUDP makes SupportUDP report false when set.
	disableUDP bool
	// strategyFn is the selection strategy invoked by Unwrap.
	strategyFn strategyFn
}
// errStrategy is the sentinel wrapped into the error that NewLoadBalance
// returns when it is given a strategy name it does not recognise.
var errStrategy = errors.New("unsupported strategy")
2022-03-16 04:10:13 +00:00
// parseStrategy reads the "strategy" entry of config.
//
// It returns the entry verbatim when it holds a string (including the empty
// string) and falls back to "consistent-hashing" when the entry is absent or
// holds any other type.
func parseStrategy(config map[string]any) string {
	const fallback = "consistent-hashing"

	// A missing key yields a nil interface, for which the comma-ok assertion
	// simply reports false, so one assertion covers both failure cases.
	name, isString := config["strategy"].(string)
	if !isString {
		return fallback
	}
	return name
}
func getKey(metadata *C.Metadata) string {
2022-05-04 11:52:48 +00:00
if metadata == nil {
return ""
}
2019-02-15 06:25:20 +00:00
if metadata.Host != "" {
// ip host
if ip := net.ParseIP(metadata.Host); ip != nil {
return metadata.Host
}
if etld, err := publicsuffix.EffectiveTLDPlusOne(metadata.Host); err == nil {
return etld
}
}
2022-04-19 17:52:51 +00:00
if !metadata.DstIP.IsValid() {
2019-02-15 06:25:20 +00:00
return ""
}
2019-05-09 13:00:29 +00:00
return metadata.DstIP.String()
2019-02-15 06:25:20 +00:00
}
// jumpHash maps key onto a bucket index in [0, buckets) with the jump
// consistent hash algorithm: growing the bucket count by one either leaves a
// key where it was or moves it to the new last bucket.
//
// It returns 0 when buckets is not positive.
func jumpHash(key uint64, buckets int32) int32 {
	const scale = float64(int64(1) << 31)

	limit := int64(buckets)
	var current int64
	for next := int64(0); next < limit; {
		current = next
		// Advance the linear congruential generator seeded by key.
		key = key*2862933555777941757 + 1
		// The top 31 bits of the state decide how far ahead the next jump lands.
		next = int64(float64(current+1) * (scale / float64((key>>33)+1)))
	}
	return int32(current)
}
2021-04-29 03:23:14 +00:00
// DialContext implements C.ProxyAdapter
func (lb *LoadBalance) DialContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (c C.Conn, err error) {
defer func() {
if err == nil {
c.AppendToChains(lb)
lb.onDialSuccess()
} else {
lb.onDialFailed()
}
}()
proxy := lb.Unwrap(metadata)
c, err = proxy.DialContext(ctx, metadata, lb.Base.DialOptions(opts...)...)
return
2019-02-15 06:25:20 +00:00
}
// ListenPacketContext implements C.ProxyAdapter.
//
// It opens a packet connection through the proxy selected by Unwrap using the
// group's dial options, and appends the group to the connection's chain when
// that succeeds.
func (lb *LoadBalance) ListenPacketContext(ctx context.Context, metadata *C.Metadata, opts ...dialer.Option) (pc C.PacketConn, err error) {
	// The deferred hook reads the named results, so it sees the final outcome.
	defer func() {
		if err != nil {
			return
		}
		pc.AppendToChains(lb)
	}()

	target := lb.Unwrap(metadata)
	dialOpts := lb.Base.DialOptions(opts...)
	pc, err = target.ListenPacketContext(ctx, metadata, dialOpts...)
	return pc, err
}
2021-04-29 03:23:14 +00:00
// SupportUDP implements C.ProxyAdapter.
//
// It reports true unless UDP has been disabled for this group.
func (lb *LoadBalance) SupportUDP() bool {
	udpDisabled := lb.disableUDP
	return !udpDisabled
}
func strategyRoundRobin() strategyFn {
idx := 0
return func(proxies []C.Proxy, metadata *C.Metadata) C.Proxy {
length := len(proxies)
for i := 0; i < length; i++ {
idx = (idx + 1) % length
proxy := proxies[idx]
if proxy.Alive() {
return proxy
}
2019-04-23 15:29:36 +00:00
}
return proxies[0]
2019-04-23 15:29:36 +00:00
}
}
2019-04-23 15:29:36 +00:00
// strategyConsistentHashing returns a strategy that hashes the destination key
// of the connection (see getKey) with murmur3 and maps it to a proxy with
// jumpHash. When the chosen proxy is not alive the hash is bumped by one and
// the lookup retried, up to five attempts in total; after that proxies[0] is
// returned.
func strategyConsistentHashing() strategyFn {
	const attempts = 5
	return func(proxies []C.Proxy, metadata *C.Metadata) C.Proxy {
		seed := uint64(murmur3.Sum32([]byte(getKey(metadata))))
		bucketCount := int32(len(proxies))

		for offset := uint64(0); offset < attempts; offset++ {
			slot := jumpHash(seed+offset, bucketCount)
			if candidate := proxies[slot]; candidate.Alive() {
				return candidate
			}
		}
		return proxies[0]
	}
}
// strategyStickySessions returns a strategy that keeps sending the same
// (source IP, destination key) pair to the same proxy for as long as the pair
// stays active.
//
// A session starts on a random proxy index, is refreshed on every use and is
// re-rolled once it has been idle for more than ten minutes. If the remembered
// proxy is not alive, the next alive one (in slice order, wrapping around)
// takes over the session; if none is alive the session falls back to
// proxies[0].
//
// The session table is shared between concurrent callers and a background
// sweeper, so every access is serialized with a mutex. The sweeper goroutine
// has no stop signal and runs until the process exits.
//
// The returned function panics when proxies is empty (rand.Intn(0)).
func strategyStickySessions() strategyFn {
	const (
		sessionTTL    = 600 * time.Second
		sweepInterval = 60 * time.Second
	)

	type session struct {
		idx      int
		lastSeen time.Time
	}

	var (
		mu       sync.Mutex
		sessions = make(map[string]map[string]session) // source IP -> destination key -> session
	)

	// Periodically drop sessions that have been idle past the TTL.
	go func() {
		for {
			time.Sleep(sweepInterval)

			mu.Lock()
			now := time.Now()
			for src, bySrc := range sessions {
				for dest, s := range bySrc {
					if now.Sub(s.lastSeen) > sessionTTL {
						delete(bySrc, dest)
					}
				}
				// Drop emptied inner maps too, otherwise the outer map keeps
				// one entry per source IP ever seen.
				if len(bySrc) == 0 {
					delete(sessions, src)
				}
			}
			mu.Unlock()
		}
	}()

	return func(proxies []C.Proxy, metadata *C.Metadata) C.Proxy {
		// getKey tolerates nil metadata; do the same for the source side.
		var src string
		if metadata != nil {
			src = metadata.SrcIP.String()
		}
		dest := getKey(metadata)
		now := time.Now()
		length := len(proxies)

		mu.Lock()
		defer mu.Unlock()

		bySrc := sessions[src]
		if bySrc == nil {
			bySrc = make(map[string]session)
			sessions[src] = bySrc
		}

		s, ok := bySrc[dest]
		if !ok || now.Sub(s.lastSeen) > sessionTTL {
			s.idx = rand.Intn(length)
		}
		s.lastSeen = now

		// Starting from the remembered index, take the first alive proxy.
		start := s.idx
		picked := -1
		for i := 0; i < length; i++ {
			idx := (start + i) % length
			if proxies[idx].Alive() {
				picked = idx
				break
			}
		}
		// Nothing alive: pin the session to the first proxy so the stored
		// index matches the proxy actually returned.
		if picked < 0 {
			picked = 0
		}

		s.idx = picked
		bySrc[dest] = s
		return proxies[picked]
	}
}
2021-04-29 03:23:14 +00:00
// Unwrap implements C.ProxyAdapter
func (lb *LoadBalance) Unwrap(metadata *C.Metadata) C.Proxy {
proxies := lb.GetProxies(true)
return lb.strategyFn(proxies, metadata)
2019-04-23 15:29:36 +00:00
}
2021-04-29 03:23:14 +00:00
// MarshalJSON implements C.ProxyAdapter
2019-02-15 06:25:20 +00:00
func (lb *LoadBalance) MarshalJSON() ([]byte, error) {
var all []string
for _, proxy := range lb.GetProxies(false) {
2019-02-15 06:25:20 +00:00
all = append(all, proxy.Name())
}
2022-03-16 04:10:13 +00:00
return json.Marshal(map[string]any{
2019-02-15 06:25:20 +00:00
"type": lb.Type().String(),
"all": all,
})
}
func NewLoadBalance(option *GroupCommonOption, providers []provider.ProxyProvider, strategy string) (lb *LoadBalance, err error) {
var strategyFn strategyFn
switch strategy {
case "consistent-hashing":
strategyFn = strategyConsistentHashing()
case "round-robin":
strategyFn = strategyRoundRobin()
case "sticky-sessions":
strategyFn = strategyStickySessions()
default:
return nil, fmt.Errorf("%w: %s", errStrategy, strategy)
}
return &LoadBalance{
GroupBase: NewGroupBase(GroupBaseOption{
outbound.BaseOption{
Name: option.Name,
Type: C.LoadBalance,
Interface: option.Interface,
RoutingMark: option.RoutingMark,
},
option.Filter,
providers,
}),
strategyFn: strategyFn,
disableUDP: option.DisableUDP,
}, nil
2019-02-15 06:25:20 +00:00
}