Skip to content

Commit

Permalink
Databases: Support online-migrations (#776)
Browse files Browse the repository at this point in the history
* Databases: Support online-migrations

* Delete .vscode/launch.json

* a -> an

* doc fix

* add omitempty

---------

Co-authored-by: Ben Tranter <ben@bentranter.io>
  • Loading branch information
danaelhe and bentranter authored Jan 28, 2025
1 parent 975870d commit 0d0064e
Show file tree
Hide file tree
Showing 2 changed files with 175 additions and 0 deletions.
75 changes: 75 additions & 0 deletions databases.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ const (
databaseIndexPath = databaseBasePath + "/%s/indexes/%s"
databaseLogsinkPath = databaseBasePath + "/%s/logsink/%s"
databaseLogsinksPath = databaseBasePath + "/%s/logsink"
databaseOnlineMigrationsPath = databaseBasePath + "/%s/online-migration"
databaseOnlineMigrationPath = databaseBasePath + "/%s/online-migration/%s"
)

// SQL Mode constants allow for MySQL-specific SQL flavor configuration.
Expand Down Expand Up @@ -179,6 +181,9 @@ type DatabasesService interface {
ListLogsinks(ctx context.Context, databaseID string, opts *ListOptions) ([]DatabaseLogsink, *Response, error)
UpdateLogsink(ctx context.Context, databaseID string, logsinkID string, updateLogsink *DatabaseUpdateLogsinkRequest) (*Response, error)
DeleteLogsink(ctx context.Context, databaseID, logsinkID string) (*Response, error)
StartOnlineMigration(ctx context.Context, databaseID string, onlineMigrationRequest *DatabaseStartOnlineMigrationRequest) (*DatabaseOnlineMigrationStatus, *Response, error)
StopOnlineMigration(ctx context.Context, databaseID, migrationID string) (*Response, error)
GetOnlineMigrationStatus(ctx context.Context, databaseID string) (*DatabaseOnlineMigrationStatus, *Response, error)
}

// DatabasesServiceOp handles communication with the Databases related methods
Expand Down Expand Up @@ -366,6 +371,13 @@ type DatabaseLogsink struct {
Config *DatabaseLogsinkConfig `json:"config,omitempty"`
}

// DatabaseOnlineMigrationStatus represents an online migration status
type DatabaseOnlineMigrationStatus struct {
ID string `json:"id"`
Status string `json:"status"`
CreatedAt string `json:"created_at"`
}

// TopicPartition represents the state of a Kafka topic partition
type TopicPartition struct {
EarliestOffset uint64 `json:"earliest_offset,omitempty"`
Expand Down Expand Up @@ -515,6 +527,13 @@ type DatabaseFirewallRule struct {
CreatedAt time.Time `json:"created_at"`
}

// DatabaseStartOnlineMigrationRequest is used to start an online migration for a database cluster
type DatabaseStartOnlineMigrationRequest struct {
Source *DatabaseOnlineMigrationConfig `json:"source"`
DisableSSL bool `json:"disable_ssl,omitempty"`
IgnoreDBs []string `json:"ignore_dbs,omitempty"`
}

// DatabaseCreateLogsinkRequest is used to create logsink for a database cluster
type DatabaseCreateLogsinkRequest struct {
Name string `json:"sink_name"`
Expand Down Expand Up @@ -544,6 +563,15 @@ type DatabaseLogsinkConfig struct {
Cert string `json:"cert,omitempty"`
}

// DatabaseOnlineMigrationConfig represents the configuration options for database online migrations.
type DatabaseOnlineMigrationConfig struct {
Host string `json:"host,omitempty"`
Port int `json:"port,omitempty"`
DatabaseName string `json:"dbname,omitempty"`
Username string `json:"username,omitempty"`
Password string `json:"password,omitempty"`
}

// PostgreSQLConfig holds advanced configurations for PostgreSQL database clusters.
type PostgreSQLConfig struct {
AutovacuumFreezeMaxAge *int `json:"autovacuum_freeze_max_age,omitempty"`
Expand Down Expand Up @@ -1975,3 +2003,50 @@ func (svc *DatabasesServiceOp) DeleteLogsink(ctx context.Context, databaseID, lo
}
return resp, nil
}

// StartOnlineMigration starts an online migration for a database. Migrating a cluster establishes a connection with an existing cluster
// and replicates its contents to the target cluster. Online migration is only available for MySQL, PostgreSQL, and Redis clusters.
func (svc *DatabasesServiceOp) StartOnlineMigration(ctx context.Context, databaseID string, onlineMigration *DatabaseStartOnlineMigrationRequest) (*DatabaseOnlineMigrationStatus, *Response, error) {
path := fmt.Sprintf(databaseOnlineMigrationsPath, databaseID)
req, err := svc.client.NewRequest(ctx, http.MethodPut, path, onlineMigration)
if err != nil {
return nil, nil, err
}

root := new(DatabaseOnlineMigrationStatus)
resp, err := svc.client.Do(ctx, req, root)
if err != nil {
return nil, resp, err
}
return root, resp, nil
}

// GetOnlineMigrationStatus retrieves the status of the most recent online migration
func (svc *DatabasesServiceOp) GetOnlineMigrationStatus(ctx context.Context, databaseID string) (*DatabaseOnlineMigrationStatus, *Response, error) {
path := fmt.Sprintf(databaseOnlineMigrationsPath, databaseID)
req, err := svc.client.NewRequest(ctx, http.MethodGet, path, nil)
if err != nil {
return nil, nil, err
}

root := new(DatabaseOnlineMigrationStatus)
resp, err := svc.client.Do(ctx, req, root)
if err != nil {
return nil, resp, err
}
return root, resp, nil
}

// StopOnlineMigration stops an online migration
func (svc *DatabasesServiceOp) StopOnlineMigration(ctx context.Context, databaseID, migrationID string) (*Response, error) {
path := fmt.Sprintf(databaseOnlineMigrationPath, databaseID, migrationID)
req, err := svc.client.NewRequest(ctx, http.MethodDelete, path, nil)
if err != nil {
return nil, err
}
resp, err := svc.client.Do(ctx, req, nil)
if err != nil {
return resp, err
}
return resp, nil
}
100 changes: 100 additions & 0 deletions databases_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4175,3 +4175,103 @@ func TestDatabases_DeleteLogsink(t *testing.T) {
_, err := client.Databases.DeleteLogsink(ctx, dbID, logsinkID)
require.NoError(t, err)
}

func TestDatabases_StartOnlineMigration(t *testing.T) {
setup()
defer teardown()

var (
dbID = "deadbeef-dead-4aa5-beef-deadbeef347d"
)

body := `{
"source": {
"host": "source-do-user-6607903-0.b.db.ondigitalocean.com",
"dbname": "defaultdb",
"port": 25060,
"username": "doadmin",
"password": "paakjnfe10rsrsmf"
},
"disable_ssl": false,
"ignore_dbs": [
"db0",
"db1"
]
}`

path := fmt.Sprintf("/v2/databases/%s/online-migration", dbID)

mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
testMethod(t, r, http.MethodPut)
fmt.Fprint(w, body)
})

_, resp, err := client.Databases.StartOnlineMigration(ctx, dbID, &DatabaseStartOnlineMigrationRequest{
DisableSSL: false,

Source: &DatabaseOnlineMigrationConfig{
Host: "https://user:passwd@192.168.0.1:25060",
DatabaseName: "defaultdb",
Port: 25060,
Username: "doadmin",
Password: "paakjnfe10rsrsmf",
},
})

require.NoError(t, err)

require.Equal(t, 200, resp.StatusCode)
}

func TestDatabases_GetOnlineMigrationStatus(t *testing.T) {
setup()
defer teardown()

var (
dbID = "deadbeef-dead-4aa5-beef-deadbeef347d"
)

body := `{
"source": {
"host": "source-do-user-6607903-0.b.db.ondigitalocean.com",
"dbname": "defaultdb",
"port": 25060,
"username": "doadmin",
"password": "paakjnfe10rsrsmf"
},
"disable_ssl": false,
"ignore_dbs": [
"db0",
"db1"
]
}`
path := fmt.Sprintf("/v2/databases/%s/online-migration", dbID)

mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
testMethod(t, r, http.MethodGet)
fmt.Fprint(w, body)
})

_, resp, err := client.Databases.GetOnlineMigrationStatus(ctx, dbID)
require.NoError(t, err)
require.Equal(t, 200, resp.StatusCode)
}

func TestDatabases_StopOnlineMigration(t *testing.T) {
setup()
defer teardown()

var (
dbID = "deadbeef-dead-4aa5-beef-deadbeef347d"
migrationID = "50484ec3-19d6-4cd3-b56f-3b0381c289a6"
)

path := fmt.Sprintf("/v2/databases/%s/online-migration/%s", dbID, migrationID)

mux.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
testMethod(t, r, http.MethodDelete)
})

_, err := client.Databases.StopOnlineMigration(ctx, dbID, migrationID)
require.NoError(t, err)
}

0 comments on commit 0d0064e

Please sign in to comment.