From 18822aeb7fa58aee3ef6b2c1bf343c4237410164 Mon Sep 17 00:00:00 2001 From: librelois Date: Tue, 18 May 2021 15:25:34 +0200 Subject: [PATCH] perf(indexer): store blocks chunks in files directly --- Cargo.lock | 15 +++- db/src/lib.rs | 1 - indexer/Cargo.toml | 3 +- indexer/src/blocks_chunks.rs | 139 +++++++++++++++++++++++------------ indexer/src/lib.rs | 13 ++-- src/lib.rs | 20 ++++- 6 files changed, 127 insertions(+), 64 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 466db12..7646068 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1028,8 +1028,9 @@ dependencies = [ "dubp", "duniter-core", "duniter-gva-db", + "flate2", + "log", "maplit", - "miniz_oxide", "once_cell", "parking_lot", "resiter", @@ -1176,6 +1177,18 @@ dependencies = [ "num_cpus", ] +[[package]] +name = "flate2" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd3aec53de10fe96d7d8c565eb17f2c687bb5518a2ec453b5b1252964526abe0" +dependencies = [ + "cfg-if 1.0.0", + "crc32fast", + "libc", + "miniz_oxide", +] + [[package]] name = "float-cmp" version = "0.8.0" diff --git a/db/src/lib.rs b/db/src/lib.rs index fca46a2..f6e899f 100644 --- a/db/src/lib.rs +++ b/db/src/lib.rs @@ -57,7 +57,6 @@ db_schema!( ["blocks_with_ud", BlocksWithUd, U32BE, ()], ["blockchain_time", BlockchainTime, U32BE, u64], ["blocks_chunk_hash", BlocksChunkHash, U32BE, HashDb], - ["compressed_blocks_chunk", CompressedBlocksChunk, U32BE, Vec], ["current_blocks_chunk", CurrentBlocksChunk, U32BE, GvaBlockDbV1], ["gva_identities", GvaIdentities, PubKeyKeyV2, GvaIdtyDbV1], [ diff --git a/indexer/Cargo.toml b/indexer/Cargo.toml index 120c024..28f26ce 100644 --- a/indexer/Cargo.toml +++ b/indexer/Cargo.toml @@ -17,7 +17,8 @@ bincode = "1.3" duniter-core = { git = "https://git.duniter.org/nodes/rust/duniter-core" } duniter-gva-db = { path = "../db" } dubp = { version = "0.54.1", features = ["duniter"] } -miniz_oxide = "0.4.4" +flate2 = "1.0.16" +log = "0.4" once_cell = "1.7" resiter = "0.4.0" diff --git a/indexer/src/blocks_chunks.rs b/indexer/src/blocks_chunks.rs index f58ddfe..b5155fe 100644 --- a/indexer/src/blocks_chunks.rs +++ b/indexer/src/blocks_chunks.rs @@ -14,75 +14,116 @@ // along with this program. If not, see . use crate::*; +use flate2::read::ZlibDecoder; +use flate2::write::ZlibEncoder; +use flate2::Compression; const CHUNK_SIZE: u32 = 4_096; -pub(super) fn apply_block_blocks_chunk( +pub fn apply_block_blocks_chunk( block: &DubpBlockV10, - gva_db: &mut GvaV1DbTxRw, + gva_db: &GvaV1Db, + profile_path: &Path, ) -> KvResult<()> { let block_number = block.number().0; - gva_db.current_blocks_chunk.upsert( - U32BE(block_number), - GvaBlockDbV1(DubpBlock::V10(block.clone())), - ); + let chunks_folder_path = profile_path.join("data/gva_v1_blocks_chunks"); + gva_db.write(|mut db| { + db.current_blocks_chunk.upsert( + U32BE(block_number), + GvaBlockDbV1(DubpBlock::V10(block.clone())), + ); - if (block_number + 1) % CHUNK_SIZE == 0 { - let current_chunk: Vec = gva_db - .current_blocks_chunk - .iter(.., |it| it.values().collect::, _>>())?; - let current_chunk_bin = bincode_db() - .serialize(¤t_chunk) - .map_err(|e| KvError::DeserError(e.into()))?; - let chunk_hash = Hash::compute_blake3(current_chunk_bin.as_ref()); + if (block_number + 1) % CHUNK_SIZE == 0 { + let current_chunk: Vec = db + .current_blocks_chunk + .iter(.., |it| it.values().collect::, _>>())?; + let current_chunk_bin = bincode_db() + .serialize(¤t_chunk) + .map_err(|e| KvError::DeserError(e.into()))?; + let chunk_hash = Hash::compute_blake3(current_chunk_bin.as_ref()); + let chunk_index = U32BE(block_number / CHUNK_SIZE); + db.blocks_chunk_hash.upsert(chunk_index, HashDb(chunk_hash)); - let compressed_chunk = miniz_oxide::deflate::compress_to_vec(current_chunk_bin.as_ref(), 3); + write_and_compress_chunk_in_file( + current_chunk_bin.as_ref(), + chunk_index.0, + chunks_folder_path.as_path(), + ) + .map_err(|e| KvError::Custom(e.into()))?; + } + Ok(()) + }) +} - let chunk_index = U32BE(block_number / CHUNK_SIZE); - gva_db - .blocks_chunk_hash - .upsert(chunk_index, HashDb(chunk_hash)); - gva_db - .compressed_blocks_chunk - .upsert(chunk_index, compressed_chunk); +/// Read and decompress bytes from file +fn read_and_remove_compressed_chunk( + chunk_index: u32, + chunks_folder_path: &Path, +) -> std::io::Result>> { + let file_path = chunks_folder_path.join(format!("_{}", chunk_index)); + if !file_path.exists() { + return Ok(None); } + if std::fs::metadata(file_path.as_path())?.len() > 0 { + let file = std::fs::File::open(file_path)?; + let mut z = ZlibDecoder::new(file); + let mut decompressed_bytes = Vec::new(); + z.read_to_end(&mut decompressed_bytes)?; + + Ok(Some(decompressed_bytes)) + } else { + Ok(None) + } +} + +/// Write and compress chunk in file +fn write_and_compress_chunk_in_file( + chunk: &[u8], + chunk_index: u32, + chunks_folder_path: &Path, +) -> Result<(), std::io::Error> { + log::info!("blocks_chunk_{}: {} bytes", chunk_index, chunk.len()); + let file = std::fs::File::create(chunks_folder_path.join(format!("_{}", chunk_index)))?; + let mut e = ZlibEncoder::new(file, Compression::new(3)); + e.write_all(chunk)?; + e.finish()?; Ok(()) } -pub(super) fn revert_block_blocks_chunk( +pub fn revert_block_blocks_chunk( block: &DubpBlockV10, - gva_db: &mut GvaV1DbTxRw, + gva_db: &GvaV1Db, + profile_path: &Path, ) -> KvResult<()> { let block_number = block.number().0; - if (block_number + 1) % CHUNK_SIZE == 0 { - // Uncompress las compressed chunk and replace it in current chunk - let chunk_index = U32BE(block_number / CHUNK_SIZE); - if let Some(compressed_chunk) = gva_db.compressed_blocks_chunk.get(&chunk_index)? { - gva_db.blocks_chunk_hash.remove(chunk_index); - gva_db.compressed_blocks_chunk.remove(chunk_index); + let chunks_folder_path = profile_path.join("data/gva_v1_blocks_chunks"); + gva_db.write(|mut db| { + if (block_number + 1) % CHUNK_SIZE == 0 { + // Uncompress last compressed chunk and replace it in current chunk + let chunk_index = U32BE(block_number / CHUNK_SIZE); + if let Some(current_chunk_bin) = + read_and_remove_compressed_chunk(chunk_index.0, chunks_folder_path.as_path())? + { + db.blocks_chunk_hash.remove(chunk_index); - let current_chunk_bin = - miniz_oxide::inflate::decompress_to_vec(compressed_chunk.as_ref()) - .map_err(|e| KvError::Custom(format!("{:?}", e).into()))?; - - let current_chunk: Vec = bincode_db() - .deserialize(current_chunk_bin.as_ref()) - .map_err(|e| KvError::DeserError(e.into()))?; - let current_chunk_begin = block_number - CHUNK_SIZE + 1; - for (i, block) in current_chunk.into_iter().enumerate() { - gva_db - .current_blocks_chunk - .upsert(U32BE(current_chunk_begin + i as u32), block); + let current_chunk: Vec = bincode_db() + .deserialize(current_chunk_bin.as_ref()) + .map_err(|e| KvError::DeserError(e.into()))?; + let current_chunk_begin = block_number - CHUNK_SIZE + 1; + for (i, block) in current_chunk.into_iter().enumerate() { + db.current_blocks_chunk + .upsert(U32BE(current_chunk_begin + i as u32), block); + } + } else { + return Err(KvError::DbCorrupted( + "Not found last compressed chunk".to_owned(), + )); } } else { - return Err(KvError::DbCorrupted( - "Not found last compressed chunk".to_owned(), - )); + db.current_blocks_chunk.remove(U32BE(block_number)); } - } else { - gva_db.current_blocks_chunk.remove(U32BE(block_number)); - } - Ok(()) + Ok(()) + }) } diff --git a/indexer/src/lib.rs b/indexer/src/lib.rs index 3f03199..74f97dc 100644 --- a/indexer/src/lib.rs +++ b/indexer/src/lib.rs @@ -27,6 +27,8 @@ mod identities; mod tx; mod utxos; +pub use blocks_chunks::{apply_block_blocks_chunk, revert_block_blocks_chunk}; + use bincode::Options as _; use dubp::{ block::prelude::*, @@ -48,6 +50,7 @@ use once_cell::sync::OnceCell; use resiter::filter::Filter; use std::{ collections::{BTreeSet, HashMap, HashSet}, + io::prelude::*, ops::AddAssign, path::Path, }; @@ -118,7 +121,6 @@ pub fn apply_block( hash: block.hash(), }; gva_db.write(|mut db| { - blocks_chunks::apply_block_blocks_chunk::(block, &mut db)?; db.blocks_by_common_time .upsert(U64BE(block.common_time()), block.number().0); db.blockchain_time @@ -139,9 +141,7 @@ pub fn apply_block( block.common_time() as i64, block.transactions(), ) - })?; - - Ok(()) + }) } pub fn revert_block( @@ -150,7 +150,6 @@ pub fn revert_block( gva_db: &GvaV1Db, ) -> KvResult<()> { gva_db.write(|mut db| { - blocks_chunks::revert_block_blocks_chunk::(block, &mut db)?; db.blocks_by_common_time.remove(U64BE(block.common_time())); db.blockchain_time.remove(U32BE(block.number().0)); identities::revert_identities::(&block, &mut db.gva_identities)?; @@ -178,9 +177,7 @@ pub fn revert_block( } db.txs_by_block.remove(U32BE(block.number().0)); Ok(()) - })?; - - Ok(()) + }) } fn apply_ud( diff --git a/src/lib.rs b/src/lib.rs index c15d587..3d57c50 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -76,8 +76,14 @@ impl duniter_core::module::DuniterModule for GvaModule { currency_params: CurrencyParameters, profile_path_opt: Option<&Path>, ) -> KvResult<()> { - let gva_db = get_gva_db_rw(profile_path_opt); - duniter_gva_indexer::apply_block(&block, currency_params, gva_db) + if let Some(profile_path) = profile_path_opt { + let gva_db = get_gva_db_rw(profile_path_opt); + duniter_gva_indexer::apply_block_blocks_chunk(&block, &gva_db, profile_path)?; + duniter_gva_indexer::apply_block(&block, currency_params, gva_db) + } else { + let gva_db = get_gva_db_rw(profile_path_opt); + duniter_gva_indexer::apply_block(&block, currency_params, gva_db) + } } fn revert_block( block: &DubpBlockV10, @@ -85,8 +91,14 @@ impl duniter_core::module::DuniterModule for GvaModule { currency_params: CurrencyParameters, profile_path_opt: Option<&Path>, ) -> KvResult<()> { - let gva_db = get_gva_db_rw(profile_path_opt); - duniter_gva_indexer::revert_block(&block, currency_params, gva_db) + if let Some(profile_path) = profile_path_opt { + let gva_db = get_gva_db_rw(profile_path_opt); + duniter_gva_indexer::revert_block_blocks_chunk(&block, &gva_db, profile_path)?; + duniter_gva_indexer::revert_block(&block, currency_params, gva_db) + } else { + let gva_db = get_gva_db_rw(profile_path_opt); + duniter_gva_indexer::revert_block(&block, currency_params, gva_db) + } } async fn init( conf: Self::Conf,