From 82816c67e1b9b28dd034bd0f6b146775612861df Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Fri, 29 Jan 2021 18:32:33 +0100 Subject: [PATCH 1/8] Atomic upload for blobs An upload is now first saved to a temporary file before it gets renamed to the final filename. This ensures that incomplete uploads don't leave broken files behind (at least not under their real filename). In addition, removing failed uploads is no longer prone to a race condition with a retried upload. That scenario could occur when the first upload fails partway and the server doesn't notice that immediately. A later retry by restic will then delete the broken upload and upload the file again. If the server now notices that the initial upload has failed, then it will delete the correctly uploaded file. This has been fixed by only ever deleting the temporary file during upload. --- repo/repo.go | 31 ++++++++++++++++++++++--------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/repo/repo.go b/repo/repo.go index 70e7693..fc8e908 100644 --- a/repo/repo.go +++ b/repo/repo.go @@ -542,7 +542,17 @@ func (h *Handler) saveBlob(w http.ResponseWriter, r *http.Request) { } path := h.getObjectPath(objectType, objectID) - tf, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_EXCL, h.opt.FileMode) + _, err := os.Stat(path) + if err == nil { + httpDefaultError(w, http.StatusForbidden) + return + } + if !os.IsNotExist(err) { + h.internalServerError(w, err) + return + } + + tf, err := ioutil.TempFile(filepath.Dir(path), ".rest-server-temp") if os.IsNotExist(err) { // the error is caused by a missing directory, create it and retry mkdirErr := os.MkdirAll(filepath.Dir(path), h.opt.DirMode) @@ -550,13 +560,9 @@ func (h *Handler) saveBlob(w http.ResponseWriter, r *http.Request) { log.Print(mkdirErr) } else { // try again - tf, err = os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_EXCL, h.opt.FileMode) + tf, err = ioutil.TempFile(filepath.Dir(path), ".rest-server-temp") } } - if os.IsExist(err) { 
- httpDefaultError(w, http.StatusForbidden) - return - } if err != nil { h.internalServerError(w, err) return @@ -590,7 +596,7 @@ func (h *Handler) saveBlob(w http.ResponseWriter, r *http.Request) { if err != nil { _ = tf.Close() - _ = os.Remove(path) + _ = os.Remove(tf.Name()) h.incrementRepoSpaceUsage(-written) if h.opt.Debug { log.Print(err) @@ -601,14 +607,21 @@ func (h *Handler) saveBlob(w http.ResponseWriter, r *http.Request) { if err := tf.Sync(); err != nil { _ = tf.Close() - _ = os.Remove(path) + _ = os.Remove(tf.Name()) h.incrementRepoSpaceUsage(-written) h.internalServerError(w, err) return } if err := tf.Close(); err != nil { - _ = os.Remove(path) + _ = os.Remove(tf.Name()) + h.incrementRepoSpaceUsage(-written) + h.internalServerError(w, err) + return + } + + if err := os.Rename(tf.Name(), path); err != nil { + _ = os.Remove(tf.Name()) h.incrementRepoSpaceUsage(-written) h.internalServerError(w, err) return From 2175029c9e6ba3a9ecc2a7e13ba8fb335e08c709 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Fri, 29 Jan 2021 19:02:22 +0100 Subject: [PATCH 2/8] Sync directory to disk after upload After a file was uploaded, also sync its containing directory to disk to make sure that the directory entry is persisted as well after a system crash. 
--- repo/repo.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/repo/repo.go b/repo/repo.go index fc8e908..8fe1dae 100644 --- a/repo/repo.go +++ b/repo/repo.go @@ -627,9 +627,28 @@ func (h *Handler) saveBlob(w http.ResponseWriter, r *http.Request) { return } + if err := syncDir(filepath.Dir(path)); err != nil { + // Don't call os.Remove(path) as this is prone to race conditions with parallel upload retries + h.internalServerError(w, err) + return + } + h.sendMetric(objectType, BlobWrite, uint64(written)) } +func syncDir(dirname string) error { + dir, err := os.Open(dirname) + if err != nil { + return err + } + err = dir.Sync() + if err != nil { + _ = dir.Close() + return err + } + return dir.Close() +} + // deleteBlob deletes a blob from the repository. func (h *Handler) deleteBlob(w http.ResponseWriter, r *http.Request) { if h.opt.Debug { From ec0766cddda2c470611e27fe7aa186551724a672 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Fri, 16 Jul 2021 21:47:35 +0200 Subject: [PATCH 3/8] Don't sync directory on Windows Calling sync on a directory on Windows just returns "The handle is invalid" and fails. 
--- repo/repo.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/repo/repo.go b/repo/repo.go index 8fe1dae..8cad9bf 100644 --- a/repo/repo.go +++ b/repo/repo.go @@ -12,6 +12,7 @@ import ( "os" "path/filepath" "regexp" + "runtime" "strings" "time" @@ -637,6 +638,11 @@ func (h *Handler) saveBlob(w http.ResponseWriter, r *http.Request) { } func syncDir(dirname string) error { + if runtime.GOOS == "windows" { + // syncing a directory is not possible on windows + return nil + } + dir, err := os.Open(dirname) if err != nil { return err From 7fe16b69b2c89510259ea5af36ac01c2fdee25e0 Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Sun, 31 Jan 2021 16:13:05 +0100 Subject: [PATCH 4/8] Mark helper functions --- handlers_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/handlers_test.go b/handlers_test.go index b9edc2b..72b82f5 100644 --- a/handlers_test.go +++ b/handlers_test.go @@ -66,6 +66,7 @@ func newRequest(t testing.TB, method, path string, body io.Reader) *http.Request // wantCode returns a function which checks that the response has the correct HTTP status code. func wantCode(code int) wantFunc { return func(t testing.TB, res *httptest.ResponseRecorder) { + t.Helper() if res.Code != code { t.Errorf("wrong response code, want %v, got %v", code, res.Code) } @@ -75,6 +76,7 @@ func wantCode(code int) wantFunc { // wantBody returns a function which checks that the response has the data in the body. func wantBody(body string) wantFunc { return func(t testing.TB, res *httptest.ResponseRecorder) { + t.Helper() if res.Body == nil { t.Errorf("body is nil, want %q", body) return @@ -88,6 +90,7 @@ func wantBody(body string) wantFunc { // checkRequest uses f to process the request and runs the checker functions on the result. 
func checkRequest(t testing.TB, f http.HandlerFunc, req *http.Request, want []wantFunc) { + t.Helper() rr := httptest.NewRecorder() f(rr, req) From e6cc79a2ecbaf375145e764183442656b4f96352 Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Sun, 31 Jan 2021 16:13:24 +0100 Subject: [PATCH 5/8] Fix comment --- handlers_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/handlers_test.go b/handlers_test.go index 72b82f5..71fdaab 100644 --- a/handlers_test.go +++ b/handlers_test.go @@ -224,8 +224,8 @@ func TestResticHandler(t *testing.T) { {createOverwriteDeleteSeq(t, "/parent2/data/"+fileID, data)}, } - // setup rclone with a local backend in a temporary directory - tempdir, err := ioutil.TempDir("", "rclone-restic-test-") + // setup the server with a local backend in a temporary directory + tempdir, err := ioutil.TempDir("", "rest-server-test-") if err != nil { t.Fatal(err) } From 04d206303cd687eaed85a41406418d0243af072e Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Sun, 31 Jan 2021 16:15:57 +0100 Subject: [PATCH 6/8] Add test for race condition with aborted connection --- handlers_test.go | 120 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 120 insertions(+) diff --git a/handlers_test.go b/handlers_test.go index 71fdaab..3471a79 100644 --- a/handlers_test.go +++ b/handlers_test.go @@ -5,6 +5,7 @@ import ( "crypto/rand" "crypto/sha256" "encoding/hex" + "errors" "fmt" "io" "io/ioutil" @@ -14,6 +15,7 @@ import ( "path/filepath" "reflect" "strings" + "sync" "testing" ) @@ -327,3 +329,121 @@ func TestSplitURLPath(t *testing.T) { }) } } + +// delayErrorReader closes the channel FirstRead on the first Read, blocks until Continue is closed and then returns Err. 
+type delayErrorReader struct { + FirstRead chan struct{} + firstReadOnce sync.Once + + Err error + + Continue chan struct{} +} + +func newDelayedErrorReader(err error) *delayErrorReader { + return &delayErrorReader{ + Err: err, + Continue: make(chan struct{}), + FirstRead: make(chan struct{}), + } +} + +func (d *delayErrorReader) Read(p []byte) (int, error) { + d.firstReadOnce.Do(func() { + // close the channel to signal that the first read has happened + close(d.FirstRead) + }) + <-d.Continue + return 0, d.Err +} + +// TestAbortedRequest runs tests with concurrent upload requests for the same file. +func TestAbortedRequest(t *testing.T) { + // setup the server with a local backend in a temporary directory + tempdir, err := ioutil.TempDir("", "rest-server-test-") + if err != nil { + t.Fatal(err) + } + + // make sure the tempdir is properly removed + defer func() { + err := os.RemoveAll(tempdir) + if err != nil { + t.Fatal(err) + } + }() + + // configure path, the race condition doesn't happen for append-only repositories + mux, err := NewHandler(&Server{ + AppendOnly: false, + Path: tempdir, + NoAuth: true, + Debug: true, + PanicOnError: true, + }) + if err != nil { + t.Fatalf("error from NewHandler: %v", err) + } + + // create the repo + checkRequest(t, mux.ServeHTTP, + newRequest(t, "POST", "/?create=true", nil), + []wantFunc{wantCode(http.StatusOK)}) + + var ( + id = "b5bb9d8014a0f9b1d61e21e796d78dccdf1352f23cd32812f4850b878ae4944c" + wg sync.WaitGroup + ) + + // the first request is an upload to a file which blocks while reading the + // body and then after some data returns an error + rd := newDelayedErrorReader(errors.New("injected")) + + wg.Add(1) + go func() { + defer wg.Done() + + // first, read some string, then read from rd (which blocks and then + // returns an error) + dataReader := io.MultiReader(strings.NewReader("invalid data from aborted request\n"), rd) + + t.Logf("start first upload") + req := newRequest(t, "POST", "/data/"+id, dataReader) + rr 
:= httptest.NewRecorder() + mux.ServeHTTP(rr, req) + t.Logf("first upload done, response %v (%v)", rr.Code, rr.Result().Status) + }() + + // wait until the first request starts reading from the body + <-rd.FirstRead + + // then while the first request is blocked we send a second request to + // delete the file and a third request to upload to the file again, only + // then the first request is unblocked. + + t.Logf("delete file") + checkRequest(t, mux.ServeHTTP, + newRequest(t, "DELETE", "/data/"+id, nil), + nil) // don't check anything, restic also ignores errors here + + t.Logf("upload again") + checkRequest(t, mux.ServeHTTP, + newRequest(t, "POST", "/data/"+id, strings.NewReader("foo\n")), + []wantFunc{wantCode(http.StatusOK)}) + + // unblock the reader for the first request now so it can continue + close(rd.Continue) + + // wait for the first request to continue + wg.Wait() + + // request the file again, it must exist and contain the string from the + // second request + checkRequest(t, mux.ServeHTTP, + newRequest(t, "GET", "/data/"+id, nil), + []wantFunc{ + wantCode(http.StatusOK), + wantBody("foo\n"), + }, + ) +} From 28f569c0df7a43cf02b06ca27970d124d166bbd9 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 31 Jan 2021 17:25:30 +0100 Subject: [PATCH 7/8] Add changelog --- changelog/unreleased/pull-142 | 14 ++++++++++++++ 1 file changed, 14 insertions(+) create mode 100644 changelog/unreleased/pull-142 diff --git a/changelog/unreleased/pull-142 b/changelog/unreleased/pull-142 new file mode 100644 index 0000000..af8e76c --- /dev/null +++ b/changelog/unreleased/pull-142 @@ -0,0 +1,14 @@ +Bugfix: Fix possible data loss due to interrupted network connections + +When rest-server was run without `--append-only` it was possible to lose uploaded +files in a specific scenario in which a network connection was interrupted. 
For the +data loss to occur a file upload by restic would have to be interrupted such that +restic notices the interrupted network connection before the rest-server. Then +restic would have to retry the file upload and finish it before the rest-server +notices that the initial upload has failed. Then the uploaded file would be +accidentally removed by rest-server when trying to clean up the failed upload. + +This has been fixed by always uploading to a temporary file first which is moved +into position only once it was transferred completely. + +https://github.com/restic/rest-server/pull/142 From 64a43228de8056cf91f9d5dc4e3dd7351f36700f Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Thu, 12 Aug 2021 22:17:49 +0200 Subject: [PATCH 8/8] Prefix temporary file with object id --- repo/repo.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/repo/repo.go b/repo/repo.go index 8cad9bf..01fe1d8 100644 --- a/repo/repo.go +++ b/repo/repo.go @@ -553,7 +553,8 @@ func (h *Handler) saveBlob(w http.ResponseWriter, r *http.Request) { return } - tf, err := ioutil.TempFile(filepath.Dir(path), ".rest-server-temp") + tmpFn := objectID + ".rest-server-temp" + tf, err := ioutil.TempFile(filepath.Dir(path), tmpFn) if os.IsNotExist(err) { // the error is caused by a missing directory, create it and retry mkdirErr := os.MkdirAll(filepath.Dir(path), h.opt.DirMode) @@ -561,7 +562,7 @@ func (h *Handler) saveBlob(w http.ResponseWriter, r *http.Request) { log.Print(mkdirErr) } else { // try again - tf, err = ioutil.TempFile(filepath.Dir(path), ".rest-server-temp") + tf, err = ioutil.TempFile(filepath.Dir(path), tmpFn) } } if err != nil {