Multithreaded replication WIP #1454

Draft: wants to merge 57 commits into base: master from meiji163/parallel-repl

Changes from 1 commit

Commits (57)
37c1abd
WIP.
arthurschreiber Sep 30, 2024
5d69fd9
Fixes.
arthurschreiber Sep 30, 2024
df29aa8
Fixups.
arthurschreiber Sep 30, 2024
81a6e6e
Special handling for first job.
arthurschreiber Sep 30, 2024
b44c6df
Fix deadlocks.
arthurschreiber Sep 30, 2024
55ce5a2
Update docs and simulate changes.
arthurschreiber Sep 30, 2024
0d8f39b
start StreamTransaction implementation
meiji163 Oct 3, 2024
22a4b8c
add job Coordinator
meiji163 Oct 3, 2024
3fd6e71
fix job complete logic
meiji163 Oct 3, 2024
f209592
add docker compose
meiji163 Oct 4, 2024
c6c877e
fix application of applyEvent.writeFunc
meiji163 Oct 4, 2024
b031166
Send off transaction events as soon as we see a GTID event.
arthurschreiber Oct 7, 2024
f81a790
WIP.
arthurschreiber Oct 8, 2024
e3b2cda
WIP.
arthurschreiber Oct 9, 2024
3ba9058
fix MarkTransactionCompleted
meiji163 Oct 10, 2024
74f6c9c
go mod tidy
meiji163 Oct 10, 2024
eddc1c9
configure max idle connections
meiji163 Oct 10, 2024
6321e73
vendor packages
meiji163 Oct 10, 2024
5600b91
track binlog coords
meiji163 Oct 10, 2024
3f47ebd
binlog streamer reconnect
meiji163 Oct 16, 2024
e81aabf
fix TestMigrate
meiji163 Oct 16, 2024
e4da5f8
worker-stats command
meiji163 Oct 17, 2024
c5e239c
setup mysql in CI
meiji163 Oct 17, 2024
2e78f6f
set transaction_write_set_extraction in test
meiji163 Oct 17, 2024
875d00d
add ci mysql opt flags
meiji163 Oct 17, 2024
9a022e2
Revert "set transaction_write_set_extraction in test"
meiji163 Oct 17, 2024
18dcee1
change ci mysql opts
meiji163 Oct 17, 2024
299df37
try custom docker run
meiji163 Oct 17, 2024
941689f
Make the linter happy.
arthurschreiber Oct 21, 2024
9e3bc1c
Use testcontainers.
arthurschreiber Oct 21, 2024
113e674
WIP.
arthurschreiber Oct 21, 2024
8e38b86
Merge branch 'master' of /~https://github.com/github/gh-ost into meiji1…
arthurschreiber Oct 21, 2024
d7ccab9
Merge branch 'master' of /~https://github.com/github/gh-ost into meiji1…
arthurschreiber Oct 21, 2024
06c7082
fix 2 tests, TestMigrate still not working
meiji163 Oct 21, 2024
d7dc97b
Fix applier tests.
arthurschreiber Oct 21, 2024
bcc7e7a
Fix migrator tests.
arthurschreiber Oct 21, 2024
b747d98
Add error assertions.
arthurschreiber Oct 21, 2024
1a5be0b
🔥
arthurschreiber Oct 21, 2024
1942455
Fix applier connection pool size.
arthurschreiber Oct 22, 2024
85cab4d
Merge branch 'master' into meiji163/parallel-repl
arthurschreiber Oct 23, 2024
2ba0cb2
Fix merge conflict.
arthurschreiber Oct 23, 2024
b82e8f9
Prepare queries.
arthurschreiber Oct 23, 2024
fa7c484
pass throttler to Coordinator
meiji163 Oct 24, 2024
126c981
track time waiting on event channels
meiji163 Oct 24, 2024
641fe92
add flag for number of Coordinator workers
meiji163 Oct 25, 2024
615d1df
Merge branch 'master' of /~https://github.com/github/gh-ost into meiji1…
arthurschreiber Oct 25, 2024
0a8787e
Fix test case.
arthurschreiber Oct 25, 2024
6aaa374
Merge branch 'master' into meiji163/parallel-repl
meiji163 Nov 18, 2024
f6ec835
remove unused streamer
meiji163 Nov 18, 2024
0555d72
fix coordinator test
meiji163 Nov 18, 2024
569d035
Merge branch 'master' into meiji163/parallel-repl
meiji163 Nov 18, 2024
9b5e2bc
fix Coordinator test
meiji163 Nov 19, 2024
6689aeb
fix migrator test
meiji163 Nov 19, 2024
53e953d
linter fix
meiji163 Nov 19, 2024
e1ca9cd
Merge branch 'master' into meiji163/parallel-repl
meiji163 Nov 22, 2024
b34f2e2
Merge branch 'master' into meiji163/parallel-repl
meiji163 Dec 19, 2024
adfcdf7
Merge branch 'master' into meiji163/parallel-repl
meiji163 Dec 19, 2024
add job Coordinator
meiji163 committed Oct 3, 2024
commit 22a4b8c4888d1f2c87e2e1988e18d1e87d6a1ee1
3 changes: 3 additions & 0 deletions go/binlog/gomysql_reader.go
@@ -106,6 +106,7 @@ func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEven
)
switch dml {
case InsertDML:

{
binlogEntry.DmlEvent.NewColumnValues = sql.ToColumnValues(row)
}
@@ -286,6 +287,8 @@ groups:
case *replication.TableMapEvent:
// TODO: Can we be smart here and short circuit processing groups for tables that don't match the table in the migration context?

this.migrationContext.Log.Infof("sending transaction: %d %d", group.SequenceNumber, group.LastCommitted)

group.TableName = string(binlogEvent.Table)
group.DatabaseName = string(binlogEvent.Schema)
// we are good to send the transaction, the transaction events arrive async
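The SequenceNumber / LastCommitted pair logged above is MySQL's LOGICAL_CLOCK group-commit metadata, carried on each GTID event. A minimal sketch of the dependency rule the applier coordinator below builds on (an illustrative helper, not part of this diff): a transaction is safe to apply once every transaction up to its last_committed sequence number has committed, i.e. once the applier's low water mark has caught up to it.

	// Illustrative helper, not part of this diff: under LOGICAL_CLOCK,
	// a transaction may be applied as soon as all transactions with a
	// sequence number at or below its lastCommitted have committed.
	func canApply(lastCommitted, lowWaterMark int64) bool {
		return lastCommitted <= lowWaterMark
	}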
49 changes: 36 additions & 13 deletions go/logic/migrator.go
@@ -11,11 +11,12 @@ import (
"fmt"
"io"
"math"
"os"
"strings"
"sync/atomic"
"time"

"os"

"github.com/github/gh-ost/go/base"
"github.com/github/gh-ost/go/binlog"
"github.com/github/gh-ost/go/mysql"
@@ -424,8 +425,9 @@ func (this *Migrator) Migrate() (err error) {
return err
}

// TODO: configure workers
numApplierWorkers := 4
// TODO(meiji163): configure workers
numApplierWorkers := 5
this.migrationContext.Log.Info("starting applier workers")
go this.trxCoordinator.StartWorkers(numApplierWorkers)
go this.executeWriteFuncs()
go this.iterateChunks()
@@ -1083,14 +1085,30 @@ func (this *Migrator) initiateStreaming() error {
if err := this.eventsStreamer.InitDBConnections(); err != nil {
return err
}
this.eventsStreamer.AddListener(
// this.eventsStreamer.AddListener(
// false,
// this.migrationContext.DatabaseName,
// this.migrationContext.GetChangelogTableName(),
// func(dmlEvent *binlog.BinlogDMLEvent) error {
// return this.onChangelogEvent(dmlEvent)
// },
// )

handleChangeLogTrx := func(trx *binlog.Transaction) error {
for rowEvent := range trx.Changes {
if err := this.onChangelogEvent(rowEvent.DmlEvent); err != nil {
return err
}
}
return nil
}
this.eventsStreamer.AddTransactionListener(
false,
this.migrationContext.DatabaseName,
this.migrationContext.GetChangelogTableName(),
func(dmlEvent *binlog.BinlogDMLEvent) error {
return this.onChangelogEvent(dmlEvent)
},
handleChangeLogTrx,
)

ctx := context.Background()
go func() {
this.migrationContext.Log.Debugf("Beginning streaming")
@@ -1266,6 +1284,7 @@ func (this *Migrator) iterateChunks() error {
}

func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error {
this.migrationContext.Log.Infof("onApplyEventStruct: %+v", eventStruct)
handleNonDMLEventStruct := func(eventStruct *applyEventStruct) error {
if eventStruct.writeFunc != nil {
if err := this.retryOperation(*eventStruct.writeFunc); err != nil {
@@ -1274,14 +1293,16 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error {
}
return nil
}
if eventStruct.dmlEvent == nil {
return handleNonDMLEventStruct(eventStruct)
}
// if eventStruct.dmlEvent == nil {
// return handleNonDMLEventStruct(eventStruct)
// }
if eventStruct.trxEvent != nil {
this.migrationContext.Log.Infof("got transaction: %+v", eventStruct.trxEvent)
batchSize := int(atomic.LoadInt64(&this.migrationContext.DMLBatchSize))

processTrxFunc := func() error {
// process rows events from the transaction in batches
this.migrationContext.Log.Info("starting process transaction job")
for {
dmlEvents := make([]*binlog.BinlogDMLEvent, 0, batchSize)
for i := 0; i < batchSize; i++ {
Expand All @@ -1296,9 +1317,10 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error {
}
}
dmlEvents = append(dmlEvents, binlogEntry.DmlEvent)
if err := this.applier.ApplyDMLEventQueries(dmlEvents); err != nil {
return err
}
}
//this.migrationContext.Log.Infof("received dmlEvents: %+v", dmlEvents)
if err := this.applier.ApplyDMLEventQueries(dmlEvents); err != nil {
return err
}
}
}
@@ -1308,6 +1330,7 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error {
SequenceNumber: eventStruct.trxEvent.SequenceNumber,
Do: processTrxFunc,
}
this.migrationContext.Log.Infof("submitting job: %+v", job)
this.trxCoordinator.SubmitJob(job)

} else if eventStruct.dmlEvent != nil {
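The batching loop added to onApplyEventStruct above drains up to DMLBatchSize row events per applier round trip. A condensed restatement of that pattern (illustrative sketch, using the Transaction.Changes channel shown in the listener code, not part of the diff):

	// Drain up to batchSize events from the transaction's Changes channel,
	// apply them in one call, and finish when the channel is closed.
	for {
		dmlEvents := make([]*binlog.BinlogDMLEvent, 0, batchSize)
		done := false
		for i := 0; i < batchSize; i++ {
			entry, ok := <-eventStruct.trxEvent.Changes
			if !ok {
				done = true
				break
			}
			dmlEvents = append(dmlEvents, entry.DmlEvent)
		}
		if len(dmlEvents) > 0 {
			if err := this.applier.ApplyDMLEventQueries(dmlEvents); err != nil {
				return err
			}
		}
		if done {
			return nil
		}
	}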
172 changes: 172 additions & 0 deletions go/logic/multi_threaded_applier.go
@@ -0,0 +1,172 @@
package logic

import (
"fmt"
"sync"
"time"
)

type Job struct {
// The sequence number of the job
SequenceNumber int64

// The sequence number of the job this job depends on
LastCommitted int64

// channel that's closed once the job's dependencies are met
// `nil` if the job's dependencies are already met when the job is submitted
waitChannel chan struct{}

// Do the job
Do func() error

// Data for the job
//Changes chan *replication.RowsEvent
}

type Coordinator struct {
// The queue of jobs to be executed
queue chan *Job

wg sync.WaitGroup

// List of workers
workers []*Worker

// Mutex to protect the fields below
mu sync.Mutex

// The low water mark. This is the sequence number of the last job that has been committed.
lowWaterMark int64

// This is a map of completed jobs by their sequence numbers.
// This is used when updating the low water mark.
completedJobs map[int64]bool

// These are the jobs that are waiting for a previous job to complete.
// They are indexed by the sequence number of the job they are waiting for.
waitingJobs map[int64][]*Job

// Is this the first job we're processing? If yes, we can schedule it even if the last committed
// sequence number is greater than the low water mark.
firstJob bool
}

type Worker struct {
executedJobs int
}

func NewCoordinator() *Coordinator {
return &Coordinator{
lowWaterMark: 0,
firstJob: true,
completedJobs: make(map[int64]bool),
waitingJobs: make(map[int64][]*Job),
queue: make(chan *Job),
}
}

func (c *Coordinator) StartWorkers(count int) {
jobTimeout := 10 * time.Second
for i := 0; i < count; i++ {
go func() {
for job := range c.queue {
if job.waitChannel != nil {
fmt.Printf("Coordinator: Job %d is waiting for job %d to complete\n", job.SequenceNumber, job.LastCommitted)
select {
case <-job.waitChannel:
break
case <-time.After(jobTimeout):
// TODO: something probably went wrong here
fmt.Printf("Coordinator: Job %d timed out waiting for job %d\n", job.SequenceNumber, job.LastCommitted)
panic("worker timeout")
}
// fmt.Printf("Worker received signal for job: %d\n", job.sequenceNumber)
}

if err := job.Do(); err != nil {
// TODO(meiji163) handle error
panic(err)
}
c.markJobCompleted(job)
}
}()
}
c.wg.Wait()
}

func (c *Coordinator) SubmitJob(job *Job) {
c.mu.Lock()

c.wg.Add(1)

// Jobs might need to wait for their dependencies to be met before they can be executed. We use the
// `waitChannel` to signal that the job's dependencies are met.
//
// We can short-circuit this if:
// * this is the first job we're processing
// * the job's last committed sequence number is less than or equal to the low water mark,
// thus we know that the job's dependencies are already met.
// * the job's dependency has completed, but the lowWaterMark has not been updated yet because
// a job with a lower sequence number is still being processed.
//
// When short-circuiting, we don't assign a `waitChannel` to the job, thus signaling that the
// job's dependencies are already met.
if c.firstJob {
fmt.Printf("Coordinator: Scheduling first job: %d\n", job.SequenceNumber)
// assume everything before the first job has been processed
c.lowWaterMark = job.SequenceNumber
c.firstJob = false
} else if job.LastCommitted <= c.lowWaterMark || c.completedJobs[job.LastCommitted] {
fmt.Printf("Coordinator: Scheduling job: %d\n", job.SequenceNumber)
} else {
job.waitChannel = make(chan struct{})
c.waitingJobs[job.LastCommitted] = append(c.waitingJobs[job.LastCommitted], job)
}

c.mu.Unlock()
// Add the job to the queue. This will block until a worker picks up this job.
c.queue <- job
}

func (c *Coordinator) markJobCompleted(job *Job) {
c.mu.Lock()
defer c.wg.Done()

fmt.Printf("Coordinator: Marking job as completed: %d\n", job.SequenceNumber)

// Mark the job as completed
c.completedJobs[job.SequenceNumber] = true

// Then, update the low water mark if possible.

// TODO: this won't work because the intermediate sequence numbers
// can be for trxs on tables other than the one we're migrating
for c.completedJobs[c.lowWaterMark+1] {
c.lowWaterMark++
delete(c.completedJobs, c.lowWaterMark)
}

jobsToNotify := make([]*Job, 0)

// TODO: fix this
// Schedule any jobs that were waiting for this job to complete
for lastCommitted, jobs := range c.waitingJobs {
if lastCommitted <= c.lowWaterMark {
jobsToNotify = append(jobsToNotify, jobs...)
delete(c.waitingJobs, lastCommitted)
}
}

c.mu.Unlock()

for _, waitingJob := range jobsToNotify {
fmt.Printf("Scheduling previously waiting job: %d - %d\n", waitingJob.SequenceNumber, waitingJob.LastCommitted)
waitingJob.waitChannel <- struct{}{}
}
}
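A minimal usage sketch of the Coordinator above (hypothetical standalone wiring; in this PR the jobs are submitted by Migrator.onApplyEventStruct). Jobs whose LastCommitted is at or below the low water mark start immediately; later jobs park on their waitChannel until markJobCompleted advances the mark:

	c := NewCoordinator()
	go c.StartWorkers(4)

	apply := func(n int64) func() error {
		return func() error {
			fmt.Printf("applying trx %d\n", n)
			return nil
		}
	}

	// Jobs 1 and 2 share LastCommitted 0 and may run concurrently.
	// Job 3 depends on sequence number 2, so it waits until the low
	// water mark reaches 2.
	c.SubmitJob(&Job{SequenceNumber: 1, LastCommitted: 0, Do: apply(1)})
	c.SubmitJob(&Job{SequenceNumber: 2, LastCommitted: 0, Do: apply(2)})
	c.SubmitJob(&Job{SequenceNumber: 3, LastCommitted: 2, Do: apply(3)})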
19 changes: 10 additions & 9 deletions go/logic/streamer.go
@@ -27,11 +27,11 @@
)

type BinlogEventListener struct {
async bool
databaseName string
tableName string
onDmlEvent func(event *binlog.BinlogDMLEvent) error
onTrxEvent func(trx *binlog.Transaction) error
async bool
databaseName string
tableName string
onDmlEvent func(event *binlog.BinlogDMLEvent) error
onTrxEvent func(trx *binlog.Transaction) error
}

const (
@@ -208,11 +208,12 @@ func (evs *EventsStreamer) StreamTransactions(ctx context.Context, canStopStream
}
}()
for {
if canStopStreaming() {
return nil
}
// TODO: handle retry/reconnect
// TODO(meiji163): handle retry/reconnect
if err := evs.binlogReader.StreamTransactions(ctx, trxChan); err != nil {
if canStopStreaming() {
return nil
}
close(trxChan)
return err
}
}
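The retry/reconnect TODO above is still open; one possible shape for it (hypothetical sketch, assuming a helper that re-dials the binlog connection exists; reconnectBinlogReader is an illustrative name, not an existing API):

	// Hypothetical retry loop for the TODO above.
	for {
		if err := evs.binlogReader.StreamTransactions(ctx, trxChan); err != nil {
			if canStopStreaming() {
				return nil
			}
			if rerr := evs.reconnectBinlogReader(); rerr != nil { // illustrative helper
				close(trxChan)
				return err
			}
			continue
		}
		return nil
	}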