Skip to content

Commit

Permalink
Stabilize async uploads (#130)
Browse files Browse the repository at this point in the history
  • Loading branch information
tulir authored Jun 10, 2023
1 parent 737448a commit 2c9b28c
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 26 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
to replace it.
* *(client)* Added `MoveInviteState` sync handler for moving state events in
the invite section of sync inside the invite event itself.
* *(client)* Stabilized support for asynchronous uploads.
* `UnstableCreateMXC` and `UnstableUploadAsync` were renamed to `CreateMXC`
and `UploadAsync` respectively.

## v0.15.2 (2023-05-16)

Expand Down
57 changes: 33 additions & 24 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1413,10 +1413,11 @@ func (cli *Client) DownloadBytesContext(ctx context.Context, mxcURL id.ContentUR
return io.ReadAll(resp.Body)
}

// UnstableCreateMXC creates a blank Matrix content URI to allow uploading the content asynchronously later.
// See /~https://github.com/matrix-org/matrix-spec-proposals/pull/2246
func (cli *Client) UnstableCreateMXC() (*RespCreateMXC, error) {
u, _ := url.Parse(cli.BuildURL(MediaURLPath{"unstable", "fi.mau.msc2246", "create"}))
// CreateMXC creates a blank Matrix content URI to allow uploading the content asynchronously later.
//
// See https://spec.matrix.org/v1.7/client-server-api/#post_matrixmediav1create
func (cli *Client) CreateMXC() (*RespCreateMXC, error) {
u, _ := url.Parse(cli.BuildURL(MediaURLPath{"v1", "create"}))
var m RespCreateMXC
_, err := cli.MakeFullRequest(FullRequest{
Method: http.MethodPost,
Expand All @@ -1426,19 +1427,22 @@ func (cli *Client) UnstableCreateMXC() (*RespCreateMXC, error) {
return &m, err
}

// UnstableUploadAsync creates a blank content URI with UnstableCreateMXC, starts uploading the data in the background
// and returns the created MXC immediately. See /~https://github.com/matrix-org/matrix-spec-proposals/pull/2246 for more info.
func (cli *Client) UnstableUploadAsync(req ReqUploadMedia) (*RespCreateMXC, error) {
resp, err := cli.UnstableCreateMXC()
// UploadAsync creates a blank content URI with CreateMXC, starts uploading the data in the background
// and returns the created MXC immediately.
//
// See https://spec.matrix.org/v1.7/client-server-api/#post_matrixmediav1create
// and https://spec.matrix.org/v1.7/client-server-api/#put_matrixmediav3uploadservernamemediaid
func (cli *Client) UploadAsync(req ReqUploadMedia) (*RespCreateMXC, error) {
resp, err := cli.CreateMXC()
if err != nil {
return nil, err
}
req.UnstableMXC = resp.ContentURI
req.UploadURL = resp.UploadURL
req.MXC = resp.ContentURI
req.UnstableUploadURL = resp.UnstableUploadURL
go func() {
_, err = cli.UploadMedia(req)
if err != nil {
cli.Log.Error().Str("mxc", req.UnstableMXC.String()).Err(err).Msg("Async upload of media failed")
cli.Log.Error().Str("mxc", req.MXC.String()).Err(err).Msg("Async upload of media failed")
}
}()
return resp, nil
Expand Down Expand Up @@ -1474,13 +1478,13 @@ type ReqUploadMedia struct {
ContentType string
FileName string

// UnstableMXC specifies an existing MXC URI which doesn't have content yet to upload into.
// See https://github.com/matrix-org/matrix-spec-proposals/pull/2246 for more info.
UnstableMXC id.ContentURI
// MXC specifies an existing MXC URI which doesn't have content yet to upload into.
// See https://spec.matrix.org/unstable/client-server-api/#put_matrixmediav3uploadservernamemediaid
MXC id.ContentURI

// UploadURL specifies the URL to upload the content to (MSC3870)
// UnstableUploadURL specifies the URL to upload the content to. MXC must also be set.
// see /~https://github.com/matrix-org/matrix-spec-proposals/pull/3870 for more info
UploadURL string
UnstableUploadURL string
}

func (cli *Client) tryUploadMediaToURL(url, contentType string, content io.Reader) (*http.Response, error) {
Expand Down Expand Up @@ -1508,7 +1512,7 @@ func (cli *Client) uploadMediaToURL(data ReqUploadMedia) (*RespMediaUpload, erro
} else {
data.Content = nil
}
resp, err := cli.tryUploadMediaToURL(data.UploadURL, data.ContentType, reader)
resp, err := cli.tryUploadMediaToURL(data.UnstableUploadURL, data.ContentType, reader)
if err == nil {
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
// Everything is fine
Expand All @@ -1517,10 +1521,12 @@ func (cli *Client) uploadMediaToURL(data ReqUploadMedia) (*RespMediaUpload, erro
err = fmt.Errorf("HTTP %d", resp.StatusCode)
}
if retries <= 0 {
cli.Log.Warn().Str("url", data.UploadURL).Err(err).Msg("Error uploading media to external URL, not retrying")
cli.Log.Warn().Str("url", data.UnstableUploadURL).Err(err).
Msg("Error uploading media to external URL, not retrying")
return nil, err
}
cli.Log.Warn().Str("url", data.UploadURL).Err(err).Msg("Error uploading media to external URL, retrying")
cli.Log.Warn().Str("url", data.UnstableUploadURL).Err(err).
Msg("Error uploading media to external URL, retrying")
retries--
}

Expand All @@ -1529,7 +1535,7 @@ func (cli *Client) uploadMediaToURL(data ReqUploadMedia) (*RespMediaUpload, erro
query["filename"] = data.FileName
}

notifyURL := cli.BuildURLWithQuery(MediaURLPath{"unstable", "fi.mau.msc2246", "upload", data.UnstableMXC.Homeserver, data.UnstableMXC.FileID, "complete"}, query)
notifyURL := cli.BuildURLWithQuery(MediaURLPath{"unstable", "com.beeper.msc3870", "upload", data.MXC.Homeserver, data.MXC.FileID, "complete"}, query)

var m *RespMediaUpload
_, err := cli.MakeFullRequest(FullRequest{
Expand All @@ -1545,15 +1551,18 @@ func (cli *Client) uploadMediaToURL(data ReqUploadMedia) (*RespMediaUpload, erro
}

// UploadMedia uploads the given data to the content repository and returns an MXC URI.
// See https://spec.matrix.org/v1.2/client-server-api/#post_matrixmediav3upload
// See https://spec.matrix.org/v1.7/client-server-api/#post_matrixmediav3upload
func (cli *Client) UploadMedia(data ReqUploadMedia) (*RespMediaUpload, error) {
if data.UploadURL != "" {
if data.UnstableUploadURL != "" {
if data.MXC.IsEmpty() {
return nil, errors.New("MXC must also be set when uploading to external URL")
}
return cli.uploadMediaToURL(data)
}
u, _ := url.Parse(cli.BuildURL(MediaURLPath{"v3", "upload"}))
method := http.MethodPost
if !data.UnstableMXC.IsEmpty() {
u, _ = url.Parse(cli.BuildURL(MediaURLPath{"unstable", "fi.mau.msc2246", "upload", data.UnstableMXC.Homeserver, data.UnstableMXC.FileID}))
if !data.MXC.IsEmpty() {
u, _ = url.Parse(cli.BuildURL(MediaURLPath{"v3", "upload", data.MXC.Homeserver, data.MXC.FileID}))
method = http.MethodPut
}
if len(data.FileName) > 0 {
Expand Down
5 changes: 3 additions & 2 deletions responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,12 @@ type RespMediaUpload struct {
ContentURI id.ContentURI `json:"content_uri"`
}

// RespCreateMXC is the JSON response for /_matrix/media/v3/create as specified in https://github.com/matrix-org/matrix-spec-proposals/pull/2246
// RespCreateMXC is the JSON response for https://spec.matrix.org/v1.7/client-server-api/#post_matrixmediav1create
type RespCreateMXC struct {
ContentURI id.ContentURI `json:"content_uri"`
UnusedExpiresAt int `json:"unused_expires_at,omitempty"`
UploadURL string `json:"upload_url,omitempty"`

UnstableUploadURL string `json:"com.beeper.msc3870.upload_url,omitempty"`
}

// RespPreviewURL is the JSON response for https://spec.matrix.org/v1.2/client-server-api/#get_matrixmediav3preview_url
Expand Down

0 comments on commit 2c9b28c

Please sign in to comment.