diff --git a/changelog/unreleased/pull-142 b/changelog/unreleased/pull-142 new file mode 100644 index 0000000..af8e76c --- /dev/null +++ b/changelog/unreleased/pull-142 @@ -0,0 +1,14 @@ +Bugfix: Fix possible data loss due to interrupted network connections + +When rest-server was run without `--append-only` it was possible to lose uploaded +files in a specific scenario in which a network connection was interrupted. For the +data loss to occur a file upload by restic would have to be interrupted such that +restic notices the interrupted network connection before the rest-server. Then +restic would have to retry the file upload and finish it before the rest-server +notices that the initial upload has failed. Then the uploaded file would be +accidentally removed by rest-server when trying to cleanup the failed upload. + +This has been fixed by always uploading to a temporary file first which is moved +in position only once it was transfered completely. + +https://github.com/restic/rest-server/pull/142 diff --git a/handlers_test.go b/handlers_test.go index b9edc2b..3471a79 100644 --- a/handlers_test.go +++ b/handlers_test.go @@ -5,6 +5,7 @@ import ( "crypto/rand" "crypto/sha256" "encoding/hex" + "errors" "fmt" "io" "io/ioutil" @@ -14,6 +15,7 @@ import ( "path/filepath" "reflect" "strings" + "sync" "testing" ) @@ -66,6 +68,7 @@ func newRequest(t testing.TB, method, path string, body io.Reader) *http.Request // wantCode returns a function which checks that the response has the correct HTTP status code. func wantCode(code int) wantFunc { return func(t testing.TB, res *httptest.ResponseRecorder) { + t.Helper() if res.Code != code { t.Errorf("wrong response code, want %v, got %v", code, res.Code) } @@ -75,6 +78,7 @@ func wantCode(code int) wantFunc { // wantBody returns a function which checks that the response has the data in the body. func wantBody(body string) wantFunc { return func(t testing.TB, res *httptest.ResponseRecorder) { + t.Helper() if res.Body == nil { t.Errorf("body is nil, want %q", body) return @@ -88,6 +92,7 @@ func wantBody(body string) wantFunc { // checkRequest uses f to process the request and runs the checker functions on the result. func checkRequest(t testing.TB, f http.HandlerFunc, req *http.Request, want []wantFunc) { + t.Helper() rr := httptest.NewRecorder() f(rr, req) @@ -221,8 +226,8 @@ func TestResticHandler(t *testing.T) { {createOverwriteDeleteSeq(t, "/parent2/data/"+fileID, data)}, } - // setup rclone with a local backend in a temporary directory - tempdir, err := ioutil.TempDir("", "rclone-restic-test-") + // setup the server with a local backend in a temporary directory + tempdir, err := ioutil.TempDir("", "rest-server-test-") if err != nil { t.Fatal(err) } @@ -324,3 +329,121 @@ func TestSplitURLPath(t *testing.T) { }) } } + +// delayErrorReader blocks until Continue is closed, closes the channel FirstRead and then returns Err. +type delayErrorReader struct { + FirstRead chan struct{} + firstReadOnce sync.Once + + Err error + + Continue chan struct{} +} + +func newDelayedErrorReader(err error) *delayErrorReader { + return &delayErrorReader{ + Err: err, + Continue: make(chan struct{}), + FirstRead: make(chan struct{}), + } +} + +func (d *delayErrorReader) Read(p []byte) (int, error) { + d.firstReadOnce.Do(func() { + // close the channel to signal that the first read has happened + close(d.FirstRead) + }) + <-d.Continue + return 0, d.Err +} + +// TestAbortedRequest runs tests with concurrent upload requests for the same file. +func TestAbortedRequest(t *testing.T) { + // setup the server with a local backend in a temporary directory + tempdir, err := ioutil.TempDir("", "rest-server-test-") + if err != nil { + t.Fatal(err) + } + + // make sure the tempdir is properly removed + defer func() { + err := os.RemoveAll(tempdir) + if err != nil { + t.Fatal(err) + } + }() + + // configure path, the race condition doesn't happen for append-only repositories + mux, err := NewHandler(&Server{ + AppendOnly: false, + Path: tempdir, + NoAuth: true, + Debug: true, + PanicOnError: true, + }) + if err != nil { + t.Fatalf("error from NewHandler: %v", err) + } + + // create the repo + checkRequest(t, mux.ServeHTTP, + newRequest(t, "POST", "/?create=true", nil), + []wantFunc{wantCode(http.StatusOK)}) + + var ( + id = "b5bb9d8014a0f9b1d61e21e796d78dccdf1352f23cd32812f4850b878ae4944c" + wg sync.WaitGroup + ) + + // the first request is an upload to a file which blocks while reading the + // body and then after some data returns an error + rd := newDelayedErrorReader(errors.New("injected")) + + wg.Add(1) + go func() { + defer wg.Done() + + // first, read some string, then read from rd (which blocks and then + // returns an error) + dataReader := io.MultiReader(strings.NewReader("invalid data from aborted request\n"), rd) + + t.Logf("start first upload") + req := newRequest(t, "POST", "/data/"+id, dataReader) + rr := httptest.NewRecorder() + mux.ServeHTTP(rr, req) + t.Logf("first upload done, response %v (%v)", rr.Code, rr.Result().Status) + }() + + // wait until the first request starts reading from the body + <-rd.FirstRead + + // then while the first request is blocked we send a second request to + // delete the file and a third request to upload to the file again, only + // then the first request is unblocked. + + t.Logf("delete file") + checkRequest(t, mux.ServeHTTP, + newRequest(t, "DELETE", "/data/"+id, nil), + nil) // don't check anything, restic also ignores errors here + + t.Logf("upload again") + checkRequest(t, mux.ServeHTTP, + newRequest(t, "POST", "/data/"+id, strings.NewReader("foo\n")), + []wantFunc{wantCode(http.StatusOK)}) + + // unblock the reader for the first request now so it can continue + close(rd.Continue) + + // wait for the first request to continue + wg.Wait() + + // request the file again, it must exist and contain the string from the + // second request + checkRequest(t, mux.ServeHTTP, + newRequest(t, "GET", "/data/"+id, nil), + []wantFunc{ + wantCode(http.StatusOK), + wantBody("foo\n"), + }, + ) +} diff --git a/repo/repo.go b/repo/repo.go index 70e7693..01fe1d8 100644 --- a/repo/repo.go +++ b/repo/repo.go @@ -12,6 +12,7 @@ import ( "os" "path/filepath" "regexp" + "runtime" "strings" "time" @@ -542,7 +543,18 @@ func (h *Handler) saveBlob(w http.ResponseWriter, r *http.Request) { } path := h.getObjectPath(objectType, objectID) - tf, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_EXCL, h.opt.FileMode) + _, err := os.Stat(path) + if err == nil { + httpDefaultError(w, http.StatusForbidden) + return + } + if !os.IsNotExist(err) { + h.internalServerError(w, err) + return + } + + tmpFn := objectID + ".rest-server-temp" + tf, err := ioutil.TempFile(filepath.Dir(path), tmpFn) if os.IsNotExist(err) { // the error is caused by a missing directory, create it and retry mkdirErr := os.MkdirAll(filepath.Dir(path), h.opt.DirMode) @@ -550,13 +562,9 @@ func (h *Handler) saveBlob(w http.ResponseWriter, r *http.Request) { log.Print(mkdirErr) } else { // try again - tf, err = os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_EXCL, h.opt.FileMode) + tf, err = ioutil.TempFile(filepath.Dir(path), tmpFn) } } - if os.IsExist(err) { - httpDefaultError(w, http.StatusForbidden) - return - } if err != nil { h.internalServerError(w, err) return @@ -590,7 +598,7 @@ func (h *Handler) saveBlob(w http.ResponseWriter, r *http.Request) { if err != nil { _ = tf.Close() - _ = os.Remove(path) + _ = os.Remove(tf.Name()) h.incrementRepoSpaceUsage(-written) if h.opt.Debug { log.Print(err) @@ -601,22 +609,53 @@ func (h *Handler) saveBlob(w http.ResponseWriter, r *http.Request) { if err := tf.Sync(); err != nil { _ = tf.Close() - _ = os.Remove(path) + _ = os.Remove(tf.Name()) h.incrementRepoSpaceUsage(-written) h.internalServerError(w, err) return } if err := tf.Close(); err != nil { - _ = os.Remove(path) + _ = os.Remove(tf.Name()) h.incrementRepoSpaceUsage(-written) h.internalServerError(w, err) return } + if err := os.Rename(tf.Name(), path); err != nil { + _ = os.Remove(tf.Name()) + h.incrementRepoSpaceUsage(-written) + h.internalServerError(w, err) + return + } + + if err := syncDir(filepath.Dir(path)); err != nil { + // Don't call os.Remove(path) as this is prone to race conditions with parallel upload retries + h.internalServerError(w, err) + return + } + h.sendMetric(objectType, BlobWrite, uint64(written)) } +func syncDir(dirname string) error { + if runtime.GOOS == "windows" { + // syncing a directory is not possible on windows + return nil + } + + dir, err := os.Open(dirname) + if err != nil { + return err + } + err = dir.Sync() + if err != nil { + _ = dir.Close() + return err + } + return dir.Close() +} + // deleteBlob deletes a blob from the repository. func (h *Handler) deleteBlob(w http.ResponseWriter, r *http.Request) { if h.opt.Debug {