Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multithreaded replication WIP #1454

Draft
wants to merge 57 commits into
base: master
Choose a base branch
from
Draft
Changes from 1 commit
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
37c1abd
WIP.
arthurschreiber Sep 30, 2024
5d69fd9
Fixes.
arthurschreiber Sep 30, 2024
df29aa8
Fixups.
arthurschreiber Sep 30, 2024
81a6e6e
Special handling for first job.
arthurschreiber Sep 30, 2024
b44c6df
Fix deadlocks.
arthurschreiber Sep 30, 2024
55ce5a2
Update docs and simulate changes.
arthurschreiber Sep 30, 2024
0d8f39b
start StreamTransaction implementation
meiji163 Oct 3, 2024
22a4b8c
add job Coordinator
meiji163 Oct 3, 2024
3fd6e71
fix job complete logic
meiji163 Oct 3, 2024
f209592
add docker compose
meiji163 Oct 4, 2024
c6c877e
fix application of applyEvent.writeFunc
meiji163 Oct 4, 2024
b031166
Send off transaction events as soon as we see a GTID event.
arthurschreiber Oct 7, 2024
f81a790
WIP.
arthurschreiber Oct 8, 2024
e3b2cda
WIP.
arthurschreiber Oct 9, 2024
3ba9058
fix MarkTransactionCompleted
meiji163 Oct 10, 2024
74f6c9c
go mod tidy
meiji163 Oct 10, 2024
eddc1c9
configure max idle connections
meiji163 Oct 10, 2024
6321e73
vendor packages
meiji163 Oct 10, 2024
5600b91
track binlog coords
meiji163 Oct 10, 2024
3f47ebd
binlog streamer reconnect
meiji163 Oct 16, 2024
e81aabf
fix TestMigrate
meiji163 Oct 16, 2024
e4da5f8
worker-stats command
meiji163 Oct 17, 2024
c5e239c
setup mysql in CI
meiji163 Oct 17, 2024
2e78f6f
set transaction_write_set_extraction in test
meiji163 Oct 17, 2024
875d00d
add ci mysql opt flags
meiji163 Oct 17, 2024
9a022e2
Revert "set transaction_write_set_extraction in test"
meiji163 Oct 17, 2024
18dcee1
change ci mysql opts
meiji163 Oct 17, 2024
299df37
try custom docker run
meiji163 Oct 17, 2024
941689f
Make the linter happy.
arthurschreiber Oct 21, 2024
9e3bc1c
Use testcontainers.
arthurschreiber Oct 21, 2024
113e674
WIP.
arthurschreiber Oct 21, 2024
8e38b86
Merge branch 'master' of /~https://github.com/github/gh-ost into meiji1…
arthurschreiber Oct 21, 2024
d7ccab9
Merge branch 'master' of /~https://github.com/github/gh-ost into meiji1…
arthurschreiber Oct 21, 2024
06c7082
fix 2 tests, TestMigrate still not working
meiji163 Oct 21, 2024
d7dc97b
Fix applier tests.
arthurschreiber Oct 21, 2024
bcc7e7a
Fix migrator tests.
arthurschreiber Oct 21, 2024
b747d98
Add error assertions.
arthurschreiber Oct 21, 2024
1a5be0b
🔥
arthurschreiber Oct 21, 2024
1942455
Fix applier connection pool size.
arthurschreiber Oct 22, 2024
85cab4d
Merge branch 'master' into meiji163/parallel-repl
arthurschreiber Oct 23, 2024
2ba0cb2
Fix merge conflict.
arthurschreiber Oct 23, 2024
b82e8f9
Prepare queries.
arthurschreiber Oct 23, 2024
fa7c484
pass throttler to Coordinator
meiji163 Oct 24, 2024
126c981
track time waiting on event channels
meiji163 Oct 24, 2024
641fe92
add flag for number of Coordinator workers
meiji163 Oct 25, 2024
615d1df
Merge branch 'master' of /~https://github.com/github/gh-ost into meiji1…
arthurschreiber Oct 25, 2024
0a8787e
Fix test case.
arthurschreiber Oct 25, 2024
6aaa374
Merge branch 'master' into meiji163/parallel-repl
meiji163 Nov 18, 2024
f6ec835
remove unused streamer
meiji163 Nov 18, 2024
0555d72
fix coordinator test
meiji163 Nov 18, 2024
569d035
Merge branch 'master' into meiji163/parallel-repl
meiji163 Nov 18, 2024
9b5e2bc
fix Coordinator test
meiji163 Nov 19, 2024
6689aeb
fix migrator test
meiji163 Nov 19, 2024
53e953d
linter fix
meiji163 Nov 19, 2024
e1ca9cd
Merge branch 'master' into meiji163/parallel-repl
meiji163 Nov 22, 2024
b34f2e2
Merge branch 'master' into meiji163/parallel-repl
meiji163 Dec 19, 2024
adfcdf7
Merge branch 'master' into meiji163/parallel-repl
meiji163 Dec 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Special handling for first job.
  • Loading branch information
arthurschreiber committed Sep 30, 2024
commit 81a6e6e72dbf9ffe0aa92ba81ed84b11d061d57c
36 changes: 27 additions & 9 deletions go/logic/multi_threaded_applier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ type Coordinator struct {

queue chan *Job

wg sync.WaitGroup

mu sync.Mutex
completedJobs map[int]bool
waitingJobs map[int][]*Job
mu sync.Mutex

wg sync.WaitGroup
firstJob bool

workers []*Worker
}
Expand All @@ -37,6 +38,7 @@ type Worker struct {
func NewCoordinator() *Coordinator {
return &Coordinator{
lowWaterMark: 0,
firstJob: true,
completedJobs: make(map[int]bool),
waitingJobs: make(map[int][]*Job),
queue: make(chan *Job, 100),
Expand Down Expand Up @@ -66,13 +68,14 @@ func (c *Coordinator) SubmitJob(job *Job) {
// If the job is ready to be scheduled, schedule it.
//
// A job can be scheduled if:
// * This is the very first job.
// * The last committed job is less than or equal to the low water mark. This
// means that all jobs up to the low water mark have been completed.
// * The job has no dependencies (i.e. it is the first job in the binlog).
// * The low water mark is 0 and the queue is empty. This means that
// this is the first job we received, but the job is coming from somewhere
// in the middle of the binlog.
if job.lastCommitted <= c.lowWaterMark || job.sequenceNumber == 0 || (c.lowWaterMark == 0 && len(c.queue) == 0) {
if c.firstJob {
fmt.Printf("Scheduling first job: %d\n", job.sequenceNumber)
c.queue <- job
c.firstJob = false
} else if job.lastCommitted <= c.lowWaterMark {
fmt.Printf("Scheduling job: %d\n", job.sequenceNumber)
c.queue <- job
} else {
Expand Down Expand Up @@ -117,7 +120,7 @@ func (c *Coordinator) markJobCompleted(job *Job) {
}

func (w *Worker) processJob(job *Job) error {
// sleep random time between 1 and 200 ms to simulate work
// sleep random time between 1 and 100 ms to simulate work
time.Sleep(time.Duration(rand.Intn(100)+1) * time.Millisecond)

w.executedJobs++
Expand Down Expand Up @@ -155,6 +158,21 @@ func TestMultiThreadedApplierWithDependentJobs(t *testing.T) {
}
}

func TestMultiThreadedApplierWithManyDependentJobs(t *testing.T) {
coordinator := NewCoordinator()
coordinator.StartWorkers(16)

for i := 1; i < 101; i++ {
coordinator.SubmitJob(&Job{sequenceNumber: i, lastCommitted: 1})
}

coordinator.wg.Wait()

for i, w := range coordinator.workers {
fmt.Printf("Worker %d executed %d jobs\n", i, w.executedJobs)
}
}

func TestMultiThreadedApplierWithVaryingDependentJobs(t *testing.T) {
coordinator := NewCoordinator()
coordinator.StartWorkers(16)
Expand Down