Skip to content

Commit

Permalink
Fix inconsistent state between WAL and saved Snapshot, and add the un…
Browse files Browse the repository at this point in the history
…it test to reproduce the problem.
  • Loading branch information
zghh committed Aug 11, 2022
1 parent 4a347c4 commit 3bafa7b
Showing 1 changed file with 79 additions and 0 deletions.
79 changes: 79 additions & 0 deletions orderer/consensus/etcdraft/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,3 +395,82 @@ func TestApplyOutOfDateSnapshot(t *testing.T) {
assertFileCount(t, 12, 1)
})
}

func TestAbortWhenWritingSnapshot(t *testing.T) {
t.Run("Abort when writing snapshot", func(t *testing.T) {
setup(t)
defer clean(t)

// set SegmentSizeBytes to a small value so that
// every entry persisted to wal would result in
// a new wal being created.
oldSegmentSizeBytes := wal.SegmentSizeBytes
wal.SegmentSizeBytes = 10
defer func() {
wal.SegmentSizeBytes = oldSegmentSizeBytes
}()

// create 5 new entry
for i := 0; i < 5; i++ {
store.Store(
[]raftpb.Entry{{Index: uint64(i), Data: make([]byte, 100)}},
raftpb.HardState{Commit: uint64(i)},
raftpb.Snapshot{},
)
}
assertFileCount(t, 6, 0)

// Assume an orderer missed some records due to exceptions and receives a new snapshot from other orderers.
commit := 10
store.Store(
[]raftpb.Entry{},
raftpb.HardState{Commit: uint64(commit)},
raftpb.Snapshot{
Metadata: raftpb.SnapshotMetadata{
Index: uint64(commit),
},
Data: make([]byte, 100),
},
)
err = store.Close()
assert.NoError(t, err)

// In old logic, it will use rs.wal.Save(hardstate, entries) to save the state firstly, so we remove the snapshot files.
// sd, err := os.Open(snapDir)
// assert.NoError(t, err)
// defer sd.Close()
// names, err := sd.Readdirnames(-1)
// assert.NoError(t, err)
// sort.Sort(sort.Reverse(sort.StringSlice(names)))
// os.Remove(filepath.Join(snapDir, names[0]))
// wd, err := os.Open(walDir)
// assert.NoError(t, err)
// defer wd.Close()
// names, err = wd.Readdirnames(-1)
// assert.NoError(t, err)
// sort.Sort(sort.Reverse(sort.StringSlice(names)))
// os.Remove(filepath.Join(walDir, names[0]))

// But in the new logic, it will use rs.saveSnap(snapshot) to save the snapshot firstly, so we remove the WAL files.
wd, err := os.Open(walDir)
assert.NoError(t, err)
defer wd.Close()
names, err := wd.Readdirnames(-1)
assert.NoError(t, err)
sort.Sort(sort.Reverse(sort.StringSlice(names)))
os.Remove(filepath.Join(walDir, names[0]))

// Then restart the orderer.
ram := raft.NewMemoryStorage()
store, err = CreateStorage(logger, walDir, snapDir, ram)
assert.NoError(t, err)

// Check the state from go.etcd.io/etcd/raft/raft.go
// func (r *raft) loadState(state pb.HardState)
hd, _, err := store.ram.InitialState()
assert.NoError(t, err)
lastIndex, err := store.ram.LastIndex()
assert.NoError(t, err)
assert.False(t, hd.Commit > lastIndex)
})
}

0 comments on commit 3bafa7b

Please sign in to comment.