Skip to content

Commit

Permalink
Fix inconsistent state between WAL and saved Snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
zghh committed Aug 11, 2022
1 parent bcc7758 commit d8be2fc
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 41 deletions.
97 changes: 61 additions & 36 deletions orderer/consensus/etcdraft/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,22 +75,9 @@ func CreateStorage(
return nil, err
}

snapshot, err := sn.Load()
snapshot, w, st, ents, err := loadNewestAvailableSnapshot(lg, walDir, snapDir)
if err != nil {
if err == snap.ErrNoSnapshot {
lg.Debugf("No snapshot found at %s", snapDir)
} else {
return nil, errors.Errorf("failed to load snapshot: %s", err)
}
} else {
// snapshot found
lg.Debugf("Loaded snapshot at Term %d and Index %d, Nodes: %+v",
snapshot.Metadata.Term, snapshot.Metadata.Index, snapshot.Metadata.ConfState.Nodes)
}

w, st, ents, err := createOrReadWAL(lg, walDir, snapshot)
if err != nil {
return nil, errors.Errorf("failed to create or read WAL: %s", err)
return nil, errors.Errorf("Failed to load snapshot and WAL: %s", err)
}

if snapshot != nil {
Expand Down Expand Up @@ -120,26 +107,11 @@ func CreateStorage(
// ListSnapshots returns a list of RaftIndex of snapshots stored on disk.
// If a file is corrupted, rename the file.
func ListSnapshots(logger *flogging.FabricLogger, snapDir string) []uint64 {
dir, err := os.Open(snapDir)
snapfiles, err := listSnapshotFiles(logger, snapDir)
if err != nil {
logger.Errorf("Failed to open snapshot directory %s: %s", snapDir, err)
logger.Errorf("Failed to list snapshot files from %s: %s", snapDir, err)
return nil
}
defer dir.Close()

filenames, err := dir.Readdirnames(-1)
if err != nil {
logger.Errorf("Failed to read snapshot files: %s", err)
return nil
}

snapfiles := []string{}
for i := range filenames {
if strings.HasSuffix(filenames[i], ".snap") {
snapfiles = append(snapfiles, filenames[i])
}
}
sort.Strings(snapfiles)

var snapshots []uint64
for _, snapfile := range snapfiles {
Expand Down Expand Up @@ -242,15 +214,17 @@ func (rs *RaftStorage) Snapshot() raftpb.Snapshot {

// Store persists etcd/raft data
func (rs *RaftStorage) Store(entries []raftpb.Entry, hardstate raftpb.HardState, snapshot raftpb.Snapshot) error {
if err := rs.wal.Save(hardstate, entries); err != nil {
return err
}

if !raft.IsEmptySnap(snapshot) {
if err := rs.saveSnap(snapshot); err != nil {
return err
}
}

if err := rs.wal.Save(hardstate, entries); err != nil {
return err
}

if !raft.IsEmptySnap(snapshot) {
if err := rs.ram.ApplySnapshot(snapshot); err != nil {
if err == raft.ErrSnapOutOfDate {
rs.lg.Warnf("Attempted to apply out-of-date snapshot at Term %d and Index %d",
Expand Down Expand Up @@ -447,3 +421,54 @@ func (rs *RaftStorage) Close() error {

return nil
}

func loadNewestAvailableSnapshot(lg *flogging.FabricLogger, walDir, snapDir string) (*raftpb.Snapshot, *wal.WAL, raftpb.HardState, []raftpb.Entry, error) {
snapfiles, err := listSnapshotFiles(lg, snapDir)
if err != nil {
lg.Errorf("Failed to list snapshot files from %s: %s", snapDir, err)
}
for i := len(snapfiles) - 1; i >= 0; i-- {
snapshot, err := snap.Read(lg.Zap(), filepath.Join(snapDir, snapfiles[i]))
if err != nil {
lg.Warnf("Can not read snapshot from %s: %s", snapfiles[i], err)
continue
}
w, st, ents, err := createOrReadWAL(lg, walDir, snapshot)
if err != nil {
lg.Warnf("Create or read wal error: %s", err)
continue
}
if snapshot.Metadata.Index <= st.Commit {
return snapshot, w, st, ents, nil
}
if err := w.Close(); err != nil {
return nil, nil, raftpb.HardState{}, nil, err
}
}
lg.Warnf("Not available snapshot found in %s", snapDir)
w, st, ents, err := createOrReadWAL(lg, walDir, nil)
return nil, w, st, ents, err
}

func listSnapshotFiles(logging *flogging.FabricLogger, snapDir string) ([]string, error) {
dir, err := os.Open(snapDir)
if err != nil {
return nil, err
}
defer dir.Close()

filenames, err := dir.Readdirnames(-1)
if err != nil {
return nil, err
}

snapfiles := []string{}
for i := range filenames {
if strings.HasSuffix(filenames[i], ".snap") {
snapfiles = append(snapfiles, filenames[i])
}
}
sort.Strings(snapfiles)

return snapfiles, nil
}
10 changes: 5 additions & 5 deletions orderer/consensus/etcdraft/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestOpenWAL(t *testing.T) {
for i := 0; i < 10; i++ {
store.Store(
[]raftpb.Entry{{Index: uint64(i), Data: make([]byte, 10)}},
raftpb.HardState{},
raftpb.HardState{Commit: uint64(i)},
raftpb.Snapshot{},
)
}
Expand Down Expand Up @@ -155,7 +155,7 @@ func TestTakeSnapshot(t *testing.T) {
for i := 0; i < 10; i++ {
store.Store(
[]raftpb.Entry{{Index: uint64(i), Data: make([]byte, 100)}},
raftpb.HardState{},
raftpb.HardState{Commit: uint64(i)},
raftpb.Snapshot{},
)
}
Expand Down Expand Up @@ -216,7 +216,7 @@ func TestTakeSnapshot(t *testing.T) {
for i := 0; i < 10; i++ {
store.Store(
[]raftpb.Entry{{Index: uint64(i), Data: make([]byte, 100)}},
raftpb.HardState{},
raftpb.HardState{Commit: uint64(i)},
raftpb.Snapshot{},
)
}
Expand Down Expand Up @@ -282,7 +282,7 @@ func TestTakeSnapshot(t *testing.T) {
for i := 0; i < 10; i++ {
store.Store(
[]raftpb.Entry{{Index: uint64(i), Data: make([]byte, 100)}},
raftpb.HardState{},
raftpb.HardState{Commit: uint64(i)},
raftpb.Snapshot{},
)
}
Expand Down Expand Up @@ -369,7 +369,7 @@ func TestApplyOutOfDateSnapshot(t *testing.T) {
for i := 0; i < 10; i++ {
store.Store(
[]raftpb.Entry{{Index: uint64(i), Data: make([]byte, 100)}},
raftpb.HardState{},
raftpb.HardState{Commit: uint64(i)},
raftpb.Snapshot{},
)
}
Expand Down

0 comments on commit d8be2fc

Please sign in to comment.