Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

*: add Writer for relay log #117

Merged
merged 63 commits into from
May 22, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
5ae5bec
write: add a binlog file writer
csuzhangxc Apr 16, 2019
98252cc
writer: basic structure
csuzhangxc Apr 18, 2019
1b54643
writer: add test case for starting underlying writer
csuzhangxc Apr 18, 2019
10767be
Update relay/writer/file_test.go
kennytm Apr 19, 2019
6246da8
*: address comments
csuzhangxc Apr 19, 2019
0516ef0
writer: check binlog file header and FormatDescriptionEvent helper fu…
csuzhangxc Apr 19, 2019
d24c04c
*: handle FormatDescriptionEvent in FileWriter
csuzhangxc Apr 19, 2019
b4633ef
Merge remote-tracking branch 'remotes/origin/master' into relay-writer-2
csuzhangxc Apr 19, 2019
79f7262
writer: fix CI
csuzhangxc Apr 19, 2019
f89ea7c
writer: handle RotateEvent
csuzhangxc Apr 22, 2019
f2940f6
Merge remote-tracking branch 'remotes/origin/master' into relay-writer-2
csuzhangxc Apr 22, 2019
4ef41f1
writer: add test for RotateEvent with FormatDescriptionEvent
csuzhangxc Apr 22, 2019
6a28944
event: change GenPreviousGTIDsEvent to return a GenericEvent
csuzhangxc Apr 22, 2019
2b62d66
writer: add test case with DDL and DML
csuzhangxc Apr 22, 2019
d6b2d1e
writer: update offset after events wrote
csuzhangxc Apr 22, 2019
50f4187
Merge remote-tracking branch 'remotes/origin/master' into relay-writer-2
csuzhangxc Apr 22, 2019
595f276
Merge remote-tracking branch 'remotes/origin/master' into relay-writer-2
csuzhangxc Apr 24, 2019
8d8f289
event: GenDummyEvent (USER_VAR_EVENT/QueryEvent)
csuzhangxc Apr 24, 2019
18f1367
writer: use dummy event to fill the potential hole
csuzhangxc Apr 25, 2019
ed3015c
writer: check event is duplicate
csuzhangxc Apr 25, 2019
298e87c
writer: handle duplicate events
csuzhangxc Apr 25, 2019
ea41ba8
writer: close FileReader in defer
csuzhangxc Apr 25, 2019
866df1e
*: refine code structure
csuzhangxc Apr 25, 2019
5c640fc
event: get GTID set from a PreviousGTIDsEvent
csuzhangxc Apr 25, 2019
b071768
event: get GTID set from a MariaDBGTIDListEvent
csuzhangxc Apr 25, 2019
7d3517b
writer: add Recover method; recover nothing
csuzhangxc Apr 26, 2019
aaf6b1f
writer: truncate uncompleted event part
csuzhangxc Apr 26, 2019
16ad527
writer: test truncate the uncompleted transaction
csuzhangxc Apr 27, 2019
ea9dfd7
writer: refine code
csuzhangxc Apr 27, 2019
e9141e8
writer: get latest GTID set for all completed transactions
csuzhangxc Apr 27, 2019
651dc32
writer: get latest pos/GTID set from MariaDB binlog file
csuzhangxc Apr 27, 2019
d6791f4
writer: test without GTID; test illegal GTID
csuzhangxc Apr 27, 2019
95b30f8
writer: add some comment
csuzhangxc Apr 27, 2019
384f5b0
writer: recover a file to get latest GTID set
csuzhangxc Apr 27, 2019
3fc0091
writer: add a comment
csuzhangxc Apr 27, 2019
9f0a93c
writer: address comment
csuzhangxc Apr 28, 2019
68dddc6
Merge remote-tracking branch 'remotes/origin/master' into relay-writer-2
csuzhangxc May 7, 2019
d4c9499
*: address comments
csuzhangxc May 7, 2019
7f04c2f
Merge remote-tracking branch 'remotes/origin/master' into relay-writer-2
csuzhangxc May 8, 2019
e824a91
event: address comment
csuzhangxc May 8, 2019
4371856
event: address comments
csuzhangxc May 8, 2019
4586ef4
*: address comments
csuzhangxc May 10, 2019
7827ce8
writer: address comment
csuzhangxc May 10, 2019
677fca6
Merge remote-tracking branch 'remotes/origin/master' into relay-writer-2
csuzhangxc May 10, 2019
30ff9f2
*: try fix CI
csuzhangxc May 10, 2019
e59c2ed
*: address comment to unify Stage for binlog related objects
csuzhangxc May 12, 2019
dd7487a
writer: address comments
csuzhangxc May 12, 2019
9e3172a
*: address comments
csuzhangxc May 12, 2019
7514e2f
*: address comments
csuzhangxc May 12, 2019
1739d15
*: address comments
csuzhangxc May 13, 2019
abeac9b
*: address comments
csuzhangxc May 13, 2019
60007e1
Merge remote-tracking branch 'remotes/origin/master' into relay-writer-2
csuzhangxc May 13, 2019
63d1b7b
*: address comments
csuzhangxc May 13, 2019
67393f7
writer: address comments
csuzhangxc May 13, 2019
a4a3b15
writer: address comments
csuzhangxc May 13, 2019
96e108e
Merge remote-tracking branch 'remotes/origin/master' into relay-writer-2
csuzhangxc May 14, 2019
0d3de7d
Merge remote-tracking branch 'remotes/origin/master' into relay-writer-2
csuzhangxc May 20, 2019
80fd184
writer: address comment, handleDuplicateEventsExist only when holeSiz…
csuzhangxc May 21, 2019
d536bd4
writer: refine Writer interface
csuzhangxc May 21, 2019
6d54140
writer: return `true` when FormatDescriptionEvent exist for `checkFor…
csuzhangxc May 21, 2019
16e0574
writer: check hole and duplicate for RotateEvent
csuzhangxc May 21, 2019
6802a07
Merge remote-tracking branch 'remotes/origin/master' into relay-writer-2
csuzhangxc May 22, 2019
2f332b0
writer: address comment
csuzhangxc May 22, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 50 additions & 22 deletions relay/writer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,43 +216,57 @@ func (w *FileWriter) handleFormatDescriptionEvent(ev *replication.BinlogEvent) (
// handle RotateEvent:
// 1. update binlog filename if needed
// 2. write the RotateEvent if not fake
// NOTE: we only see fake event for RotateEvent in MySQL source code,
// if see fake event for other event type, then handle them.
amyangfei marked this conversation as resolved.
Show resolved Hide resolved
// NOTE: we do not create a new binlog file when received a RotateEvent,
// instead, we create a new binlog file when received a FormatDescriptionEvent.
// because a binlog file without any events has no meaning.
func (w *FileWriter) handleRotateEvent(ev *replication.BinlogEvent) (*Result, error) {
func (w *FileWriter) handleRotateEvent(ev *replication.BinlogEvent) (result *Result, err error) {
rotateEv, ok := ev.Event.(*replication.RotateEvent)
if !ok {
return nil, errors.NotValidf("except RotateEvent, but got %+v", ev.Header)
}

// update binlog filename if needed
var currFile = w.filename.Get()
nextFile := string(rotateEv.NextLogName)
if nextFile > currFile {
// record the next filename, but not create it.
// even it's a fake RotateEvent, we still need to record it,
// because if we do not specify the filename when creating the writer (like Auto-Position),
// we can only receive a fake RotateEvent before the FormatDescriptionEvent.
w.filename.Set(nextFile)
}
defer func() {
if err == nil {
// update binlog filename if needed
nextFile := string(rotateEv.NextLogName)
if nextFile > currFile {
// record the next filename, but not create it.
// even it's a fake RotateEvent, we still need to record it,
// because if we do not specify the filename when creating the writer (like Auto-Position),
// we can only receive a fake RotateEvent before the FormatDescriptionEvent.
w.filename.Set(nextFile)
}
}
}()

// write the RotateEvent if not fake
var ignore bool
if ev.Header.Timestamp == 0 || ev.Header.LogPos == 0 {
// skip fake rotate event
ignore = true
return &Result{
Ignore: true,
}, nil
} else if w.out == nil {
IANTHEREAL marked this conversation as resolved.
Show resolved Hide resolved
// if not open a binlog file yet, then non-fake RotateEvent can't be handled
return nil, errors.Errorf("non-fake RotateEvent %+v received, but no binlog file opened", ev.Header)
} else {
err := w.out.Write(ev.RawData)
if err != nil {
return nil, errors.Annotatef(err, "write RotateEvent %+v for %s", ev.Header, filepath.Join(w.cfg.RelayDir, currFile))
}
}

result, err = w.handlePotentialHoleOrDuplicate(ev)
if err != nil {
return nil, errors.Trace(err)
} else if result.Ignore {
return result, nil
}

err = w.out.Write(ev.RawData)
if err != nil {
return nil, errors.Annotatef(err, "write RotateEvent %+v for %s", ev.Header, filepath.Join(w.cfg.RelayDir, currFile))
}

return &Result{
Ignore: ignore,
Ignore: false,
}, nil
}

Expand All @@ -261,6 +275,22 @@ func (w *FileWriter) handleRotateEvent(ev *replication.BinlogEvent) (*Result, er
// 2. handle any duplicate events if exist
// 3. write the non-duplicate event
func (w *FileWriter) handleEventDefault(ev *replication.BinlogEvent) (*Result, error) {
result, err := w.handlePotentialHoleOrDuplicate(ev)
if err != nil {
return nil, errors.Trace(err)
} else if result.Ignore {
return result, nil
}

// write the non-duplicate event
err = w.out.Write(ev.RawData)
return &Result{
Ignore: false,
}, errors.Annotatef(err, "write event %+v", ev.Header)
}

// handlePotentialHoleOrDuplicate combines handleFileHoleExist and handleDuplicateEventsExist.
func (w *FileWriter) handlePotentialHoleOrDuplicate(ev *replication.BinlogEvent) (*Result, error) {
// handle a potential hole
mayDuplicate, err := w.handleFileHoleExist(ev)
if err != nil {
Expand All @@ -281,11 +311,9 @@ func (w *FileWriter) handleEventDefault(ev *replication.BinlogEvent) (*Result, e
}
}

// write the non-duplicate event
err = w.out.Write(ev.RawData)
return &Result{
Ignore: false,
}, errors.Annotatef(err, "write event %+v", ev.Header)
}, nil
}

// handleFileHoleExist tries to handle a potential hole after this event wrote.
Expand Down Expand Up @@ -330,7 +358,7 @@ func (w *FileWriter) handleDuplicateEventsExist(ev *replication.BinlogEvent) (*R
filename := filepath.Join(w.cfg.RelayDir, w.filename.Get())
duplicate, err := checkIsDuplicateEvent(filename, ev)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it inefficient that call it for every event?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in most cases, checkIsDuplicateEvent only checks start/end pos with the file size (has a comment in checkIsDuplicateEvent), so I think it is not inefficient. and if we want to make it work correctly, we must do this check.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm worrid that os.Stat is an inefficient function

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so, pass file size into the function as an arg?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for os.Stat, it will call syscall.Stat; for File.Stat, it will call syscall.Fstat.

from http://man7.org/linux/man-pages/man2/stat.2.html.

fstat() is identical to stat(), except that the file about which
information is to be retrieved is specified by the file descriptor
fd.

if we changed to pass File.Stat() into the function, the difference of efficiency is small? or should we need to pass an in-memory offset into the function? @GregoryIan what's your opinion.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can choose one

  • check duplicate event only if we need to check, like holeSize < 0
  • or parse a fd or stat information as arg?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I choose the first one.

if err != nil {
return nil, errors.Annotatef(err, "check event %+v is duplicate in %s", ev, filename)
return nil, errors.Annotatef(err, "check event %+v whether duplicate in %s", ev.Header, filename)
}

return &Result{
Expand Down
20 changes: 18 additions & 2 deletions relay/writer/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,11 @@ func (t *testFileWriterSuite) TestRotateEventWithFormatDescriptionEvent(c *check
latestPos uint32 = 4
)

formatDescEv, err := event.GenFormatDescriptionEvent(header, latestPos)
c.Assert(err, check.IsNil)
c.Assert(formatDescEv, check.NotNil)
latestPos = formatDescEv.Header.LogPos

rotateEv, err := event.GenRotateEvent(header, latestPos, []byte(nextFilename), nextFilePos)
c.Assert(err, check.IsNil)
c.Assert(rotateEv, check.NotNil)
Expand All @@ -269,9 +274,10 @@ func (t *testFileWriterSuite) TestRotateEventWithFormatDescriptionEvent(c *check
c.Assert(err, check.IsNil)
c.Assert(fakeRotateEv, check.NotNil)

formatDescEv, err := event.GenFormatDescriptionEvent(header, latestPos)
// hole exists between formatDescEv and holeRotateEv, but the size is too small to fill
holeRotateEv, err := event.GenRotateEvent(header, latestPos+event.MinUserVarEventLen-1, []byte(nextFilename), nextFilePos)
c.Assert(err, check.IsNil)
c.Assert(formatDescEv, check.NotNil)
c.Assert(holeRotateEv, check.NotNil)

// 1: non-fake RotateEvent before FormatDescriptionEvent, invalid
w1 := NewFileWriter(cfg, t.parser)
Expand Down Expand Up @@ -347,6 +353,11 @@ func (t *testFileWriterSuite) TestRotateEventWithFormatDescriptionEvent(c *check
c.Assert(result, check.NotNil)
c.Assert(result.Ignore, check.IsFalse)

// try to write a rotateEv with hole exists
result, err = w4.WriteEvent(holeRotateEv)
c.Assert(err, check.ErrorMatches, ".*required dummy event size.*is too small.*")
c.Assert(result, check.IsNil)

result, err = w4.WriteEvent(rotateEv)
c.Assert(err, check.IsNil)
c.Assert(result, check.NotNil)
Expand All @@ -355,6 +366,11 @@ func (t *testFileWriterSuite) TestRotateEventWithFormatDescriptionEvent(c *check
fileSize += int64(len(rotateEv.RawData))
t.verifyFilenameOffset(c, w4, nextFilename, fileSize)

// write again, duplicate, but we already rotated and new binlog file not created
result, err = w4.WriteEvent(rotateEv)
c.Assert(err, check.ErrorMatches, ".*no such file or directory.*")
c.Assert(result, check.IsNil)

// cfg.Filename should contain both one FormatDescriptionEvent and one RotateEvent, next file should be empty
filename1 = filepath.Join(cfg.RelayDir, cfg.Filename)
filename2 = filepath.Join(cfg.RelayDir, nextFilename)
Expand Down