clash/tunnel/tunnel.go

162 lines
3.3 KiB
Go
Raw Normal View History

2018-06-10 14:50:03 +00:00
package tunnel
import (
"sync"
2018-06-16 13:34:13 +00:00
"time"
2018-06-10 14:50:03 +00:00
LocalAdapter "github.com/Dreamacro/clash/adapters/local"
cfg "github.com/Dreamacro/clash/config"
2018-06-10 14:50:03 +00:00
C "github.com/Dreamacro/clash/constant"
"github.com/Dreamacro/clash/observable"
"gopkg.in/eapache/channels.v1"
)
var (
	// tunnel is the package-level Tunnel returned by Instance.
	tunnel *Tunnel
	// once ensures tunnel is constructed a single time inside Instance.
	once sync.Once
)
// Tunnel handle proxy socket and HTTP/SOCKS socket
2018-06-10 14:50:03 +00:00
type Tunnel struct {
queue *channels.InfiniteChannel
rules []C.Rule
2018-07-18 13:50:16 +00:00
proxies map[string]C.Proxy
2018-06-14 16:49:52 +00:00
configLock *sync.RWMutex
2018-06-17 14:41:32 +00:00
traffic *C.Traffic
// Outbound Rule
mode cfg.Mode
// Log
logCh chan interface{}
observable *observable.Observable
logLevel C.LogLevel
2018-06-10 14:50:03 +00:00
}
// Add request to queue
2018-06-10 14:50:03 +00:00
func (t *Tunnel) Add(req C.ServerAdapter) {
t.queue.In() <- req
}
// Traffic return traffic of all connections
2018-06-18 03:31:49 +00:00
func (t *Tunnel) Traffic() *C.Traffic {
return t.traffic
}
// Log return clash log stream
2018-06-18 03:31:49 +00:00
func (t *Tunnel) Log() *observable.Observable {
return t.observable
}
func (t *Tunnel) configMonitor(signal chan<- struct{}) {
sub := cfg.Instance().Subscribe()
signal <- struct{}{}
for elm := range sub {
event := elm.(*cfg.Event)
switch event.Type {
case "proxies":
proxies := event.Payload.(map[string]C.Proxy)
t.configLock.Lock()
t.proxies = proxies
t.configLock.Unlock()
case "rules":
rules := event.Payload.([]C.Rule)
t.configLock.Lock()
t.rules = rules
t.configLock.Unlock()
case "mode":
t.mode = event.Payload.(cfg.Mode)
case "log-level":
t.logLevel = event.Payload.(C.LogLevel)
2018-06-10 14:50:03 +00:00
}
}
}
// process drains the inbound queue and serves every connection on its own
// goroutine. Ranging over the channel makes the loop stop if the queue is ever
// closed, instead of panicking on the nil value a closed channel yields.
func (t *Tunnel) process() {
	for elm := range t.queue.Out() {
		conn := elm.(C.ServerAdapter)
		go t.handleConn(conn)
	}
}
func (t *Tunnel) handleConn(localConn C.ServerAdapter) {
defer localConn.Close()
addr := localConn.Addr()
var proxy C.Proxy
switch t.mode {
case cfg.Direct:
2018-07-18 13:50:16 +00:00
proxy = t.proxies["DIRECT"]
case cfg.Global:
proxy = t.proxies["GLOBAL"]
// Rule
default:
proxy = t.match(addr)
}
2018-06-10 14:50:03 +00:00
remoConn, err := proxy.Generator(addr)
if err != nil {
t.logCh <- newLog(C.WARNING, "Proxy connect error: %s", err.Error())
2018-06-10 14:50:03 +00:00
return
}
defer remoConn.Close()
switch adapter := localConn.(type) {
2018-08-12 08:18:58 +00:00
case *LocalAdapter.HTTPAdapter:
t.handleHTTP(adapter, remoConn)
2018-08-26 16:06:40 +00:00
case *LocalAdapter.SocketAdapter:
t.handleSOCKS(adapter, remoConn)
}
2018-06-10 14:50:03 +00:00
}
func (t *Tunnel) match(addr *C.Addr) C.Proxy {
2018-06-14 16:49:52 +00:00
t.configLock.RLock()
defer t.configLock.RUnlock()
2018-06-10 14:50:03 +00:00
for _, rule := range t.rules {
if rule.IsMatch(addr) {
2018-07-18 13:50:16 +00:00
a, ok := t.proxies[rule.Adapter()]
2018-06-10 14:50:03 +00:00
if !ok {
continue
}
t.logCh <- newLog(C.INFO, "%v match %s using %s", addr.String(), rule.RuleType().String(), rule.Adapter())
2018-06-10 14:50:03 +00:00
return a
}
}
t.logCh <- newLog(C.INFO, "%v doesn't match any rule using DIRECT", addr.String())
2018-07-18 13:50:16 +00:00
return t.proxies["DIRECT"]
2018-06-10 14:50:03 +00:00
}
// Run starts the tunnel's background goroutines (queue processing, log
// subscription and configuration monitoring) and returns once the
// configuration monitor has signalled that it is subscribed.
func (t *Tunnel) Run() {
	ready := make(chan struct{})

	go t.process()
	go t.subscribeLogs()
	go t.configMonitor(ready)

	<-ready
}
2018-06-10 14:50:03 +00:00
func newTunnel() *Tunnel {
logCh := make(chan interface{})
return &Tunnel{
2018-06-10 14:50:03 +00:00
queue: channels.NewInfiniteChannel(),
2018-07-18 13:50:16 +00:00
proxies: make(map[string]C.Proxy),
2018-06-10 14:50:03 +00:00
observable: observable.NewObservable(logCh),
logCh: logCh,
2018-06-14 16:49:52 +00:00
configLock: &sync.RWMutex{},
2018-06-17 14:41:32 +00:00
traffic: C.NewTraffic(time.Second),
mode: cfg.Rule,
logLevel: C.INFO,
2018-06-10 14:50:03 +00:00
}
}
// Instance return singleton instance of Tunnel
func Instance() *Tunnel {
2018-06-10 14:50:03 +00:00
once.Do(func() {
tunnel = newTunnel()
})
return tunnel
}