forked from docker-archive/go-p9p
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathchannel.go
343 lines (290 loc) · 10.1 KB
/
channel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
package p9p
import (
"bufio"
"context"
"encoding/binary"
"io"
"io/ioutil"
"log"
"net"
"time"
)
// channelMessageHeaderSize is the number of bytes the size[4] prefix adds to
// every message sent on the wire.
const channelMessageHeaderSize = 4
// Channel defines the operations necessary to implement a 9p message channel
// interface. Typically, message channels do no protocol processing except to
// send and receive message frames.
type Channel interface {
	// ReadFcall reads one fcall frame into the provided fcall structure. The
	// Fcall may be cleared whether there is an error or not. If the operation
	// is successful, the contents of the fcall will be populated in the
	// argument. ReadFcall cannot be called concurrently with other calls to
	// ReadFcall. This is both to preserve message ordering and to allow
	// lockless buffer reuse.
	ReadFcall(ctx context.Context, fcall *Fcall) error

	// WriteFcall writes the provided fcall to the channel. WriteFcall cannot
	// be called concurrently with other calls to WriteFcall.
	WriteFcall(ctx context.Context, fcall *Fcall) error

	// MSize returns the current msize for the channel.
	MSize() int

	// SetMSize sets the maximum message size for the channel. This must never
	// be called concurrently with ReadFcall or WriteFcall.
	SetMSize(msize int)
}
// NewChannel wraps conn in a Channel that reads and writes Fcalls using
// codec9p, with its buffers sized for messages of up to msize bytes.
func NewChannel(conn net.Conn, msize int) Channel {
	ch := newChannel(conn, codec9p{}, msize)
	return ch
}
// defaultRWTimeout bounds each ReadFcall or WriteFcall when the caller's
// context carries no deadline of its own.
const defaultRWTimeout = 30 * time.Second
// channel provides bidirectional protocol framing for 9p over net.Conn.
// Operations are not thread-safe, but reads and writes may be carried out
// concurrently, supporting separate read and write loops.
//
// Lifecycle
//
// A connection, or message channel abstraction, has a lifecycle delineated by
// Tversion/Rversion request-response cycles. For now, this is part of the
// channel itself but doesn't necessarily influence the channel's state, except
// the msize. Visually, it might look something like this:
//
//	[Established] -> [Version] -> [Session] -> [Version]---+
//	                     ^                                 |
//	                     |_________________________________|
//
// The connection is established, then we negotiate a version, run a session,
// then negotiate a version and so on. For most purposes, we are likely going
// to terminate the connection after the session but we may want to support
// connection pooling. Pooling may result in possible security leaks if the
// connections are shared among contexts, since the version is negotiated at
// the start of the session. To avoid this, we can actually use a "tombstone"
// version message which clears the server's session state without starting a
// new session. The next version message would then prepare the session
// without leaking any Fids.
type channel struct {
	conn   net.Conn      // underlying transport; read/write deadlines are set per call
	codec  Codec         // marshals and unmarshals Fcalls to and from message bodies
	brd    *bufio.Reader // buffered reader over conn, used by ReadFcall
	bwr    *bufio.Writer // buffered writer over conn, flushed at the end of each WriteFcall
	closed chan struct{} // a receive from it makes ReadFcall/WriteFcall return ErrClosed
	msize  int           // maximum on-wire message size, including the size[4] prefix
	rdbuf  []byte        // reusable buffer for incoming message bodies; its len tracks msize
}
// newChannel builds a channel that frames messages over conn with codec. The
// buffered reader, the buffered writer and the read buffer are all sized to
// msize.
func newChannel(conn net.Conn, codec Codec, msize int) *channel {
	ch := &channel{
		conn:   conn,
		codec:  codec,
		closed: make(chan struct{}),
		msize:  msize,
	}

	// msize may not be the optimal bufio size; it is used for simplicity.
	ch.brd = bufio.NewReaderSize(conn, msize)
	ch.bwr = bufio.NewWriterSize(conn, msize)
	ch.rdbuf = make([]byte, msize)

	return ch
}
// MSize reports the maximum message size, in bytes, currently configured for
// the channel by newChannel or SetMSize.
func (ch *channel) MSize() int { return ch.msize }
// SetMSize sets the maximum message size for the channel and resizes the read
// buffer to match. This call must be protected by a mutex or made before the
// channel is passed to other goroutines.
func (ch *channel) SetMSize(msize int) {
	// NOTE(stevvooe): We cannot safely resize the buffered reader and writer.
	// Proceed assuming that original size is sufficient.
	ch.msize = msize

	if msize <= cap(ch.rdbuf) {
		// Reslice the existing backing array. This covers shrinking as well
		// as growing back up to a previously allocated size, without a new
		// allocation in either case.
		ch.rdbuf = ch.rdbuf[:msize]
		return
	}
	ch.rdbuf = make([]byte, msize)
}
// ReadFcall reads the next message from the channel into fcall.
//
// It returns ctx.Err() if ctx is already done and ErrClosed if the channel
// has been closed. When ctx has no deadline, the read is bounded by
// defaultRWTimeout.
//
// If the incoming message overflows the msize, Overflow(err) will return
// nonzero with the number of bytes overflowed. readmsg still consumes the
// whole frame in that case, so the connection stays aligned on the next one.
func (ch *channel) ReadFcall(ctx context.Context, fcall *Fcall) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-ch.closed:
		return ErrClosed
	default:
	}

	deadline, ok := ctx.Deadline()
	if !ok {
		deadline = time.Now().Add(defaultRWTimeout)
	}
	if err := ch.conn.SetReadDeadline(deadline); err != nil {
		log.Printf("p9p: transport: error setting read deadline on %v: %v", ch.conn.RemoteAddr(), err)
	}

	// n counts every byte consumed for this frame, including the size[4]
	// prefix, so it is the on-wire message size that is checked against msize.
	n, err := readmsg(ch.brd, ch.rdbuf)
	if err != nil {
		// TODO(stevvooe): There may be more we can do here to detect partial
		// reads. For now, we just propagate the error untouched.
		return err
	}
	if n > len(ch.rdbuf) {
		return overflowErr{size: n - len(ch.rdbuf)}
	}

	// rdbuf only ever holds the message body because readmsg consumes the
	// size prefix separately. The body is therefore n minus the prefix.
	// Slicing to n would hand the codec stale trailing bytes.
	body := ch.rdbuf[:n-channelMessageHeaderSize]

	// clear out the fcall
	*fcall = Fcall{}
	if err := ch.codec.Unmarshal(body, fcall); err != nil {
		return err
	}
	return ch.maybeTruncate(fcall)
}
// WriteFcall writes the message to the connection.
//
// It returns ctx.Err() if ctx is already done and ErrClosed if the channel
// has been closed. When ctx has no deadline, the write is bounded by
// defaultRWTimeout.
//
// If a message destined for the wire will overflow MSize, an Overflow error
// may be returned. For Twrite calls, the buffer will simply be truncated to
// the optimal msize, with the caller detecting this condition with
// Rwrite.Count. Note that fcall.Message may be rewritten in place for this.
func (ch *channel) WriteFcall(ctx context.Context, fcall *Fcall) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-ch.closed:
		return ErrClosed
	default:
	}

	deadline, ok := ctx.Deadline()
	if !ok {
		deadline = time.Now().Add(defaultRWTimeout)
	}
	if err := ch.conn.SetWriteDeadline(deadline); err != nil {
		log.Printf("p9p: transport: error setting write deadline on %v: %v", ch.conn.RemoteAddr(), err)
	}

	// Adjust the message before marshaling so the frame respects msize.
	if err := ch.maybeTruncate(fcall); err != nil {
		return err
	}

	p, err := ch.codec.Marshal(fcall)
	if err != nil {
		return err
	}
	if err := sendmsg(ch.bwr, p); err != nil {
		return err
	}

	// Push the buffered frame onto the connection now instead of waiting for
	// the bufio.Writer to fill up.
	return ch.bwr.Flush()
}
// maybeTruncate will truncate the message to fit into msize on the wire, if
// possible, or modify the message to ensure the response won't overflow.
//
// If the message cannot be truncated, an error will be returned and the
// message should not be sent.
//
// A nil return value means the message can be sent without an overflow error
// being detected here. In that case fcall.Message may have been rewritten in
// place: a Tread may have a smaller Count, and a Twrite may have shorter Data.
func (ch *channel) maybeTruncate(fcall *Fcall) error {
	// for certain message types, just remove the extra bytes from the data portion.
	switch msg := fcall.Message.(type) {
	// TODO(stevvooe): There is one more problematic message type:
	//
	// Rread: while we can employ the same truncation fix as Twrite, we
	// need to make it observable to upstream handlers.
	case MessageTread:
		// We can rewrite msg.Count so that a return message will be under
		// msize. This is more defensive than anything but will ensure that
		// calls don't fail on sloppy servers.

		// First, craft the shape of the response message to learn its fixed
		// overhead, then add the requested data length. Summing in int64
		// avoids the uint32 wraparound that occurs when the response already
		// fits, and cannot overflow even for the largest Count.
		resp := newFcall(fcall.Tag, MessageRread{})
		respsize := int64(ch.msgmsize(resp)) + int64(msg.Count)
		if respsize <= int64(ch.msize) {
			return nil // the response already fits; nothing to rewrite
		}

		overflow := respsize - int64(ch.msize)
		if overflow > int64(msg.Count) {
			// Let the bad thing happen; msize too small to even support valid
			// rewrite. This will result in a Terror from the server-side or
			// just work.
			return nil
		}
		msg.Count -= uint32(overflow)
		fcall.Message = msg // msg is a local copy; store it back
		return nil
	case MessageTwrite:
		// If we are going to overflow the msize, we need to truncate the write to
		// appropriate size or throw an error in all other conditions.
		size := ch.msgmsize(fcall)
		if size <= ch.msize {
			return nil
		}

		// overflow the msize, including the channel message size fields.
		overflow := size - ch.msize
		if len(msg.Data) < overflow {
			// paranoid: if msg.Data is not big enough to handle the
			// overflow, we should get an overflow error. MSize would have
			// to be way too small to be realistic.
			return overflowErr{size: overflow}
		}

		// The truncation is reflected in the return message (Rwrite) by
		// the server, so we don't need a return value or error condition
		// to communicate it.
		msg.Data = msg.Data[:len(msg.Data)-overflow]
		fcall.Message = msg // since we have a local copy
		return nil
	default:
		size := ch.msgmsize(fcall)
		if size > ch.msize {
			// overflow the msize, including the channel message size fields.
			return overflowErr{size: size - ch.msize}
		}
		return nil
	}
}
// msgmsize reports how many bytes fcall occupies on the wire. That is the
// codec's encoded size plus the channelMessageHeaderSize-byte size prefix.
// Compare the result against msize to detect whether the message overflows.
func (ch *channel) msgmsize(fcall *Fcall) int {
	encoded := ch.codec.Size(fcall)
	return encoded + channelMessageHeaderSize
}
// frameError is the error type for malformed 9p frames. It is a string type
// so that sentinel values can be declared as constants.
type frameError string

func (e frameError) Error() string { return string(e) }

// errShortFrameSize is returned by readmsg when a frame's size field is
// smaller than the 4 bytes of the size field itself, which no valid 9p
// message can be.
const errShortFrameSize = frameError("p9p: invalid message size: smaller than the 4-byte size field")

// readmsg reads a 9p message into p from rd, ensuring that all bytes are
// consumed from the size header.
//
// Each frame starts with a little-endian uint32 size that counts the whole
// message, including the 4-byte size field. Only the body (everything after
// the size field) is stored in p. If the size header indicates the body is
// larger than p, p is filled with the leading portion and the remainder is
// read and discarded, so that rd stays aligned on the next frame.
//
// n is the number of bytes consumed from rd, including the size field. On
// success it therefore equals the on-wire message size, and the body occupies
// p[:n-4]. The caller must check that n is less than or equal to len(p) to
// ensure that a complete message has been read. A size field smaller than 4
// yields errShortFrameSize with n == 4. Any error should be treated as a
// framing error unless n is zero.
func readmsg(rd io.Reader, p []byte) (n int, err error) {
	const sizeFieldLen = 4 // bytes taken by the size[4] prefix

	var hdr [sizeFieldLen]byte
	if _, err := io.ReadFull(rd, hdr[:]); err != nil {
		return 0, err
	}
	msize := binary.LittleEndian.Uint32(hdr[:])
	n = sizeFieldLen

	if msize < sizeFieldLen {
		// A negative body length would make the slice expression below
		// panic. A corrupt or hostile peer must not be able to crash the
		// reader.
		return n, errShortFrameSize
	}

	// Work in int64 so that a size near the uint32 limit cannot turn
	// negative on platforms where int is 32 bits.
	body := int64(msize) - sizeFieldLen
	if body < int64(len(p)) {
		p = p[:body]
	}

	np, err := io.ReadFull(rd, p)
	n += np
	if err != nil {
		return n, err
	}

	if rest := body - int64(len(p)); rest > 0 {
		// message has been read up to len(p) but we must consume the entire
		// message. This is an error condition for the caller (n will exceed
		// len(p)) but is non-fatal if we can consume msize bytes.
		nn, err := io.CopyN(ioutil.Discard, rd, rest)
		n += int(nn)
		if err != nil {
			return n, err
		}
	}
	return n, nil
}
// sendmsg frames p as one 9p message and writes it to wr. It writes a
// little-endian uint32 size covering len(p) plus the 4 bytes of the size
// field itself, followed by p. All errors should be considered terminal,
// because a partially written frame leaves the stream out of sync.
func sendmsg(wr io.Writer, p []byte) error {
	var header [4]byte
	binary.LittleEndian.PutUint32(header[:], uint32(len(p)+4))
	if _, err := wr.Write(header[:]); err != nil {
		return err
	}

	// The io.Writer contract requires an error on a short write, but check
	// anyway. This code assumes partial writes to wr cannot happen, which
	// matters when a write is retried after a timeout.
	written, err := wr.Write(p)
	switch {
	case err != nil:
		return err
	case written < len(p):
		return io.ErrShortWrite
	}
	return nil
}