Golang implementation of the Reliable UDP transport, revised from an original that was documented mostly in Chinese.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

rudp.go 9.3KB


  1. package rudp
  2. import (
  3. "bytes"
  4. "errors"
  5. "time"
  6. "go.uber.org/atomic"
  7. )
  // Packet type tags. Every record in a datagram starts with a one- or two-byte
  // header value: the values below TYPE_NORMAL select a control record, and any
  // value from TYPE_NORMAL upwards is a data record whose payload length is the
  // header value minus TYPE_NORMAL. The values travel on the wire, so they are
  // spelled out explicitly.
  const (
  	TYPE_PING    = 0 // written by outPut when there is nothing else to send
  	TYPE_EOF     = 1 // Input records ERROR_EOF
  	TYPE_CORRUPT = 2 // Input records ERROR_REMOTE_EOF and stops parsing
  	TYPE_REQUEST = 3 // inclusive id range the sender of the record wants resent
  	TYPE_MISSING = 4 // inclusive id range the sender of the record can no longer resend
  	TYPE_NORMAL  = 5 // base offset of data records
  )

  // Size limits.
  const (
  	// MAX_MSG_HEAD is presumably the largest per-message header (two length
  	// bytes plus two id bytes) - TODO confirm; it is not referenced in the
  	// visible part of the file.
  	MAX_MSG_HEAD = 4
  	// GENERAL_PACKAGE is the size at which packageBuffer starts a new
  	// datagram. Presumably the 576-byte minimum IP datagram minus a 60-byte
  	// IP header and the 8-byte UDP header - TODO confirm.
  	GENERAL_PACKAGE = 576 - 60 - 8
  	// MAX_PACKAGE is the largest payload Send accepts; it keeps
  	// payload length + TYPE_NORMAL within the 15-bit header value.
  	MAX_PACKAGE = 0x7fff - TYPE_NORMAL
  )
  21. type Err struct {
  22. atomic.Int32
  23. }
  // Error codes stored in an Err. ERROR_NIL means "no error"; every other code
  // is mapped to a message by Err.Error.
  const (
  	ERROR_NIL        int32 = 0
  	ERROR_EOF        int32 = 1
  	ERROR_REMOTE_EOF int32 = 2
  	ERROR_CORRUPT    int32 = 3
  	ERROR_MSG_SIZE   int32 = 4
  )
  31. func (e Err) Error() error {
  32. switch e.Int32.Load() {
  33. case ERROR_EOF:
  34. return errors.New("EOF")
  35. case ERROR_REMOTE_EOF:
  36. return errors.New("remote EOF")
  37. case ERROR_CORRUPT:
  38. return errors.New("corrupt")
  39. case ERROR_MSG_SIZE:
  40. return errors.New("recive msg size error")
  41. default:
  42. return nil
  43. }
  44. }
  // Package is one datagram to put on the wire. Update returns the head of a
  // singly linked list of them.
  type Package struct {
  	Next *Package // following datagram, nil at the end of the list
  	Bts  []byte   // encoded records of this datagram
  }
  49. type packageBuffer struct {
  50. tmp bytes.Buffer
  51. num int
  52. head *Package
  53. tail *Package
  54. }
  55. func (pb *packageBuffer) packRequest(min, max int, tag int) {
  56. if pb.tmp.Len()+5 > GENERAL_PACKAGE {
  57. pb.newPackage()
  58. }
  59. pb.tmp.WriteByte(byte(tag))
  60. pb.tmp.WriteByte(byte((min & 0xff00) >> 8))
  61. pb.tmp.WriteByte(byte(min & 0xff))
  62. pb.tmp.WriteByte(byte((max & 0xff00) >> 8))
  63. pb.tmp.WriteByte(byte(max & 0xff))
  64. }
  65. func (pb *packageBuffer) fillHeader(head, id int) {
  66. if head < 128 {
  67. pb.tmp.WriteByte(byte(head))
  68. } else {
  69. pb.tmp.WriteByte(byte(((head & 0x7f00) >> 8) | 0x80))
  70. pb.tmp.WriteByte(byte(head & 0xff))
  71. }
  72. pb.tmp.WriteByte(byte((id & 0xff00) >> 8))
  73. pb.tmp.WriteByte(byte(id & 0xff))
  74. }
  75. func (pb *packageBuffer) packMessage(m *message) {
  76. if m.buf.Len()+4+pb.tmp.Len() >= GENERAL_PACKAGE {
  77. pb.newPackage()
  78. }
  79. pb.fillHeader(m.buf.Len()+TYPE_NORMAL, m.id)
  80. pb.tmp.Write(m.buf.Bytes())
  81. }
  82. func (pb *packageBuffer) newPackage() {
  83. if pb.tmp.Len() <= 0 {
  84. return
  85. }
  86. p := &Package{Bts: make([]byte, pb.tmp.Len())}
  87. copy(p.Bts, pb.tmp.Bytes())
  88. pb.tmp.Reset()
  89. pb.num++
  90. if pb.tail == nil {
  91. pb.head = p
  92. pb.tail = p
  93. } else {
  94. pb.tail.Next = p
  95. pb.tail = p
  96. }
  97. }
  98. func New() *Rudp {
  99. return &Rudp{reqSendAgain: make(chan [2]int, 1<<10), addSendAgain: make(chan [2]int, 1<<10), recvSkip: make(map[int]int)}
  100. }
  101. type Rudp struct {
  102. recvQueue messageQueue
  103. recvSkip map[int]int
  104. reqSendAgain chan [2]int
  105. recvIDMin int
  106. recvIDMax int
  107. sendQueue messageQueue
  108. sendHistory messageQueue
  109. addSendAgain chan [2]int
  110. sendID int
  111. corrupt Err
  112. currentTick int
  113. lastRecvTick int
  114. lastExpiredTick int
  115. lastSendDelayTick int
  116. }
  117. func (r *Rudp) Recv(bts []byte) (int, error) {
  118. if err := r.corrupt.Load(); err != ERROR_NIL {
  119. return 0, r.corrupt.Error()
  120. }
  121. m := r.recvQueue.pop(r.recvIDMin)
  122. if m == nil {
  123. return 0, nil
  124. }
  125. r.recvIDMin++
  126. copy(bts, m.buf.Bytes())
  127. return m.buf.Len(), nil
  128. }
  129. func (r *Rudp) Send(bts []byte) (n int, err error) {
  130. if err := r.corrupt.Load(); err != ERROR_NIL {
  131. return 0, r.corrupt.Error()
  132. }
  133. if len(bts) > MAX_PACKAGE {
  134. return 0, nil
  135. }
  136. m := &message{}
  137. m.buf.Write(bts)
  138. m.id = r.sendID
  139. r.sendID++
  140. m.tick = r.currentTick
  141. r.sendQueue.push(m)
  142. return len(bts), nil
  143. }
  144. func (r *Rudp) Update(tick int) *Package {
  145. if r.corrupt.Load() != ERROR_NIL {
  146. return nil
  147. }
  148. r.currentTick += tick
  149. if r.currentTick >= r.lastExpiredTick+expiredTick {
  150. r.lastExpiredTick = r.currentTick
  151. r.clearSendExpired()
  152. }
  153. if r.currentTick >= r.lastRecvTick+corruptTick {
  154. r.corrupt.Store(ERROR_CORRUPT)
  155. }
  156. if r.currentTick >= r.lastSendDelayTick+sendDelayTick {
  157. r.lastSendDelayTick = r.currentTick
  158. return r.outPut()
  159. }
  160. return nil
  161. }
  // message is one payload together with its sequence id; it doubles as a
  // node of the singly linked messageQueue.
  type message struct {
  	next     *message     // following node, nil at the end of a queue
  	buf      bytes.Buffer // payload
  	id, tick int          // sequence id / value of currentTick when Send queued it
  }
  168. type messageQueue struct {
  169. head *message
  170. tail *message
  171. num int
  172. }
  173. func (mq *messageQueue) pop(id int) *message {
  174. if mq.head == nil {
  175. return nil
  176. }
  177. m := mq.head
  178. if id >= 0 && m.id != id {
  179. return nil
  180. }
  181. mq.head = m.next
  182. m.next = nil
  183. if mq.head == nil {
  184. mq.tail = nil
  185. }
  186. mq.num--
  187. return m
  188. }
  189. func (mq *messageQueue) push(m *message) {
  190. if mq.tail == nil {
  191. mq.head = m
  192. mq.tail = m
  193. } else {
  194. mq.tail.next = m
  195. mq.tail = m
  196. }
  197. mq.num++
  198. }
  199. func (r *Rudp) getID(max int, bt1, bt2 byte) int {
  200. n1, n2 := int(bt1), int(bt2)
  201. id := n1*256 + n2
  202. id |= max & ^0xffff
  203. if id < max-0x8000 {
  204. id += 0x10000
  205. dbg("id < max-0x8000 ,net %v,id %v,min %v,max %v,cur %v",
  206. n1*256+n2, id, r.recvIDMin, max, id+0x10000)
  207. } else if id > max+0x8000 {
  208. id -= 0x10000
  209. dbg("id > max-0x8000 ,net %v,id %v,min %v,max %v,cur %v",
  210. n1*256+n2, id, r.recvIDMin, max, id+0x10000)
  211. }
  212. return id
  213. }
  214. func (r *Rudp) outPut() *Package {
  215. var tmp packageBuffer
  216. r.reqMissing(&tmp)
  217. r.replyRequest(&tmp)
  218. r.sendMessage(&tmp)
  219. if tmp.head == nil && tmp.tmp.Len() == 0 {
  220. tmp.tmp.WriteByte(byte(TYPE_PING))
  221. }
  222. tmp.newPackage()
  223. return tmp.head
  224. }
  225. func (r *Rudp) Input(bts []byte) {
  226. sz := len(bts)
  227. if sz > 0 {
  228. r.lastRecvTick = r.currentTick
  229. }
  230. for sz > 0 {
  231. length := int(bts[0])
  232. if length > 127 {
  233. if sz <= 1 {
  234. r.corrupt.Store(ERROR_MSG_SIZE)
  235. return
  236. }
  237. length = (length*256 + int(bts[1])) & 0x7fff
  238. bts = bts[2:]
  239. sz -= 2
  240. } else {
  241. bts = bts[1:]
  242. sz -= 1
  243. }
  244. switch length {
  245. case TYPE_PING:
  246. r.checkMissing(false)
  247. case TYPE_EOF:
  248. r.corrupt.Store(ERROR_EOF)
  249. case TYPE_CORRUPT:
  250. r.corrupt.Store(ERROR_REMOTE_EOF)
  251. return
  252. case TYPE_REQUEST, TYPE_MISSING:
  253. if sz < 4 {
  254. r.corrupt.Store(ERROR_MSG_SIZE)
  255. return
  256. }
  257. exe := r.addRequest
  258. max := r.sendID
  259. if length == TYPE_MISSING {
  260. exe = r.addMissing
  261. max = r.recvIDMax
  262. }
  263. // this eliminates multiple BCs in the exe function invocation
  264. _ = bts[3]
  265. exe(r.getID(max, bts[0], bts[1]), r.getID(max, bts[2], bts[3]))
  266. bts = bts[4:]
  267. sz -= 4
  268. default:
  269. length -= TYPE_NORMAL
  270. if sz < length+2 {
  271. r.corrupt.Store(ERROR_MSG_SIZE)
  272. return
  273. }
  274. // this prevents most of the bounds checks in the following code and
  275. // would fail in the next function call anyway if this is outside
  276. _ = bts[4]
  277. r.insertMessage(r.getID(r.recvIDMax, bts[0], bts[1]), bts[2:length+2])
  278. bts = bts[length+2:]
  279. sz -= length + 2
  280. }
  281. }
  282. r.checkMissing(false)
  283. }
  284. func (r *Rudp) checkMissing(direct bool) {
  285. head := r.recvQueue.head
  286. if head != nil && head.id > r.recvIDMin {
  287. nano := int(time.Now().UnixNano())
  288. last := r.recvSkip[r.recvIDMin]
  289. if !direct && last == 0 {
  290. r.recvSkip[r.recvIDMin] = nano
  291. dbg("miss start %v-%v,max %v", r.recvIDMin, head.id-1, r.recvIDMax)
  292. } else if direct || last+missingTime < nano {
  293. delete(r.recvSkip, r.recvIDMin)
  294. r.reqSendAgain <- [2]int{r.recvIDMin, head.id - 1}
  295. dbg("req miss %v-%v,direct %v,wait num %v",
  296. r.recvIDMin, head.id-1, direct, r.recvQueue.num)
  297. }
  298. }
  299. }
  300. func (r *Rudp) insertMessage(id int, bts []byte) {
  301. if id < r.recvIDMin {
  302. dbg("already recv %v,len %v", id, len(bts))
  303. return
  304. }
  305. delete(r.recvSkip, id)
  306. if id > r.recvIDMax || r.recvQueue.head == nil {
  307. m := &message{}
  308. m.buf.Write(bts)
  309. m.id = id
  310. r.recvQueue.push(m)
  311. r.recvIDMax = id
  312. } else {
  313. m := r.recvQueue.head
  314. last := &r.recvQueue.head
  315. for m != nil {
  316. if m.id == id {
  317. dbg("repeat recv id %v,len %v", id, len(bts))
  318. } else if m.id > id {
  319. tmp := &message{}
  320. tmp.buf.Write(bts)
  321. tmp.id = id
  322. tmp.next = m
  323. *last = tmp
  324. r.recvQueue.num++
  325. return
  326. }
  327. last = &m.next
  328. m = m.next
  329. }
  330. }
  331. }
  332. func (r *Rudp) sendMessage(tmp *packageBuffer) {
  333. m := r.sendQueue.head
  334. for m != nil {
  335. tmp.packMessage(m)
  336. m = m.next
  337. }
  338. if r.sendQueue.head != nil {
  339. if r.sendHistory.tail == nil {
  340. r.sendHistory = r.sendQueue
  341. } else {
  342. r.sendHistory.tail.next = r.sendQueue.head
  343. r.sendHistory.tail = r.sendQueue.tail
  344. }
  345. r.sendQueue.head = nil
  346. r.sendQueue.tail = nil
  347. }
  348. }
  349. func (r *Rudp) clearSendExpired() {
  350. m := r.sendHistory.head
  351. for m != nil {
  352. if m.tick >= r.lastExpiredTick {
  353. break
  354. }
  355. m = m.next
  356. }
  357. r.sendHistory.head = m
  358. if m == nil {
  359. r.sendHistory.tail = nil
  360. }
  361. }
  362. func (r *Rudp) addRequest(min, max int) {
  363. dbg("add request %v-%v,max send id %v", min, max, r.sendID)
  364. r.addSendAgain <- [2]int{min, max}
  365. }
  366. func (r *Rudp) addMissing(min, max int) {
  367. if max < r.recvIDMin {
  368. dbg("add missing %v-%v fail,already recv,min %v", min, max, r.recvIDMin)
  369. return
  370. }
  371. if min > r.recvIDMin {
  372. dbg("add missing %v-%v fail, more than min %v", min, max, r.recvIDMin)
  373. return
  374. }
  375. head := 0
  376. if r.recvQueue.head != nil {
  377. head = r.recvQueue.head.id
  378. }
  379. dbg("add missing %v-%v,min %v,head %v", min, max, r.recvIDMin, head)
  380. r.recvIDMin = max + 1
  381. r.checkMissing(true)
  382. }
  383. func (r *Rudp) replyRequest(tmp *packageBuffer) {
  384. for {
  385. select {
  386. case again := <-r.addSendAgain:
  387. history := r.sendHistory.head
  388. min, max := again[0], again[1]
  389. if history == nil || max < history.id {
  390. dbg("send again miss %v-%v,send max %v", min, max, r.sendID)
  391. tmp.packRequest(min, max, TYPE_MISSING)
  392. } else {
  393. var start, end, num int
  394. for {
  395. if history == nil || max < history.id {
  396. //expired
  397. break
  398. } else if min <= history.id {
  399. tmp.packMessage(history)
  400. if start == 0 {
  401. start = history.id
  402. }
  403. end = history.id
  404. num++
  405. }
  406. history = history.next
  407. }
  408. if min < start {
  409. tmp.packRequest(min, start-1, TYPE_MISSING)
  410. dbg("send again miss %v-%v,send max %v", min, start-1, r.sendID)
  411. }
  412. dbg("send again %v-%v of %v-%v,all %v,max send id %v", start, end, min, max, num, r.sendID)
  413. }
  414. default:
  415. return
  416. }
  417. }
  418. }
  419. func (r *Rudp) reqMissing(tmp *packageBuffer) {
  420. for {
  421. select {
  422. case req := <-r.reqSendAgain:
  423. tmp.packRequest(req[0], req[1], TYPE_REQUEST)
  424. default:
  425. return
  426. }
  427. }
  428. }