Browse Source

fixed race

master
Loki Verloren 7 months ago
parent
commit
efb6fb9d1f
6 changed files with 238 additions and 234 deletions
  1. 55
    55
      conn.go
  2. 1
    1
      example/client/client.go
  3. 1
    1
      example/rudp/example.go
  4. 1
    1
      example/server/server.go
  5. 28
    28
      listener.go
  6. 152
    148
      rudp.go

+ 55
- 55
conn.go View File

@@ -44,113 +44,113 @@ type RudpConn struct {
44 44
 	in         chan []byte
45 45
 }
46 46
 
47
-func (this *RudpConn) SetDeadline(t time.Time) error      { return nil }
48
-func (this *RudpConn) SetReadDeadline(t time.Time) error  { return nil }
49
-func (this *RudpConn) SetWriteDeadline(t time.Time) error { return nil }
50
-func (this *RudpConn) LocalAddr() net.Addr                { return this.conn.LocalAddr() }
51
-func (this *RudpConn) Connected() bool                    { return this.remoteAddr == nil }
52
-func (this *RudpConn) RemoteAddr() net.Addr {
53
-	if this.remoteAddr != nil {
54
-		return this.remoteAddr
47
+func (rc *RudpConn) SetDeadline(t time.Time) error      { return nil }
48
+func (rc *RudpConn) SetReadDeadline(t time.Time) error  { return nil }
49
+func (rc *RudpConn) SetWriteDeadline(t time.Time) error { return nil }
50
+func (rc *RudpConn) LocalAddr() net.Addr                { return rc.conn.LocalAddr() }
51
+func (rc *RudpConn) Connected() bool                    { return rc.remoteAddr == nil }
52
+func (rc *RudpConn) RemoteAddr() net.Addr {
53
+	if rc.remoteAddr != nil {
54
+		return rc.remoteAddr
55 55
 	}
56
-	return this.conn.RemoteAddr()
56
+	return rc.conn.RemoteAddr()
57 57
 }
58
-func (this *RudpConn) Close() error {
58
+func (rc *RudpConn) Close() error {
59 59
 	var err error
60
-	if this.remoteAddr != nil {
61
-		if this.closef != nil {
62
-			this.closef(this.remoteAddr.String())
60
+	if rc.remoteAddr != nil {
61
+		if rc.closef != nil {
62
+			rc.closef(rc.remoteAddr.String())
63 63
 		}
64
-		_, err = this.conn.WriteToUDP([]byte{TYPE_CORRUPT}, this.remoteAddr)
65
-		this.in <- []byte{TYPE_EOF}
64
+		_, err = rc.conn.WriteToUDP([]byte{TYPE_CORRUPT}, rc.remoteAddr)
65
+		rc.in <- []byte{TYPE_EOF}
66 66
 	} else {
67
-		_, err = this.conn.Write([]byte{TYPE_CORRUPT})
67
+		_, err = rc.conn.Write([]byte{TYPE_CORRUPT})
68 68
 	}
69 69
 	checkErr(err)
70 70
 	return err
71 71
 }
72
-func (this *RudpConn) Read(bts []byte) (n int, err error) {
72
+func (rc *RudpConn) Read(bts []byte) (n int, err error) {
73 73
 	select {
74
-	case data := <-this.recvChan:
74
+	case data := <-rc.recvChan:
75 75
 		copy(bts, data)
76 76
 		return len(data), nil
77
-	case err := <-this.recvErr:
77
+	case err := <-rc.recvErr:
78 78
 		return 0, err
79 79
 	}
80 80
 }
81 81
 
82
-func (this *RudpConn) send(bts []byte) (err error) {
82
+func (r *RudpConn) send(bts []byte) (err error) {
83 83
 	select {
84
-	case this.sendChan <- bts:
84
+	case r.sendChan <- bts:
85 85
 		return nil
86
-	case err := <-this.sendErr:
86
+	case err := <-r.sendErr:
87 87
 		return err
88 88
 	}
89 89
 }
90
-func (this *RudpConn) Write(bts []byte) (n int, err error) {
90
+func (r *RudpConn) Write(bts []byte) (n int, err error) {
91 91
 	sz := len(bts)
92 92
 	for len(bts)+MAX_MSG_HEAD > GENERAL_PACKAGE {
93
-		if err := this.send(bts[:GENERAL_PACKAGE-MAX_MSG_HEAD]); err != nil {
93
+		if err := r.send(bts[:GENERAL_PACKAGE-MAX_MSG_HEAD]); err != nil {
94 94
 			return 0, err
95 95
 		}
96 96
 		bts = bts[GENERAL_PACKAGE-MAX_MSG_HEAD:]
97 97
 	}
98
-	return sz, this.send(bts)
98
+	return sz, r.send(bts)
99 99
 }
100 100
 
101
-func (this *RudpConn) rudpRecv(data []byte) error {
101
+func (r *RudpConn) rudpRecv(data []byte) error {
102 102
 	for {
103
-		n, err := this.rudp.Recv(data)
103
+		n, err := r.rudp.Recv(data)
104 104
 		if err != nil {
105
-			this.recvErr <- err
105
+			r.recvErr <- err
106 106
 			return err
107 107
 		} else if n == 0 {
108 108
 			break
109 109
 		}
110 110
 		bts := make([]byte, n)
111 111
 		copy(bts, data[:n])
112
-		this.recvChan <- bts
112
+		r.recvChan <- bts
113 113
 	}
114 114
 	return nil
115 115
 }
116
-func (this *RudpConn) conectedRecvLoop() {
116
+func (r *RudpConn) conectedRecvLoop() {
117 117
 	data := make([]byte, MAX_PACKAGE)
118 118
 	for {
119
-		n, err := this.conn.Read(data)
119
+		n, err := r.conn.Read(data)
120 120
 		if err != nil {
121
-			this.recvErr <- err
121
+			r.recvErr <- err
122 122
 			return
123 123
 		}
124
-		this.rudp.Input(data[:n])
125
-		if this.rudpRecv(data) != nil {
124
+		r.rudp.Input(data[:n])
125
+		if r.rudpRecv(data) != nil {
126 126
 			return
127 127
 		}
128 128
 	}
129 129
 }
130
-func (this *RudpConn) unconectedRecvLoop() {
130
+func (r *RudpConn) unconectedRecvLoop() {
131 131
 	data := make([]byte, MAX_PACKAGE)
132 132
 	for {
133 133
 		select {
134
-		case bts := <-this.in:
135
-			this.rudp.Input(bts)
136
-			if this.rudpRecv(data) != nil {
134
+		case bts := <-r.in:
135
+			r.rudp.Input(bts)
136
+			if r.rudpRecv(data) != nil {
137 137
 				return
138 138
 			}
139 139
 		}
140 140
 	}
141 141
 }
142
-func (this *RudpConn) sendLoop() {
142
+func (r *RudpConn) sendLoop() {
143 143
 	var sendNum int
144 144
 	for {
145 145
 		select {
146
-		case tick := <-this.SendTick:
146
+		case tick := <-r.SendTick:
147 147
 		sendOut:
148 148
 			for {
149 149
 				select {
150
-				case bts := <-this.sendChan:
151
-					_, err := this.rudp.Send(bts)
150
+				case bts := <-r.sendChan:
151
+					_, err := r.rudp.Send(bts)
152 152
 					if err != nil {
153
-						this.sendErr <- err
153
+						r.sendErr <- err
154 154
 						return
155 155
 					}
156 156
 					sendNum++
@@ -162,17 +162,17 @@ func (this *RudpConn) sendLoop() {
162 162
 				}
163 163
 			}
164 164
 			sendNum = 0
165
-			p := this.rudp.Update(tick)
165
+			p := r.rudp.Update(tick)
166 166
 			var num, sz int
167 167
 			for p != nil {
168 168
 				n, err := int(0), error(nil)
169
-				if this.Connected() {
170
-					n, err = this.conn.Write(p.Bts)
169
+				if r.Connected() {
170
+					n, err = r.conn.Write(p.Bts)
171 171
 				} else {
172
-					n, err = this.conn.WriteToUDP(p.Bts, this.remoteAddr)
172
+					n, err = r.conn.WriteToUDP(p.Bts, r.remoteAddr)
173 173
 				}
174 174
 				if err != nil {
175
-					this.sendErr <- err
175
+					r.sendErr <- err
176 176
 					return
177 177
 				}
178 178
 				sz, num = sz+n, num+1
@@ -181,29 +181,29 @@ func (this *RudpConn) sendLoop() {
181 181
 			if num > 1 {
182 182
 				show := bitShow(sz * int(time.Second/sendTick))
183 183
 				dbg("send package num %v,sz %v, %v/s,local %v,remote %v",
184
-					num, show, show, this.LocalAddr(), this.RemoteAddr())
184
+					num, show, show, r.LocalAddr(), r.RemoteAddr())
185 185
 			}
186 186
 		}
187 187
 	}
188 188
 }
189
-func (this *RudpConn) run() {
189
+func (r *RudpConn) run() {
190 190
 	if autoSend && sendTick > 0 {
191 191
 		go func() {
192 192
 			tick := time.Tick(sendTick)
193 193
 			for {
194 194
 				select {
195 195
 				case <-tick:
196
-					this.SendTick <- 1
196
+					r.SendTick <- 1
197 197
 				}
198 198
 			}
199 199
 		}()
200 200
 	}
201 201
 	go func() {
202
-		if this.Connected() {
203
-			this.conectedRecvLoop()
202
+		if r.Connected() {
203
+			r.conectedRecvLoop()
204 204
 		} else {
205
-			this.unconectedRecvLoop()
205
+			r.unconectedRecvLoop()
206 206
 		}
207 207
 	}()
208
-	this.sendLoop()
208
+	r.sendLoop()
209 209
 }

+ 1
- 1
example/client/client.go View File

@@ -8,7 +8,7 @@ import (
8 8
 	"syscall"
9 9
 	"time"
10 10
 
11
-	"github.com/u35s/rudp"
11
+	"git.parallelcoin.io/dev/rudp"
12 12
 )
13 13
 
14 14
 func main() {

+ 1
- 1
example/rudp/example.go View File

@@ -3,7 +3,7 @@ package main
3 3
 import (
4 4
 	"fmt"
5 5
 
6
-	"github.com/u35s/rudp"
6
+	"git.parallelcoin.io/dev/rudp"
7 7
 )
8 8
 
9 9
 var dumpIdx int

+ 1
- 1
example/server/server.go View File

@@ -8,7 +8,7 @@ import (
8 8
 	"os/signal"
9 9
 	"syscall"
10 10
 
11
-	"github.com/u35s/rudp"
11
+	"git.parallelcoin.io/dev/rudp"
12 12
 )
13 13
 
14 14
 func read(conn *rudp.RudpConn) {

+ 28
- 28
listener.go View File

@@ -24,53 +24,53 @@ type RudpListener struct {
24 24
 }
25 25
 
26 26
 //net listener interface
27
-func (this *RudpListener) Accept() (net.Conn, error) { return this.AcceptRudp() }
28
-func (this *RudpListener) Close() error {
29
-	this.CloseAllRudp()
30
-	return this.conn.Close()
27
+func (rl *RudpListener) Accept() (net.Conn, error) { return rl.AcceptRudp() }
28
+func (rl *RudpListener) Close() error {
29
+	rl.CloseAllRudp()
30
+	return rl.conn.Close()
31 31
 }
32
-func (this *RudpListener) Addr() net.Addr { return this.conn.LocalAddr() }
32
+func (rl *RudpListener) Addr() net.Addr { return rl.conn.LocalAddr() }
33 33
 
34
-func (this *RudpListener) CloseRudp(addr string) {
35
-	this.lock.Lock()
36
-	delete(this.rudpConnMap, addr)
37
-	this.lock.Unlock()
34
+func (rl *RudpListener) CloseRudp(addr string) {
35
+	rl.lock.Lock()
36
+	delete(rl.rudpConnMap, addr)
37
+	rl.lock.Unlock()
38 38
 }
39 39
 
40
-func (this *RudpListener) CloseAllRudp() {
41
-	this.lock.Lock()
42
-	for _, rconn := range this.rudpConnMap {
40
+func (rl *RudpListener) CloseAllRudp() {
41
+	rl.lock.Lock()
42
+	for _, rconn := range rl.rudpConnMap {
43 43
 		rconn.closef = nil
44 44
 		rconn.Close()
45 45
 	}
46
-	this.lock.Unlock()
46
+	rl.lock.Unlock()
47 47
 }
48
-func (this *RudpListener) AcceptRudp() (*RudpConn, error) {
48
+func (rl *RudpListener) AcceptRudp() (*RudpConn, error) {
49 49
 	select {
50
-	case c := <-this.newRudpConn:
50
+	case c := <-rl.newRudpConn:
51 51
 		return c, nil
52
-	case e := <-this.newRudpErr:
52
+	case e := <-rl.newRudpErr:
53 53
 		return nil, e
54 54
 	}
55 55
 }
56
-func (this *RudpListener) run() {
56
+func (rl *RudpListener) run() {
57 57
 	data := make([]byte, MAX_PACKAGE)
58 58
 	for {
59
-		n, remoteAddr, err := this.conn.ReadFromUDP(data)
59
+		n, remoteAddr, err := rl.conn.ReadFromUDP(data)
60 60
 		if err != nil {
61
-			this.CloseAllRudp()
62
-			this.newRudpErr <- err
61
+			rl.CloseAllRudp()
62
+			rl.newRudpErr <- err
63 63
 			return
64 64
 		}
65
-		this.lock.RLock()
66
-		rudpConn, ok := this.rudpConnMap[remoteAddr.String()]
67
-		this.lock.RUnlock()
65
+		rl.lock.RLock()
66
+		rudpConn, ok := rl.rudpConnMap[remoteAddr.String()]
67
+		rl.lock.RUnlock()
68 68
 		if !ok {
69
-			rudpConn = NewUnConn(this.conn, remoteAddr, New(), this.CloseRudp)
70
-			this.lock.Lock()
71
-			this.rudpConnMap[remoteAddr.String()] = rudpConn
72
-			this.lock.Unlock()
73
-			this.newRudpConn <- rudpConn
69
+			rudpConn = NewUnConn(rl.conn, remoteAddr, New(), rl.CloseRudp)
70
+			rl.lock.Lock()
71
+			rl.rudpConnMap[remoteAddr.String()] = rudpConn
72
+			rl.lock.Unlock()
73
+			rl.newRudpConn <- rudpConn
74 74
 		}
75 75
 		bts := make([]byte, n)
76 76
 		copy(bts, data[:n])

+ 152
- 148
rudp.go View File

@@ -4,6 +4,8 @@ import (
4 4
 	"bytes"
5 5
 	"errors"
6 6
 	"time"
7
+
8
+	"go.uber.org/atomic"
7 9
 )
8 10
 
9 11
 const (
@@ -21,18 +23,20 @@ const (
21 23
 	MAX_PACKAGE     = 0x7fff - TYPE_NORMAL
22 24
 )
23 25
 
24
-type Error int
26
+type Err struct {
27
+	atomic.Int32
28
+}
25 29
 
26 30
 const (
27
-	ERROR_NIL Error = iota
31
+	ERROR_NIL int32 = iota
28 32
 	ERROR_EOF
29 33
 	ERROR_REMOTE_EOF
30 34
 	ERROR_CORRUPT
31 35
 	ERROR_MSG_SIZE
32 36
 )
33 37
 
34
-func (this Error) Error() error {
35
-	switch this {
38
+func (e Err) Error() error {
39
+	switch e.Int32.Load() {
36 40
 	case ERROR_EOF:
37 41
 		return errors.New("EOF")
38 42
 	case ERROR_REMOTE_EOF:
@@ -58,47 +62,47 @@ type packageBuffer struct {
58 62
 	tail *Package
59 63
 }
60 64
 
61
-func (tmp *packageBuffer) packRequest(min, max int, tag int) {
62
-	if tmp.tmp.Len()+5 > GENERAL_PACKAGE {
63
-		tmp.newPackage()
64
-	}
65
-	tmp.tmp.WriteByte(byte(tag))
66
-	tmp.tmp.WriteByte(byte((min & 0xff00) >> 8))
67
-	tmp.tmp.WriteByte(byte(min & 0xff))
68
-	tmp.tmp.WriteByte(byte((max & 0xff00) >> 8))
69
-	tmp.tmp.WriteByte(byte(max & 0xff))
65
+func (pb *packageBuffer) packRequest(min, max int, tag int) {
66
+	if pb.tmp.Len()+5 > GENERAL_PACKAGE {
67
+		pb.newPackage()
68
+	}
69
+	pb.tmp.WriteByte(byte(tag))
70
+	pb.tmp.WriteByte(byte((min & 0xff00) >> 8))
71
+	pb.tmp.WriteByte(byte(min & 0xff))
72
+	pb.tmp.WriteByte(byte((max & 0xff00) >> 8))
73
+	pb.tmp.WriteByte(byte(max & 0xff))
70 74
 }
71
-func (tmp *packageBuffer) fillHeader(head, id int) {
75
+func (pb *packageBuffer) fillHeader(head, id int) {
72 76
 	if head < 128 {
73
-		tmp.tmp.WriteByte(byte(head))
77
+		pb.tmp.WriteByte(byte(head))
74 78
 	} else {
75
-		tmp.tmp.WriteByte(byte(((head & 0x7f00) >> 8) | 0x80))
76
-		tmp.tmp.WriteByte(byte(head & 0xff))
79
+		pb.tmp.WriteByte(byte(((head & 0x7f00) >> 8) | 0x80))
80
+		pb.tmp.WriteByte(byte(head & 0xff))
77 81
 	}
78
-	tmp.tmp.WriteByte(byte((id & 0xff00) >> 8))
79
-	tmp.tmp.WriteByte(byte(id & 0xff))
82
+	pb.tmp.WriteByte(byte((id & 0xff00) >> 8))
83
+	pb.tmp.WriteByte(byte(id & 0xff))
80 84
 }
81
-func (tmp *packageBuffer) packMessage(m *message) {
82
-	if m.buf.Len()+4+tmp.tmp.Len() >= GENERAL_PACKAGE {
83
-		tmp.newPackage()
85
+func (pb *packageBuffer) packMessage(m *message) {
86
+	if m.buf.Len()+4+pb.tmp.Len() >= GENERAL_PACKAGE {
87
+		pb.newPackage()
84 88
 	}
85
-	tmp.fillHeader(m.buf.Len()+TYPE_NORMAL, m.id)
86
-	tmp.tmp.Write(m.buf.Bytes())
89
+	pb.fillHeader(m.buf.Len()+TYPE_NORMAL, m.id)
90
+	pb.tmp.Write(m.buf.Bytes())
87 91
 }
88
-func (tmp *packageBuffer) newPackage() {
89
-	if tmp.tmp.Len() <= 0 {
92
+func (pb *packageBuffer) newPackage() {
93
+	if pb.tmp.Len() <= 0 {
90 94
 		return
91 95
 	}
92
-	p := &Package{Bts: make([]byte, tmp.tmp.Len())}
93
-	copy(p.Bts, tmp.tmp.Bytes())
94
-	tmp.tmp.Reset()
95
-	tmp.num++
96
-	if tmp.tail == nil {
97
-		tmp.head = p
98
-		tmp.tail = p
96
+	p := &Package{Bts: make([]byte, pb.tmp.Len())}
97
+	copy(p.Bts, pb.tmp.Bytes())
98
+	pb.tmp.Reset()
99
+	pb.num++
100
+	if pb.tail == nil {
101
+		pb.head = p
102
+		pb.tail = p
99 103
 	} else {
100
-		tmp.tail.Next = p
101
-		tmp.tail = p
104
+		pb.tail.Next = p
105
+		pb.tail = p
102 106
 	}
103 107
 }
104 108
 
@@ -118,7 +122,7 @@ type Rudp struct {
118 122
 	addSendAgain chan [2]int
119 123
 	sendID       int
120 124
 
121
-	corrupt Error
125
+	corrupt Err
122 126
 
123 127
 	currentTick       int
124 128
 	lastRecvTick      int
@@ -126,50 +130,50 @@ type Rudp struct {
126 130
 	lastSendDelayTick int
127 131
 }
128 132
 
129
-func (this *Rudp) Recv(bts []byte) (int, error) {
130
-	if err := this.corrupt; err != ERROR_NIL {
131
-		return 0, err.Error()
133
+func (r *Rudp) Recv(bts []byte) (int, error) {
134
+	if err := r.corrupt.Load(); err != ERROR_NIL {
135
+		return 0, r.corrupt.Error()
132 136
 	}
133
-	m := this.recvQueue.pop(this.recvIDMin)
137
+	m := r.recvQueue.pop(r.recvIDMin)
134 138
 	if m == nil {
135 139
 		return 0, nil
136 140
 	}
137
-	this.recvIDMin++
141
+	r.recvIDMin++
138 142
 	copy(bts, m.buf.Bytes())
139 143
 	return m.buf.Len(), nil
140 144
 }
141 145
 
142
-func (this *Rudp) Send(bts []byte) (n int, err error) {
143
-	if err := this.corrupt; err != ERROR_NIL {
144
-		return 0, err.Error()
146
+func (r *Rudp) Send(bts []byte) (n int, err error) {
147
+	if err := r.corrupt.Load(); err != ERROR_NIL {
148
+		return 0, r.corrupt.Error()
145 149
 	}
146 150
 	if len(bts) > MAX_PACKAGE {
147 151
 		return 0, nil
148 152
 	}
149 153
 	m := &message{}
150 154
 	m.buf.Write(bts)
151
-	m.id = this.sendID
152
-	this.sendID++
153
-	m.tick = this.currentTick
154
-	this.sendQueue.push(m)
155
+	m.id = r.sendID
156
+	r.sendID++
157
+	m.tick = r.currentTick
158
+	r.sendQueue.push(m)
155 159
 	return len(bts), nil
156 160
 }
157 161
 
158
-func (this *Rudp) Update(tick int) *Package {
159
-	if this.corrupt != ERROR_NIL {
162
+func (r *Rudp) Update(tick int) *Package {
163
+	if r.corrupt.Load() != ERROR_NIL {
160 164
 		return nil
161 165
 	}
162
-	this.currentTick += tick
163
-	if this.currentTick >= this.lastExpiredTick+expiredTick {
164
-		this.lastExpiredTick = this.currentTick
165
-		this.clearSendExpired()
166
+	r.currentTick += tick
167
+	if r.currentTick >= r.lastExpiredTick+expiredTick {
168
+		r.lastExpiredTick = r.currentTick
169
+		r.clearSendExpired()
166 170
 	}
167
-	if this.currentTick >= this.lastRecvTick+corruptTick {
168
-		this.corrupt = ERROR_CORRUPT
171
+	if r.currentTick >= r.lastRecvTick+corruptTick {
172
+		r.corrupt.Store(ERROR_CORRUPT)
169 173
 	}
170
-	if this.currentTick >= this.lastSendDelayTick+sendDelayTick {
171
-		this.lastSendDelayTick = this.currentTick
172
-		return this.outPut()
174
+	if r.currentTick >= r.lastSendDelayTick+sendDelayTick {
175
+		r.lastSendDelayTick = r.currentTick
176
+		return r.outPut()
173 177
 	}
174 178
 	return nil
175 179
 }
@@ -187,55 +191,55 @@ type messageQueue struct {
187 191
 	num  int
188 192
 }
189 193
 
190
-func (this *messageQueue) pop(id int) *message {
191
-	if this.head == nil {
194
+func (mq *messageQueue) pop(id int) *message {
195
+	if mq.head == nil {
192 196
 		return nil
193 197
 	}
194
-	m := this.head
198
+	m := mq.head
195 199
 	if id >= 0 && m.id != id {
196 200
 		return nil
197 201
 	}
198
-	this.head = m.next
202
+	mq.head = m.next
199 203
 	m.next = nil
200
-	if this.head == nil {
201
-		this.tail = nil
204
+	if mq.head == nil {
205
+		mq.tail = nil
202 206
 	}
203
-	this.num--
207
+	mq.num--
204 208
 	return m
205 209
 }
206 210
 
207
-func (this *messageQueue) push(m *message) {
208
-	if this.tail == nil {
209
-		this.head = m
210
-		this.tail = m
211
+func (mq *messageQueue) push(m *message) {
212
+	if mq.tail == nil {
213
+		mq.head = m
214
+		mq.tail = m
211 215
 	} else {
212
-		this.tail.next = m
213
-		this.tail = m
216
+		mq.tail.next = m
217
+		mq.tail = m
214 218
 	}
215
-	this.num++
219
+	mq.num++
216 220
 }
217 221
 
218
-func (this *Rudp) getID(max int, bt1, bt2 byte) int {
222
+func (r *Rudp) getID(max int, bt1, bt2 byte) int {
219 223
 	n1, n2 := int(bt1), int(bt2)
220 224
 	id := n1*256 + n2
221 225
 	id |= max & ^0xffff
222 226
 	if id < max-0x8000 {
223 227
 		id += 0x10000
224 228
 		dbg("id < max-0x8000 ,net %v,id %v,min %v,max %v,cur %v",
225
-			n1*256+n2, id, this.recvIDMin, max, id+0x10000)
229
+			n1*256+n2, id, r.recvIDMin, max, id+0x10000)
226 230
 	} else if id > max+0x8000 {
227 231
 		id -= 0x10000
228 232
 		dbg("id > max-0x8000 ,net %v,id %v,min %v,max %v,cur %v",
229
-			n1*256+n2, id, this.recvIDMin, max, id+0x10000)
233
+			n1*256+n2, id, r.recvIDMin, max, id+0x10000)
230 234
 	}
231 235
 	return id
232 236
 }
233 237
 
234
-func (this *Rudp) outPut() *Package {
238
+func (r *Rudp) outPut() *Package {
235 239
 	var tmp packageBuffer
236
-	this.reqMissing(&tmp)
237
-	this.replyRequest(&tmp)
238
-	this.sendMessage(&tmp)
240
+	r.reqMissing(&tmp)
241
+	r.replyRequest(&tmp)
242
+	r.sendMessage(&tmp)
239 243
 	if tmp.head == nil && tmp.tmp.Len() == 0 {
240 244
 		tmp.tmp.WriteByte(byte(TYPE_PING))
241 245
 	}
@@ -243,16 +247,16 @@ func (this *Rudp) outPut() *Package {
243 247
 	return tmp.head
244 248
 }
245 249
 
246
-func (this *Rudp) Input(bts []byte) {
250
+func (r *Rudp) Input(bts []byte) {
247 251
 	sz := len(bts)
248 252
 	if sz > 0 {
249
-		this.lastRecvTick = this.currentTick
253
+		r.lastRecvTick = r.currentTick
250 254
 	}
251 255
 	for sz > 0 {
252 256
 		len := int(bts[0])
253 257
 		if len > 127 {
254 258
 			if sz <= 1 {
255
-				this.corrupt = ERROR_MSG_SIZE
259
+				r.corrupt.Store(ERROR_MSG_SIZE)
256 260
 				return
257 261
 			}
258 262
 			len = (len*256 + int(bts[1])) & 0x7fff
@@ -264,72 +268,72 @@ func (this *Rudp) Input(bts []byte) {
264 268
 		}
265 269
 		switch len {
266 270
 		case TYPE_PING:
267
-			this.checkMissing(false)
271
+			r.checkMissing(false)
268 272
 		case TYPE_EOF:
269
-			this.corrupt = ERROR_EOF
273
+			r.corrupt.Store(ERROR_EOF)
270 274
 		case TYPE_CORRUPT:
271
-			this.corrupt = ERROR_REMOTE_EOF
275
+			r.corrupt.Store(ERROR_REMOTE_EOF)
272 276
 			return
273 277
 		case TYPE_REQUEST, TYPE_MISSING:
274 278
 			if sz < 4 {
275
-				this.corrupt = ERROR_MSG_SIZE
279
+				r.corrupt.Store(ERROR_MSG_SIZE)
276 280
 				return
277 281
 			}
278
-			exe := this.addRequest
279
-			max := this.sendID
282
+			exe := r.addRequest
283
+			max := r.sendID
280 284
 			if len == TYPE_MISSING {
281
-				exe = this.addMissing
282
-				max = this.recvIDMax
285
+				exe = r.addMissing
286
+				max = r.recvIDMax
283 287
 			}
284
-			exe(this.getID(max, bts[0], bts[1]), this.getID(max, bts[2], bts[3]))
288
+			exe(r.getID(max, bts[0], bts[1]), r.getID(max, bts[2], bts[3]))
285 289
 			bts = bts[4:]
286 290
 			sz -= 4
287 291
 		default:
288 292
 			len -= TYPE_NORMAL
289 293
 			if sz < len+2 {
290
-				this.corrupt = ERROR_MSG_SIZE
294
+				r.corrupt.Store(ERROR_MSG_SIZE)
291 295
 				return
292 296
 			}
293
-			this.insertMessage(this.getID(this.recvIDMax, bts[0], bts[1]), bts[2:len+2])
297
+			r.insertMessage(r.getID(r.recvIDMax, bts[0], bts[1]), bts[2:len+2])
294 298
 			bts = bts[len+2:]
295 299
 			sz -= len + 2
296 300
 		}
297 301
 	}
298
-	this.checkMissing(false)
302
+	r.checkMissing(false)
299 303
 }
300 304
 
301
-func (this *Rudp) checkMissing(direct bool) {
302
-	head := this.recvQueue.head
303
-	if head != nil && head.id > this.recvIDMin {
305
+func (r *Rudp) checkMissing(direct bool) {
306
+	head := r.recvQueue.head
307
+	if head != nil && head.id > r.recvIDMin {
304 308
 		nano := int(time.Now().UnixNano())
305
-		last := this.recvSkip[this.recvIDMin]
309
+		last := r.recvSkip[r.recvIDMin]
306 310
 		if !direct && last == 0 {
307
-			this.recvSkip[this.recvIDMin] = nano
308
-			dbg("miss start %v-%v,max %v", this.recvIDMin, head.id-1, this.recvIDMax)
311
+			r.recvSkip[r.recvIDMin] = nano
312
+			dbg("miss start %v-%v,max %v", r.recvIDMin, head.id-1, r.recvIDMax)
309 313
 		} else if direct || last+missingTime < nano {
310
-			delete(this.recvSkip, this.recvIDMin)
311
-			this.reqSendAgain <- [2]int{this.recvIDMin, head.id - 1}
314
+			delete(r.recvSkip, r.recvIDMin)
315
+			r.reqSendAgain <- [2]int{r.recvIDMin, head.id - 1}
312 316
 			dbg("req miss %v-%v,direct %v,wait num %v",
313
-				this.recvIDMin, head.id-1, direct, this.recvQueue.num)
317
+				r.recvIDMin, head.id-1, direct, r.recvQueue.num)
314 318
 		}
315 319
 	}
316 320
 }
317 321
 
318
-func (this *Rudp) insertMessage(id int, bts []byte) {
319
-	if id < this.recvIDMin {
322
+func (r *Rudp) insertMessage(id int, bts []byte) {
323
+	if id < r.recvIDMin {
320 324
 		dbg("already recv %v,len %v", id, len(bts))
321 325
 		return
322 326
 	}
323
-	delete(this.recvSkip, id)
324
-	if id > this.recvIDMax || this.recvQueue.head == nil {
327
+	delete(r.recvSkip, id)
328
+	if id > r.recvIDMax || r.recvQueue.head == nil {
325 329
 		m := &message{}
326 330
 		m.buf.Write(bts)
327 331
 		m.id = id
328
-		this.recvQueue.push(m)
329
-		this.recvIDMax = id
332
+		r.recvQueue.push(m)
333
+		r.recvIDMax = id
330 334
 	} else {
331
-		m := this.recvQueue.head
332
-		last := &this.recvQueue.head
335
+		m := r.recvQueue.head
336
+		last := &r.recvQueue.head
333 337
 		for m != nil {
334 338
 			if m.id == id {
335 339
 				dbg("repeat recv id %v,len %v", id, len(bts))
@@ -339,7 +343,7 @@ func (this *Rudp) insertMessage(id int, bts []byte) {
339 343
 				tmp.id = id
340 344
 				tmp.next = m
341 345
 				*last = tmp
342
-				this.recvQueue.num++
346
+				r.recvQueue.num++
343 347
 				return
344 348
 			}
345 349
 			last = &m.next
@@ -348,68 +352,68 @@ func (this *Rudp) insertMessage(id int, bts []byte) {
348 352
 	}
349 353
 }
350 354
 
351
-func (this *Rudp) sendMessage(tmp *packageBuffer) {
352
-	m := this.sendQueue.head
355
+func (r *Rudp) sendMessage(tmp *packageBuffer) {
356
+	m := r.sendQueue.head
353 357
 	for m != nil {
354 358
 		tmp.packMessage(m)
355 359
 		m = m.next
356 360
 	}
357
-	if this.sendQueue.head != nil {
358
-		if this.sendHistory.tail == nil {
359
-			this.sendHistory = this.sendQueue
361
+	if r.sendQueue.head != nil {
362
+		if r.sendHistory.tail == nil {
363
+			r.sendHistory = r.sendQueue
360 364
 		} else {
361
-			this.sendHistory.tail.next = this.sendQueue.head
362
-			this.sendHistory.tail = this.sendQueue.tail
365
+			r.sendHistory.tail.next = r.sendQueue.head
366
+			r.sendHistory.tail = r.sendQueue.tail
363 367
 		}
364
-		this.sendQueue.head = nil
365
-		this.sendQueue.tail = nil
368
+		r.sendQueue.head = nil
369
+		r.sendQueue.tail = nil
366 370
 	}
367 371
 }
368
-func (this *Rudp) clearSendExpired() {
369
-	m := this.sendHistory.head
372
+func (r *Rudp) clearSendExpired() {
373
+	m := r.sendHistory.head
370 374
 	for m != nil {
371
-		if m.tick >= this.lastExpiredTick {
375
+		if m.tick >= r.lastExpiredTick {
372 376
 			break
373 377
 		}
374 378
 		m = m.next
375 379
 	}
376
-	this.sendHistory.head = m
380
+	r.sendHistory.head = m
377 381
 	if m == nil {
378
-		this.sendHistory.tail = nil
382
+		r.sendHistory.tail = nil
379 383
 	}
380 384
 }
381 385
 
382
-func (this *Rudp) addRequest(min, max int) {
383
-	dbg("add request %v-%v,max send id %v", min, max, this.sendID)
384
-	this.addSendAgain <- [2]int{min, max}
386
+func (r *Rudp) addRequest(min, max int) {
387
+	dbg("add request %v-%v,max send id %v", min, max, r.sendID)
388
+	r.addSendAgain <- [2]int{min, max}
385 389
 }
386 390
 
387
-func (this *Rudp) addMissing(min, max int) {
388
-	if max < this.recvIDMin {
389
-		dbg("add missing %v-%v fail,already recv,min %v", min, max, this.recvIDMin)
391
+func (r *Rudp) addMissing(min, max int) {
392
+	if max < r.recvIDMin {
393
+		dbg("add missing %v-%v fail,already recv,min %v", min, max, r.recvIDMin)
390 394
 		return
391 395
 	}
392
-	if min > this.recvIDMin {
393
-		dbg("add missing %v-%v fail, more than min %v", min, max, this.recvIDMin)
396
+	if min > r.recvIDMin {
397
+		dbg("add missing %v-%v fail, more than min %v", min, max, r.recvIDMin)
394 398
 		return
395 399
 	}
396 400
 	head := 0
397
-	if this.recvQueue.head != nil {
398
-		head = this.recvQueue.head.id
401
+	if r.recvQueue.head != nil {
402
+		head = r.recvQueue.head.id
399 403
 	}
400
-	dbg("add missing %v-%v,min %v,head %v", min, max, this.recvIDMin, head)
401
-	this.recvIDMin = max + 1
402
-	this.checkMissing(true)
404
+	dbg("add missing %v-%v,min %v,head %v", min, max, r.recvIDMin, head)
405
+	r.recvIDMin = max + 1
406
+	r.checkMissing(true)
403 407
 }
404 408
 
405
-func (this *Rudp) replyRequest(tmp *packageBuffer) {
409
+func (r *Rudp) replyRequest(tmp *packageBuffer) {
406 410
 	for {
407 411
 		select {
408
-		case again := <-this.addSendAgain:
409
-			history := this.sendHistory.head
412
+		case again := <-r.addSendAgain:
413
+			history := r.sendHistory.head
410 414
 			min, max := again[0], again[1]
411 415
 			if history == nil || max < history.id {
412
-				dbg("send again miss %v-%v,send max %v", min, max, this.sendID)
416
+				dbg("send again miss %v-%v,send max %v", min, max, r.sendID)
413 417
 				tmp.packRequest(min, max, TYPE_MISSING)
414 418
 			} else {
415 419
 				var start, end, num int
@@ -429,9 +433,9 @@ func (this *Rudp) replyRequest(tmp *packageBuffer) {
429 433
 				}
430 434
 				if min < start {
431 435
 					tmp.packRequest(min, start-1, TYPE_MISSING)
432
-					dbg("send again miss %v-%v,send max %v", min, start-1, this.sendID)
436
+					dbg("send again miss %v-%v,send max %v", min, start-1, r.sendID)
433 437
 				}
434
-				dbg("send again %v-%v of %v-%v,all %v,max send id %v", start, end, min, max, num, this.sendID)
438
+				dbg("send again %v-%v of %v-%v,all %v,max send id %v", start, end, min, max, num, r.sendID)
435 439
 			}
436 440
 		default:
437 441
 			return
@@ -439,10 +443,10 @@ func (this *Rudp) replyRequest(tmp *packageBuffer) {
439 443
 	}
440 444
 }
441 445
 
442
-func (this *Rudp) reqMissing(tmp *packageBuffer) {
446
+func (r *Rudp) reqMissing(tmp *packageBuffer) {
443 447
 	for {
444 448
 		select {
445
-		case req := <-this.reqSendAgain:
449
+		case req := <-r.reqSendAgain:
446 450
 			tmp.packRequest(req[0], req[1], TYPE_REQUEST)
447 451
 		default:
448 452
 			return

Loading…
Cancel
Save