Golang implementation of the Reliable UDP transport, revised from an original documented mostly in chinese.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

conn.go 4.3KB


  1. package rudp
  2. import (
  3. "net"
  4. "time"
  5. )
  6. func NewConn(conn *net.UDPConn, rudp *Rudp) *RudpConn {
  7. con := &RudpConn{conn: conn, rudp: rudp,
  8. recvChan: make(chan []byte, 1<<16), recvErr: make(chan error, 2),
  9. sendChan: make(chan []byte, 1<<16), sendErr: make(chan error, 2),
  10. SendTick: make(chan int, 2),
  11. }
  12. go con.run()
  13. return con
  14. }
  15. func NewUnConn(conn *net.UDPConn, remoteAddr *net.UDPAddr, rudp *Rudp, close func(string)) *RudpConn {
  16. con := &RudpConn{conn: conn, rudp: rudp, SendTick: make(chan int, 2),
  17. recvChan: make(chan []byte, 1<<16), recvErr: make(chan error, 2),
  18. sendChan: make(chan []byte, 1<<16), sendErr: make(chan error, 2),
  19. closef: close, remoteAddr: remoteAddr, in: make(chan []byte, 1<<16),
  20. }
  21. go con.run()
  22. return con
  23. }
  24. type RudpConn struct {
  25. conn *net.UDPConn
  26. rudp *Rudp
  27. recvChan chan []byte
  28. recvErr chan error
  29. sendChan chan []byte
  30. sendErr chan error
  31. SendTick chan int
  32. //unconected
  33. remoteAddr *net.UDPAddr
  34. closef func(addr string)
  35. in chan []byte
  36. }
  37. func (rc *RudpConn) SetDeadline(t time.Time) error { return nil }
  38. func (rc *RudpConn) SetReadDeadline(t time.Time) error { return nil }
  39. func (rc *RudpConn) SetWriteDeadline(t time.Time) error { return nil }
  40. func (rc *RudpConn) LocalAddr() net.Addr { return rc.conn.LocalAddr() }
  41. func (rc *RudpConn) Connected() bool { return rc.remoteAddr == nil }
  42. func (rc *RudpConn) RemoteAddr() net.Addr {
  43. if rc.remoteAddr != nil {
  44. return rc.remoteAddr
  45. }
  46. return rc.conn.RemoteAddr()
  47. }
  48. func (rc *RudpConn) Close() error {
  49. var err error
  50. if rc.remoteAddr != nil {
  51. if rc.closef != nil {
  52. rc.closef(rc.remoteAddr.String())
  53. }
  54. _, err = rc.conn.WriteToUDP([]byte{TYPE_CORRUPT}, rc.remoteAddr)
  55. rc.in <- []byte{TYPE_EOF}
  56. } else {
  57. _, err = rc.conn.Write([]byte{TYPE_CORRUPT})
  58. }
  59. checkErr(err)
  60. return err
  61. }
  62. func (rc *RudpConn) Read(bts []byte) (n int, err error) {
  63. select {
  64. case data := <-rc.recvChan:
  65. copy(bts, data)
  66. return len(data), nil
  67. case err := <-rc.recvErr:
  68. return 0, err
  69. }
  70. }
  71. func (r *RudpConn) send(bts []byte) (err error) {
  72. select {
  73. case r.sendChan <- bts:
  74. return nil
  75. case err := <-r.sendErr:
  76. return err
  77. }
  78. }
  79. func (r *RudpConn) Write(bts []byte) (n int, err error) {
  80. sz := len(bts)
  81. for len(bts)+MAX_MSG_HEAD > GENERAL_PACKAGE {
  82. if err := r.send(bts[:GENERAL_PACKAGE-MAX_MSG_HEAD]); err != nil {
  83. return 0, err
  84. }
  85. bts = bts[GENERAL_PACKAGE-MAX_MSG_HEAD:]
  86. }
  87. return sz, r.send(bts)
  88. }
  89. func (r *RudpConn) rudpRecv(data []byte) error {
  90. for {
  91. n, err := r.rudp.Recv(data)
  92. if err != nil {
  93. r.recvErr <- err
  94. return err
  95. } else if n == 0 {
  96. break
  97. }
  98. bts := make([]byte, n)
  99. dataRead := data[:n]
  100. copy(bts, dataRead)
  101. r.recvChan <- bts
  102. }
  103. return nil
  104. }
  105. func (r *RudpConn) conectedRecvLoop() {
  106. data := make([]byte, MAX_PACKAGE)
  107. for {
  108. n, err := r.conn.Read(data)
  109. if err != nil {
  110. r.recvErr <- err
  111. return
  112. }
  113. dataRead := data[:n]
  114. r.rudp.Input(dataRead)
  115. if r.rudpRecv(data) != nil {
  116. return
  117. }
  118. }
  119. }
  120. func (r *RudpConn) unconectedRecvLoop() {
  121. data := make([]byte, MAX_PACKAGE)
  122. for {
  123. select {
  124. case bts := <-r.in:
  125. r.rudp.Input(bts)
  126. if r.rudpRecv(data) != nil {
  127. return
  128. }
  129. }
  130. }
  131. }
  132. func (r *RudpConn) sendLoop() {
  133. var sendNum int
  134. for {
  135. select {
  136. case tick := <-r.SendTick:
  137. sendOut:
  138. for {
  139. select {
  140. case bts := <-r.sendChan:
  141. _, err := r.rudp.Send(bts)
  142. if err != nil {
  143. r.sendErr <- err
  144. return
  145. }
  146. sendNum++
  147. if sendNum >= maxSendNumPerTick {
  148. break sendOut
  149. }
  150. default:
  151. break sendOut
  152. }
  153. }
  154. sendNum = 0
  155. p := r.rudp.Update(tick)
  156. var num, sz int
  157. for p != nil {
  158. n, err := int(0), error(nil)
  159. if r.Connected() {
  160. n, err = r.conn.Write(p.Bts)
  161. } else {
  162. n, err = r.conn.WriteToUDP(p.Bts, r.remoteAddr)
  163. }
  164. if err != nil {
  165. r.sendErr <- err
  166. return
  167. }
  168. sz, num = sz+n, num+1
  169. p = p.Next
  170. }
  171. if num > 1 {
  172. show := bitShow(sz * int(time.Second/sendTick))
  173. dbg("send package num %v,sz %v, %v/s,local %v,remote %v",
  174. num, show, show, r.LocalAddr(), r.RemoteAddr())
  175. }
  176. }
  177. }
  178. }
  179. func (r *RudpConn) run() {
  180. if autoSend && sendTick > 0 {
  181. go func() {
  182. tick := time.Tick(sendTick)
  183. for {
  184. select {
  185. case <-tick:
  186. r.SendTick <- 1
  187. }
  188. }
  189. }()
  190. }
  191. go func() {
  192. if r.Connected() {
  193. r.conectedRecvLoop()
  194. } else {
  195. r.unconectedRecvLoop()
  196. }
  197. }()
  198. r.sendLoop()
  199. }