perf: single-pass snapshot reading #1838

Open
wants to merge 123 commits into
base: master
Changes from 118 commits
ed3ab94
snapshot fragments for json work
segfault-magnet Mar 30, 2024
42e0763
parquet fragment support, cleanup pending
segfault-magnet Mar 30, 2024
58b04be
add tests for fragments
segfault-magnet Mar 30, 2024
f440bb9
comment out denies
segfault-magnet Mar 30, 2024
3fb1586
Merge branch 'master' into feature/parallel_snapshot_writing
segfault-magnet Mar 31, 2024
4936ed9
snapshot generation uses concurrent workers
segfault-magnet Apr 1, 2024
3a0b898
task_manager used for import/export of snapshot
segfault-magnet Apr 1, 2024
316b1c9
cleanup imports
segfault-magnet Apr 1, 2024
35449e6
use rayon in genesis importer
segfault-magnet Apr 1, 2024
633dc22
move files around
segfault-magnet Apr 2, 2024
b0fdfdc
enable deny lints, fix errors in chain-config
segfault-magnet Apr 2, 2024
1f0b3c3
wip, investigating features
segfault-magnet Apr 2, 2024
1804a7d
feature gate imports, fix unused deps
segfault-magnet Apr 2, 2024
1209564
ci checks
segfault-magnet Apr 2, 2024
e01d204
remove unused result
segfault-magnet Apr 2, 2024
892849f
use rayon for exporter
segfault-magnet Apr 2, 2024
c8c28f7
remove uuid dep
segfault-magnet Apr 2, 2024
90785f4
dry up fragments tests
segfault-magnet Apr 2, 2024
58a4e25
deduplicate tests
segfault-magnet Apr 2, 2024
cb27956
inline path
segfault-magnet Apr 2, 2024
4171e87
dedupe writer tests
segfault-magnet Apr 2, 2024
9d09510
restructure into import/export format
segfault-magnet Apr 2, 2024
d84bd72
format and cargo sort
segfault-magnet Apr 2, 2024
ebe836b
update change log
segfault-magnet Apr 2, 2024
6060bad
Merge branch 'master' into feature/parallel_snapshot_writing
segfault-magnet Apr 2, 2024
47e5432
entries filter
segfault-magnet Apr 4, 2024
3c1ee01
optimize
segfault-magnet Apr 4, 2024
b9c3906
shorten bounds
segfault-magnet Apr 5, 2024
992bcc0
Merge branch 'master' into feature/parallel_snapshot_writing
segfault-magnet Apr 5, 2024
ec85160
can cancel/resume regenesis, pending progress info and e2e tests
segfault-magnet Apr 6, 2024
1d046dc
wip
segfault-magnet Apr 8, 2024
c9301c6
Merge branch 'master' into feature/regenesis_graceful_shutdown
segfault-magnet Apr 10, 2024
6a70cca
Merge remote-tracking branch 'origin/master' into feature/regenesis_g…
segfault-magnet Apr 11, 2024
bebc214
progress with cli/logs behavior
segfault-magnet Apr 11, 2024
4468ab1
make reader tolerate missing tables
segfault-magnet Apr 11, 2024
5c8654e
reenable denies
segfault-magnet Apr 11, 2024
8aea56d
remove index from group.
segfault-magnet Apr 11, 2024
a2c72d5
use state watcher instead of spawning a tokio task
segfault-magnet Apr 11, 2024
d21628c
cleanup
segfault-magnet Apr 11, 2024
00c3af9
improve rendering
segfault-magnet Apr 11, 2024
1d39fa0
grammar
segfault-magnet Apr 11, 2024
92b89d0
revert debugging stuff
segfault-magnet Apr 11, 2024
aacb2b3
update change log
segfault-magnet Apr 11, 2024
35746c7
fix state watcher default only being available with test-helpers
segfault-magnet Apr 11, 2024
8f0e375
pub use Groups
segfault-magnet Apr 11, 2024
2da160b
reformat
segfault-magnet Apr 11, 2024
5100fd2
add is empty to groups
segfault-magnet Apr 11, 2024
590c015
wip
segfault-magnet Apr 11, 2024
4b2f87f
allow warnings
segfault-magnet Apr 11, 2024
1b5b0aa
wip
segfault-magnet Apr 11, 2024
541ec4f
refactor since enumerations implement n-th method of Iterator
segfault-magnet Apr 11, 2024
e4e0f87
Merge remote-tracking branch 'origin/feature/regenesis_graceful_shutd…
segfault-magnet Apr 11, 2024
921d677
Merge branch 'master' into feature/regenesis_graceful_shutdown
xgreenx Apr 11, 2024
c06ac4c
Update CHANGELOG.md
segfault-magnet Apr 12, 2024
b368f97
wip
segfault-magnet Apr 12, 2024
d0d10c7
use group_num instead of index in order for the final report to be
segfault-magnet Apr 12, 2024
db58d61
Merge remote-tracking branch 'origin/feature/regenesis_graceful_shutd…
segfault-magnet Apr 12, 2024
5541a62
wip
segfault-magnet Apr 12, 2024
3bde4fc
move indicatiff down the cargo toml
segfault-magnet Apr 12, 2024
278cb99
PR comments
segfault-magnet Apr 12, 2024
f36b80e
add MultiCancellationToken to handle both state watcher and tokio cancel
segfault-magnet Apr 13, 2024
bf751cb
Merge remote-tracking branch 'origin/master' into feature/genesis_opt…
segfault-magnet Apr 16, 2024
4abad0f
Merge remote-tracking branch 'origin/master' into feature/snapshot_ge…
segfault-magnet Apr 16, 2024
8ea8092
propagate shutdown signal to the snapshot exporter
segfault-magnet Apr 16, 2024
e0fdb53
add logs to signal
segfault-magnet Apr 16, 2024
605e8c1
progress update of snapshot exporting
segfault-magnet Apr 17, 2024
39af4ff
Merge remote-tracking branch 'origin/master' into feature/snapshot_ge…
segfault-magnet Apr 17, 2024
aeb4193
uncomment denies
segfault-magnet Apr 17, 2024
9e8d6a6
remove unused imports
segfault-magnet Apr 17, 2024
ac415cf
remove set_max since rocksdb never gives out size hints for the
segfault-magnet Apr 17, 2024
3f60047
update changelog
segfault-magnet Apr 17, 2024
5648ee5
Merge remote-tracking branch 'origin/feature/snapshot_generation_grac…
segfault-magnet Apr 17, 2024
1ece5d8
support one of the db's reverting and being able to
segfault-magnet Apr 17, 2024
982911b
rename trait method
segfault-magnet Apr 17, 2024
a374a7a
bring back denies
segfault-magnet Apr 17, 2024
62448fd
use COW instead of cloning data
segfault-magnet Apr 17, 2024
8040fdf
Merge remote-tracking branch 'origin/master' into feature/snapshot_ge…
segfault-magnet Apr 18, 2024
87cd001
Merge remote-tracking branch 'origin/feature/snapshot_generation_grac…
segfault-magnet Apr 18, 2024
22b7ac2
rename on_chain
segfault-magnet Apr 18, 2024
21055ce
Merge remote-tracking branch 'origin/master' into feature/snapshot_ge…
segfault-magnet Apr 19, 2024
f97ad56
fix merge
segfault-magnet Apr 19, 2024
01fd369
Merge branch 'master' into feature/snapshot_generation_graceful_shutdown
xgreenx Apr 22, 2024
0d3fdc0
Merge remote-tracking branch 'origin/feature/snapshot_generation_grac…
segfault-magnet Apr 22, 2024
b658a72
bump time, CI slow
segfault-magnet Apr 22, 2024
307fbca
Merge remote-tracking branch 'origin/feature/snapshot_generation_grac…
segfault-magnet Apr 22, 2024
a9dac14
merge import handlers
segfault-magnet Apr 22, 2024
7b522a9
remove unused imports
segfault-magnet Apr 22, 2024
97412e0
uncomment denies
segfault-magnet Apr 22, 2024
f57c868
changelog
segfault-magnet Apr 22, 2024
896b747
Merge branch 'master' into feature/snapshot_generation_graceful_shutdown
segfault-magnet Apr 23, 2024
91432b2
move change log
segfault-magnet Apr 23, 2024
62bbe70
Update crates/fuel-core/src/service/genesis/progress.rs
segfault-magnet Apr 23, 2024
16d951a
Merge branch 'feature/snapshot_generation_graceful_shutdown' of githu…
segfault-magnet Apr 23, 2024
56aa555
typo
segfault-magnet Apr 23, 2024
7ebbe39
add rustdocs
segfault-magnet Apr 23, 2024
0999c28
suggestions
segfault-magnet Apr 23, 2024
21ff200
Update crates/fuel-core/src/p2p_test_helpers.rs
segfault-magnet Apr 23, 2024
7425da3
Merge branch 'feature/snapshot_generation_graceful_shutdown' of githu…
segfault-magnet Apr 23, 2024
9f99828
suggestions
segfault-magnet Apr 23, 2024
c1a9ee7
remove tokio rayon, implement suggestions
segfault-magnet Apr 23, 2024
c1ff3d0
fix unit tests
segfault-magnet Apr 23, 2024
1272062
inline when small number of groups
segfault-magnet Apr 23, 2024
ad96955
fix unit tests
segfault-magnet Apr 23, 2024
d7bd144
Merge remote-tracking branch 'origin/feature/snapshot_generation_grac…
segfault-magnet Apr 23, 2024
dbbf132
remove MultiCancellationToken in favor of a less general solution
segfault-magnet Apr 23, 2024
99ad6f8
Merge remote-tracking branch 'origin/feature/snapshot_generation_grac…
segfault-magnet Apr 23, 2024
d626d44
Merge remote-tracking branch 'origin/master' into feature/genesis_opt…
segfault-magnet Apr 24, 2024
270041a
revert files that should not have been changed
segfault-magnet Apr 24, 2024
f628c88
fix build
segfault-magnet Apr 24, 2024
06ec673
add check for unique table names
segfault-magnet Apr 24, 2024
5918150
readability
segfault-magnet Apr 24, 2024
13110bc
transactions not inserted in master
segfault-magnet Apr 24, 2024
a16b316
unneccessary changes
segfault-magnet Apr 24, 2024
b453447
nits
segfault-magnet Apr 24, 2024
3223270
add changelog
segfault-magnet Apr 24, 2024
3b0e7d4
Merge remote-tracking branch 'origin/master' into feature/genesis_opt…
segfault-magnet Apr 30, 2024
59879ea
FuelBlockIdsToHeights is never exported as a table but rather derived
segfault-magnet Apr 30, 2024
ec53cbf
clippy
segfault-magnet Apr 30, 2024
5787c06
Merge branch 'master' into feature/genesis_optimize_deriving
segfault-magnet May 3, 2024
47b241d
rename
segfault-magnet May 3, 2024
68d6371
Merge branch 'master' into feature/genesis_optimize_deriving
segfault-magnet May 6, 2024
6677b76
Merge branch 'master' into feature/genesis_optimize_deriving
Dentosal May 7, 2024
e7d5755
Merge branch 'master' into feature/genesis_optimize_deriving
xgreenx May 13, 2024
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased]

### Changed
- [#1838](https://github.com/FuelLabs/fuel-core/pull/1838): Snapshot parquet files are now read only once.

## [Version 0.26.0]

### Fixed
14 changes: 7 additions & 7 deletions benches/benches/block_target_gas.rs
@@ -38,7 +38,6 @@ use fuel_core_chain_config::{
ContractConfig,
StateConfig,
};
use fuel_core_services::Service;
use fuel_core_storage::{
tables::ContractsRawCode,
vm_storage::IncreaseStorageKey,
@@ -336,12 +335,13 @@ fn service_with_many_contracts(
.unwrap();
}

let service = FuelService::new(
CombinedDatabase::new(database, Default::default(), Default::default()),
config.clone(),
)
.expect("Unable to start a FuelService");
service.start().expect("Unable to start the service");
// Genesis needs to be awaited because the benchmark requests consensus parameters later on
let service = rt
.block_on(FuelService::from_combined_database(
CombinedDatabase::new(database, Default::default(), Default::default()),
config.clone(),
))
.expect("Unable to create service");
(service, rt)
}

8 changes: 8 additions & 0 deletions crates/chain-config/src/config/state/reader.rs
@@ -23,6 +23,14 @@ impl<T> Groups<T>
where
T: Mappable,
{
pub fn new(items: Vec<anyhow::Result<Vec<TableEntry<T>>>>) -> Self {
Self {
iter: GroupIter::InMemory {
groups: items.into_iter(),
},
}
}

pub fn len(&self) -> usize {
match &self.iter {
GroupIter::InMemory { groups } => groups.len(),
4 changes: 1 addition & 3 deletions crates/fuel-core/src/service/genesis/exporter.rs
@@ -29,7 +29,6 @@ use fuel_core_chain_config::{
use fuel_core_storage::{
blueprint::BlueprintInspect,
iter::IterDirection,
kv_store::StorageColumn,
structured_storage::TableWithBlueprint,
tables::{
Coins,
@@ -182,8 +181,7 @@ where
// TODO:
// [1857](https://github.com/FuelLabs/fuel-core/issues/1857)
// RocksDb can provide an estimate for the number of items.
let progress_tracker =
self.multi_progress.table_reporter(None, T::column().name());
let progress_tracker = self.multi_progress.table_reporter::<T>(None);
self.task_manager.spawn_blocking(move |cancel| {
db.entries::<T>(prefix, IterDirection::Forward)
.chunks(group_size)
173 changes: 55 additions & 118 deletions crates/fuel-core/src/service/genesis/importer.rs
@@ -1,19 +1,13 @@
use self::import_task::ImportTable;

use super::{
progress::MultipleProgressReporter,
task_manager::TaskManager,
};
use crate::{
combined_database::CombinedGenesisDatabase,
database::database_description::{
off_chain::OffChain,
on_chain::OnChain,
},
fuel_core_graphql_api::storage::messages::SpentMessages,
graphql_api::storage::{
blocks::FuelBlockIdsToHeights,
coins::OwnedCoins,
contracts::ContractsInfo,
messages::OwnedMessageIds,
old::{
OldFuelBlockConsensus,
OldFuelBlocks,
@@ -25,7 +19,6 @@ use crate::{
},
},
};
use core::marker::PhantomData;
use fuel_core_chain_config::{
AsTable,
SnapshotReader,
@@ -34,7 +27,6 @@ use fuel_core_chain_config::{
};
use fuel_core_services::StateWatcher;
use fuel_core_storage::{
kv_store::StorageColumn,
structured_storage::TableWithBlueprint,
tables::{
Coins,
@@ -56,14 +48,10 @@ use fuel_core_types::{
},
fuel_types::BlockHeight,
};
use import_task::{
ImportTable,
ImportTask,
};
use itertools::Itertools;

mod import_task;
mod off_chain;
mod on_chain;
mod logic;

const GROUPS_NUMBER_FOR_PARALLELIZATION: usize = 10;

@@ -106,89 +94,51 @@ impl SnapshotImporter {

async fn run_workers(mut self) -> anyhow::Result<()> {
tracing::info!("Running imports");
self.spawn_worker_on_chain::<Coins>()?;
self.spawn_worker_on_chain::<Messages>()?;
self.spawn_worker_on_chain::<ContractsRawCode>()?;
self.spawn_worker_on_chain::<ContractsLatestUtxo>()?;
self.spawn_worker_on_chain::<ContractsState>()?;
self.spawn_worker_on_chain::<ContractsAssets>()?;
self.spawn_worker_on_chain::<ProcessedTransactions>()?;

self.spawn_worker_off_chain::<TransactionStatuses, TransactionStatuses>()?;
self.spawn_worker_off_chain::<OwnedTransactions, OwnedTransactions>()?;
self.spawn_worker_off_chain::<SpentMessages, SpentMessages>()?;
self.spawn_worker_off_chain::<Messages, OwnedMessageIds>()?;
self.spawn_worker_off_chain::<Coins, OwnedCoins>()?;
self.spawn_worker_off_chain::<FuelBlocks, OldFuelBlocks>()?;
self.spawn_worker_off_chain::<Transactions, OldTransactions>()?;
self.spawn_worker_off_chain::<SealedBlockConsensus, OldFuelBlockConsensus>()?;
self.spawn_worker_off_chain::<Transactions, ContractsInfo>()?;
self.spawn_worker_off_chain::<OldTransactions, ContractsInfo>()?;
self.spawn_worker_off_chain::<OldFuelBlocks, OldFuelBlocks>()?;
self.spawn_worker_off_chain::<OldFuelBlockConsensus, OldFuelBlockConsensus>()?;
self.spawn_worker_off_chain::<OldTransactions, OldTransactions>()?;
self.spawn_worker_off_chain::<FuelBlocks, FuelBlockIdsToHeights>()?;
self.spawn_worker_off_chain::<OldFuelBlocks, FuelBlockIdsToHeights>()?;

self.task_manager.wait().await?;

Ok(())
}

pub fn spawn_worker_on_chain<TableBeingWritten>(&mut self) -> anyhow::Result<()>
where
TableBeingWritten: TableWithBlueprint + 'static + Send,
TableEntry<TableBeingWritten>: serde::de::DeserializeOwned + Send,
StateConfig: AsTable<TableBeingWritten>,
Handler<TableBeingWritten, TableBeingWritten>:
ImportTable<TableInSnapshot = TableBeingWritten, DbDesc = OnChain>,
{
let groups = self.snapshot_reader.read::<TableBeingWritten>()?;
let num_groups = groups.len();

// Even though genesis is expected to last orders of magnitude longer than an empty task
// might take to execute, this optimization is placed regardless to speed up
// unit/integration tests that will feel the impact more than actual regenesis.
if num_groups == 0 {
return Ok(());
macro_rules! start_imports {
($($table:ty),*) => {
let names_unique = [
$(
fuel_core_storage::kv_store::StorageColumn::name(&<$table>::column()),
)*
].iter().all_unique();

if !names_unique {
panic!("Tables must have unique column names because they are used as keys to track the genesis progress both in on-chain and off-chain tables.");
}

$(self.start_import::<$table>()?;)*
};
}

let block_height = *self.genesis_block.header().height();
let da_block_height = self.genesis_block.header().da_height;
let db = self.db.on_chain().clone();

let migration_name = migration_name::<TableBeingWritten, TableBeingWritten>();
let progress_reporter = self
.multi_progress_reporter
.table_reporter(Some(num_groups), migration_name);

let task = ImportTask::new(
Handler::new(block_height, da_block_height),
groups,
db,
progress_reporter,
start_imports!(
Member
Are we going to get any kind of compiler error/test failure if we add new tables in the future, or are we just going to need to remember to add them here?

Contributor Author
Not all tables get a snapshot file, for one of two reasons:

  • the table is not migrated at all, or
  • it is derived from a file that is already present.

So if a table is to be migrated, the developer should:

  1. export the data (if not derived)
  2. import it (by implementing ImportTable, or by changing the impl of an existing one in the case of derived tables).
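
The reply above can be illustrated with a minimal sketch of the derived-table case. It assumes a heavily simplified model: `Dbs`, `CoinEntry`, the one-method `ImportTable` trait and the free function `import_entries` are hypothetical stand-ins written for this example, not the actual fuel-core types or signatures. The point it tries to show is that a single implementation for a snapshot table can write both the stored table and the table derived from it while each group is read only once.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for the pair of on-chain and off-chain databases.
#[derive(Default, Debug)]
pub struct Dbs {
    /// "On-chain" table: coin id -> (owner, amount).
    pub coins: BTreeMap<u32, (u8, u64)>,
    /// "Off-chain" table derived from `coins`: an (owner, coin id) index.
    pub owned_coins: BTreeMap<(u8, u32), ()>,
}

/// Hypothetical snapshot entry for the coins table.
#[derive(Clone, Debug)]
pub struct CoinEntry {
    pub id: u32,
    pub owner: u8,
    pub amount: u64,
}

/// Simplified analogue of an import trait: one implementation per snapshot table.
pub trait ImportTable<Entry> {
    fn process(&self, group: Vec<Entry>, dbs: &mut Dbs) -> Result<(), String>;
}

/// Hypothetical handler; it carries no state in this sketch.
pub struct Handler;

impl ImportTable<CoinEntry> for Handler {
    fn process(&self, group: Vec<CoinEntry>, dbs: &mut Dbs) -> Result<(), String> {
        for entry in group {
            // Write the table that is stored in the snapshot...
            dbs.coins.insert(entry.id, (entry.owner, entry.amount));
            // ...and, in the same pass, the table that is derived from it.
            dbs.owned_coins.insert((entry.owner, entry.id), ());
        }
        Ok(())
    }
}

/// Consumes every group exactly once, stopping at the first failed group.
/// Returns the number of groups that were processed.
pub fn import_entries<Entry, H>(
    handler: &H,
    groups: impl IntoIterator<Item = Result<Vec<Entry>, String>>,
    dbs: &mut Dbs,
) -> Result<usize, String>
where
    H: ImportTable<Entry>,
{
    let mut processed = 0;
    for group in groups {
        handler.process(group?, dbs)?;
        processed += 1;
    }
    Ok(processed)
}
```

Under this model, adding a new derived table means extending the existing `process` body for its source table rather than registering a second reader over the same snapshot file.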

Coins,
ContractsAssets,
ContractsLatestUtxo,
ContractsRawCode,
ContractsState,
FuelBlocks,
Messages,
OldFuelBlockConsensus,
OldFuelBlocks,
OldTransactions,
OwnedTransactions,
ProcessedTransactions,
SealedBlockConsensus,
SpentMessages,
TransactionStatuses,
Transactions
);

let import = |token| task.run(token);
if num_groups < GROUPS_NUMBER_FOR_PARALLELIZATION {
self.task_manager.run(import)?;
} else {
self.task_manager.spawn_blocking(import);
}
self.task_manager.wait().await?;

Ok(())
}

pub fn spawn_worker_off_chain<TableInSnapshot, TableBeingWritten>(
&mut self,
) -> anyhow::Result<()>
pub fn start_import<TableInSnapshot>(&mut self) -> anyhow::Result<()>
where
TableInSnapshot: TableWithBlueprint + Send + 'static,
TableInSnapshot: TableWithBlueprint + 'static + Send,
TableEntry<TableInSnapshot>: serde::de::DeserializeOwned + Send,
StateConfig: AsTable<TableInSnapshot>,
Handler<TableBeingWritten, TableInSnapshot>:
ImportTable<TableInSnapshot = TableInSnapshot, DbDesc = OffChain>,
TableBeingWritten: TableWithBlueprint + Send + 'static,
Handler: ImportTable<TableInSnapshot>,
{
let groups = self.snapshot_reader.read::<TableInSnapshot>()?;
let num_groups = groups.len();
@@ -203,20 +153,23 @@ impl SnapshotImporter {
let block_height = *self.genesis_block.header().height();
let da_block_height = self.genesis_block.header().da_height;

let db = self.db.off_chain().clone();
let on_chain_db = self.db.on_chain().clone();
let off_chain_db = self.db.off_chain().clone();

let migration_name = migration_name::<TableInSnapshot, TableBeingWritten>();
let progress_reporter = self
.multi_progress_reporter
.table_reporter(Some(num_groups), migration_name);

let task = ImportTask::new(
Handler::new(block_height, da_block_height),
groups,
db,
progress_reporter,
);
let import = |token| task.run(token);
.table_reporter::<TableInSnapshot>(Some(num_groups));

let import = move |token| {
import_task::import_entries(
token,
Handler::new(block_height, da_block_height),
groups,
on_chain_db,
off_chain_db,
progress_reporter,
)
};
if num_groups < GROUPS_NUMBER_FOR_PARALLELIZATION {
self.task_manager.run(import)?;
} else {
@@ -228,32 +181,16 @@ impl SnapshotImporter {
}
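
The dispatch at the end of `start_import` (run inline when there are few groups, otherwise hand the work to a blocking task) can be sketched in isolation. This is only an approximation under stated assumptions: the `TaskManager` below is a hypothetical, thread-based stand-in rather than the one in fuel-core, cancellation tokens are omitted, and the early return for zero groups mirrors the check visible in the removed `spawn_worker_on_chain`.

```rust
use std::thread::{self, JoinHandle};

/// Threshold below which an import runs on the calling thread (10 in the diff).
pub const GROUPS_NUMBER_FOR_PARALLELIZATION: usize = 10;

/// Hypothetical, simplified task manager backed by plain OS threads.
#[derive(Default)]
pub struct TaskManager {
    handles: Vec<JoinHandle<Result<(), String>>>,
}

impl TaskManager {
    /// Runs the task immediately on the calling thread.
    pub fn run<F>(&mut self, task: F) -> Result<(), String>
    where
        F: FnOnce() -> Result<(), String>,
    {
        task()
    }

    /// Starts the task on a separate thread and remembers its handle.
    pub fn spawn_blocking<F>(&mut self, task: F)
    where
        F: FnOnce() -> Result<(), String> + Send + 'static,
    {
        self.handles.push(thread::spawn(task));
    }

    /// Joins all spawned tasks and returns the first error encountered.
    pub fn wait(&mut self) -> Result<(), String> {
        for handle in self.handles.drain(..) {
            handle.join().map_err(|_| "task panicked".to_string())??;
        }
        Ok(())
    }

    /// Number of spawned tasks that have not been joined yet.
    pub fn pending(&self) -> usize {
        self.handles.len()
    }
}

/// Chooses between inline execution and a spawned task based on the group count.
pub fn start_import<F>(
    manager: &mut TaskManager,
    num_groups: usize,
    import: F,
) -> Result<(), String>
where
    F: FnOnce() -> Result<(), String> + Send + 'static,
{
    // Nothing to import: skip creating a task at all.
    if num_groups == 0 {
        return Ok(());
    }
    if num_groups < GROUPS_NUMBER_FOR_PARALLELIZATION {
        // Small tables: the cost of spawning is likely larger than the work itself.
        manager.run(import)
    } else {
        manager.spawn_blocking(import);
        Ok(())
    }
}
```

With this shape, an inline import reports its error immediately from `start_import`, while a spawned import reports it later from `wait`.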

#[derive(Debug, Clone, Copy)]
pub struct Handler<TableBeingWritten, TableInSnapshot> {
pub struct Handler {
pub block_height: BlockHeight,
pub da_block_height: DaBlockHeight,
_table_being_written: PhantomData<TableBeingWritten>,
_table_in_snapshot: PhantomData<TableInSnapshot>,
}

impl<A, B> Handler<A, B> {
impl Handler {
pub fn new(block_height: BlockHeight, da_block_height: DaBlockHeight) -> Self {
Self {
block_height,
da_block_height,
_table_being_written: PhantomData,
_table_in_snapshot: PhantomData,
}
}
}

pub fn migration_name<TableInSnapshot, TableBeingWritten>() -> String
where
TableInSnapshot: TableWithBlueprint,
TableBeingWritten: TableWithBlueprint,
{
format!(
"{} -> {}",
TableInSnapshot::column().name(),
TableBeingWritten::column().name()
)
}
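
The `start_imports!` macro in this diff panics when two tables share a column name, because those names serve as keys for tracking genesis progress. A minimal sketch of why that matters follows. It uses a `HashSet` from the standard library in place of itertools' `all_unique`, and `GenesisProgress` with its methods is a hypothetical illustration of name-keyed progress tracking, not the structure fuel-core actually stores.

```rust
use std::collections::{HashMap, HashSet};

/// Returns `true` when no name appears more than once.
pub fn all_unique(names: &[&str]) -> bool {
    let mut seen = HashSet::new();
    // `insert` returns `false` for a value that is already present.
    names.iter().all(|name| seen.insert(*name))
}

/// Hypothetical progress store: index of the last imported group per table name.
#[derive(Default)]
pub struct GenesisProgress {
    last_group: HashMap<String, usize>,
}

impl GenesisProgress {
    /// Records that `group_index` of `table` has been imported.
    pub fn record(&mut self, table: &str, group_index: usize) {
        self.last_group.insert(table.to_string(), group_index);
    }

    /// Number of groups that can be skipped when the import of `table` resumes.
    pub fn groups_to_skip(&self, table: &str) -> usize {
        self.last_group.get(table).map_or(0, |idx| idx + 1)
    }
}
```

If two different tables reported progress under the same name, the second `record` call would overwrite the first, and a resumed import could skip groups that were never written. Checking uniqueness once, before any import starts, rules that out.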