-
Notifications
You must be signed in to change notification settings - Fork 89
/
Copy pathsession.go
230 lines (200 loc) · 6.03 KB
/
session.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
package easytcp
import (
"fmt"
"github.com/google/uuid"
"io"
"net"
"sync"
"time"
)
// Session represents a TCP session.
type Session interface {
// ID returns the current session's id.
ID() interface{}
// SetID sets the current session's id.
SetID(id interface{})
// Send queues ctx on the session's response stream.
// It returns false if the session is closed or ctx is done.
Send(ctx Context) bool
// Codec returns the codec, can be nil.
Codec() Codec
// Close closes the current session.
Close()
// AllocateContext returns a Context that is bound to the current session.
AllocateContext() Context
// Conn returns the underlying connection.
Conn() net.Conn
// AfterCreateHook returns a channel that is closed after the session's
// on-create hook has been triggered; receive from it to wait for the hook.
AfterCreateHook() <-chan struct{}
// AfterCloseHook returns a channel that is closed after the session's
// on-close hook has been triggered; receive from it to wait for the hook.
AfterCloseHook() <-chan struct{}
}
// session implements Session on top of a net.Conn. It holds the channels
// that coordinate the read loop (readInbound), the write loop (writeOutbound)
// and the create/close hooks.
type session struct {
id interface{} // session's ID; a UUID string by default, replaceable via SetID()
conn net.Conn // underlying tcp connection
closedC chan struct{} // closed by Close(); tells Send() and the read/write loops to stop
closeOnce sync.Once // ensures closedC is closed only once
afterCreateHookC chan struct{} // to close after session's on-create hook triggered
afterCloseHookC chan struct{} // to close after session's on-close hook triggered
respStream chan Context // response queue channel, pushed in Send() and popped in writeOutbound()
packer Packer // packs outbound messages and unpacks inbound ones
codec Codec // encode/decode message data
ctxPool sync.Pool // router context pool, used by AllocateContext() and refilled in packResponse()
asyncRouter bool // if true, readInbound() handles each request in its own goroutine
}
// sessionOption is the extra options for session.
// newSession copies every field into the session it creates.
type sessionOption struct {
// Packer becomes the session's packer.
Packer Packer
// Codec becomes the session's codec.
Codec Codec
// respQueueSize is the buffer size of the session's respStream channel.
respQueueSize int
// asyncRouter is copied to session.asyncRouter; when true each request is
// handled in its own goroutine.
asyncRouter bool
}
// newSession builds a session around the given TCP connection.
// conn is the connection the session reads from and writes to.
// opt supplies the packer, the codec, the response queue size and the
// async-router flag; it is dereferenced unconditionally and must not be nil.
// The returned session starts with a freshly generated UUID string as its id.
func newSession(conn net.Conn, opt *sessionOption) *session {
	sess := &session{
		// A UUID is the default id; callers may replace it through SetID.
		id:          uuid.NewString(),
		conn:        conn,
		packer:      opt.Packer,
		codec:       opt.Codec,
		asyncRouter: opt.asyncRouter,
		ctxPool:     sync.Pool{New: func() interface{} { return newContext() }},
	}

	// Signalling channels are unbuffered; only the response queue is sized.
	sess.closedC = make(chan struct{})
	sess.afterCreateHookC = make(chan struct{})
	sess.afterCloseHookC = make(chan struct{})
	sess.respStream = make(chan Context, opt.respQueueSize)

	return sess
}
// ID reports the identifier currently assigned to the session.
// It is a UUID string unless SetID has replaced it.
func (s *session) ID() interface{} {
	current := s.id
	return current
}
// SetID replaces the session's id with the given value.
// A suitable place to call it is the server.OnSessionCreate() callback.
// The write is a plain field assignment; this method takes no lock.
func (s *session) SetID(id interface{}) {
	s.id = id
}
// Send enqueues ctx on respStream so that writeOutbound can write it out.
// It blocks until the context is queued, the session is closed, or ctx is
// done, and reports true only when the context was queued.
func (s *session) Send(ctx Context) (ok bool) {
	select {
	case s.respStream <- ctx:
		ok = true
	case <-s.closedC:
		// Session closed: ok stays false.
	case <-ctx.Done():
		// Context finished before it could be queued: ok stays false.
	}
	return ok
}
// Codec returns the codec the session was created with.
// The value is whatever sessionOption.Codec held, so it may be nil.
func (s *session) Codec() Codec {
	codec := s.codec
	return codec
}
// Close marks the session as closed by closing closedC.
// It is safe to call more than once; only the first call has an effect.
// The connection itself is not closed here; the server closes it once the
// session is closed.
func (s *session) Close() {
	signalClosed := func() { close(s.closedC) }
	s.closeOnce.Do(signalClosed)
}
// AfterCreateHook returns a receive-only view of afterCreateHookC, the
// channel that is closed after the session's on-create hook has been
// triggered. Receive from it to wait for the hook.
func (s *session) AfterCreateHook() <-chan struct{} {
	var created <-chan struct{} = s.afterCreateHookC
	return created
}
// AfterCloseHook returns a receive-only view of afterCloseHookC, the
// channel that is closed after the session's on-close hook has been
// triggered. Receive from it to wait for the hook.
func (s *session) AfterCloseHook() <-chan struct{} {
	var closed <-chan struct{} = s.afterCloseHookC
	return closed
}
// AllocateContext takes a context from the session's pool, resets it and
// binds it to this session before handing it to the caller.
func (s *session) AllocateContext() Context {
	// The pool's New function produces the values asserted here.
	pooled := s.ctxPool.Get().(*routeContext)
	// Clear state left over from a previous use, then attach the session.
	pooled.reset()
	pooled.SetSession(s)
	return pooled
}
// Conn returns the underlying connection the session was created with.
func (s *session) Conn() net.Conn {
	underlying := s.conn
	return underlying
}
// readInbound reads message packets from the connection in a loop and hands
// every unpacked message to the router through handleReq.
//
// Parameter router handles each inbound message. Parameter timeout, when
// positive, is applied as a fresh read deadline before every Unpack call.
//
// The loop returns silently once the session is closed. It also stops when
// setting the read deadline fails or when Unpack returns an error (io.EOF is
// logged at trace level, any other error at error level); on these paths the
// session is closed before returning.
func (s *session) readInbound(router *Router, timeout time.Duration) {
	for {
		// Non-blocking check so the loop stops once the session is closed.
		select {
		case <-s.closedC:
			return
		default:
		}
		if timeout > 0 {
			if err := s.conn.SetReadDeadline(time.Now().Add(timeout)); err != nil {
				// s.id is an interface{} that SetID may set to a non-string
				// value, so it is formatted with %v rather than %s.
				_log.Errorf("session %v set read deadline err: %s", s.id, err)
				break
			}
		}
		reqMsg, err := s.packer.Unpack(s.conn)
		if err != nil {
			logMsg := fmt.Sprintf("session %v unpack inbound packet err: %s", s.id, err)
			// logMsg is already formatted; pass it as an argument so that a
			// '%' inside the error text is not interpreted as a verb.
			if err == io.EOF {
				_log.Tracef("%s", logMsg)
			} else {
				_log.Errorf("%s", logMsg)
			}
			break
		}
		// A nil message without an error is skipped.
		if reqMsg == nil {
			continue
		}
		if s.asyncRouter {
			go s.handleReq(router, reqMsg)
		} else {
			s.handleReq(router, reqMsg)
		}
	}
	_log.Tracef("session %v readInbound exit because of error", s.id)
	s.Close()
}
// handleReq processes a single inbound message: it allocates a context for
// reqMsg, lets router handle it, and then queues the context for writing.
// readInbound calls it inline or in a new goroutine depending on asyncRouter.
// The result of Send is not inspected.
func (s *session) handleReq(router *Router, reqMsg *Message) {
	allocated := s.AllocateContext()
	reqCtx := allocated.SetRequestMessage(reqMsg)
	router.handleRequest(reqCtx)
	s.Send(reqCtx)
}
// writeOutbound takes contexts from the respStream channel, packs their
// responses and writes the resulting bytes to the TCP connection in a loop.
//
// Parameter writeTimeout, when positive, is applied as a fresh write deadline
// before every write.
//
// A context whose response fails to pack, or packs to nil, is skipped and the
// loop continues. The loop returns silently once the session is closed. It
// also stops when setting the write deadline or writing to the connection
// fails; on these paths the session is closed before returning.
func (s *session) writeOutbound(writeTimeout time.Duration) {
	for {
		var ctx Context
		// Wait for either the close signal or the next queued response.
		select {
		case <-s.closedC:
			return
		case ctx = <-s.respStream:
		}
		outboundBytes, err := s.packResponse(ctx)
		if err != nil {
			// s.id is an interface{} that SetID may set to a non-string
			// value, so it is formatted with %v rather than %s.
			_log.Errorf("session %v pack outbound message err: %s", s.id, err)
			continue
		}
		// Nothing to write for this context.
		if outboundBytes == nil {
			continue
		}
		if writeTimeout > 0 {
			if err := s.conn.SetWriteDeadline(time.Now().Add(writeTimeout)); err != nil {
				_log.Errorf("session %v set write deadline err: %s", s.id, err)
				break
			}
		}
		if _, err := s.conn.Write(outboundBytes); err != nil {
			_log.Errorf("session %v conn write err: %s", s.id, err)
			break
		}
	}
	s.Close()
	_log.Tracef("session %v writeOutbound exit because of error", s.id)
}
// packResponse packs the response message carried by ctx with the session's
// packer. It returns (nil, nil) when ctx has no response, otherwise whatever
// the packer returns.
// ctx is put back into the context pool when the function returns, so
// callers should treat it as released afterwards.
func (s *session) packResponse(ctx Context) ([]byte, error) {
	defer s.ctxPool.Put(ctx)
	if ctx.Response() != nil {
		return s.packer.Pack(ctx.Response())
	}
	// No response was set on the context: nothing to send.
	return nil, nil
}