From 2576d3e061ba521b731fa688d52056880477be2b Mon Sep 17 00:00:00 2001 From: librelois Date: Fri, 14 May 2021 16:55:05 +0200 Subject: [PATCH] feat(indexer): index blocks chunks --- Cargo.lock | 8 ++++ db/Cargo.toml | 2 +- indexer/Cargo.toml | 2 + indexer/src/blocks_chunks.rs | 79 ++++++++++++++++++++++++++++++++++++ indexer/src/lib.rs | 3 ++ 5 files changed, 93 insertions(+), 1 deletion(-) create mode 100644 indexer/src/blocks_chunks.rs diff --git a/Cargo.lock b/Cargo.lock index 064de6c..b90a522 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -987,9 +987,11 @@ name = "duniter-gva-indexer" version = "0.1.0" dependencies = [ "anyhow", + "bincode", "dubp", "duniter-core", "duniter-gva-db", + "lz4_flex", "maplit", "once_cell", "resiter", @@ -1627,6 +1629,12 @@ dependencies = [ "cfg-if 1.0.0", ] +[[package]] +name = "lz4_flex" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05304f8e67dfc93d1b4b990137fd1a7a4c6ad44b60a9c486c8c4486f9d2027ae" + [[package]] name = "maplit" version = "1.0.2" diff --git a/db/Cargo.toml b/db/Cargo.toml index 191f051..04ceb64 100644 --- a/db/Cargo.toml +++ b/db/Cargo.toml @@ -11,7 +11,7 @@ edition = "2018" path = "src/lib.rs" [dependencies] -bincode = "1.2.1" +bincode = "1.3" chrono = { version = "0.4.15", optional = true } duniter-core = { git = "https://git.duniter.org/nodes/rust/duniter-core" } dubp = { version = "0.53.1", features = ["duniter"] } diff --git a/indexer/Cargo.toml b/indexer/Cargo.toml index 71ebcd6..c49f07b 100644 --- a/indexer/Cargo.toml +++ b/indexer/Cargo.toml @@ -13,9 +13,11 @@ path = "src/lib.rs" [dependencies] anyhow = "1.0.34" +bincode = "1.3" duniter-core = { git = "https://git.duniter.org/nodes/rust/duniter-core" } duniter-gva-db = { path = "../db" } dubp = { version = "0.53.1", features = ["duniter"] } +lz4_flex = { version = "0.7", default-features = false } once_cell = "1.5.2" resiter = "0.4.0" diff --git a/indexer/src/blocks_chunks.rs b/indexer/src/blocks_chunks.rs new file mode 100644 index 0000000..aa8433e --- /dev/null +++ b/indexer/src/blocks_chunks.rs @@ -0,0 +1,79 @@ +// Copyright (C) 2020 Éloïs SANCHEZ. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use crate::*; + +pub(super) fn apply_block_blocks_chunk( + block: &DubpBlockV10, + gva_db: &mut GvaV1DbTxRw, +) -> KvResult<()> { + let block_number = block.number().0; + gva_db.current_blocks_chunk.upsert( + U32BE(block_number), + GvaBlockDbV1(DubpBlock::V10(block.clone())), + ); + + if block_number % 1_000 == 999 { + let current_chunk: Vec = gva_db + .current_blocks_chunk + .iter(.., |it| it.values().collect::, _>>())?; + let current_chunk_bin = + bincode::serialize(¤t_chunk).map_err(|e| KvError::DeserError(e.into()))?; + let chunk_hash = Hash::compute_blake3(current_chunk_bin.as_ref()); + let compressed_chunk = lz4_flex::compress_prepend_size(current_chunk_bin.as_ref()); + let chunk_index = U32BE(block_number / 1_000); + gva_db + .blocks_chunk_hash + .upsert(chunk_index, HashDb(chunk_hash)); + gva_db + .compressed_blocks_chunk + .upsert(chunk_index, compressed_chunk); + } + + Ok(()) +} + +pub(super) fn revert_block_blocks_chunk( + block: &DubpBlockV10, + gva_db: &mut GvaV1DbTxRw, +) -> KvResult<()> { + let block_number = block.number().0; + if block_number % 1_000 == 999 { + // Uncompress las compressed chunk and replace it in current chunk + let chunk_index = U32BE(block_number / 1_000); + if let Some(compressed_chunk) = gva_db.compressed_blocks_chunk.get(&chunk_index)? { + gva_db.blocks_chunk_hash.remove(chunk_index); + gva_db.compressed_blocks_chunk.remove(chunk_index); + let current_chunk_bin = lz4_flex::decompress_size_prepended(compressed_chunk.as_ref()) + .map_err(|e| KvError::Custom(format!("{:?}", e).into()))?; + let current_chunk: Vec = bincode::deserialize(current_chunk_bin.as_ref()) + .map_err(|e| KvError::DeserError(e.into()))?; + let current_chunk_begin = block_number - 999; + for (i, block) in current_chunk.into_iter().enumerate() { + gva_db + .current_blocks_chunk + .upsert(U32BE(current_chunk_begin + i as u32), block); + } + } else { + return Err(KvError::DbCorrupted( + "Not found last compressed chunk".to_owned(), + )); + } + } else { + gva_db.current_blocks_chunk.remove(U32BE(block_number)); + } + + Ok(()) +} diff --git a/indexer/src/lib.rs b/indexer/src/lib.rs index 8578580..20cc798 100644 --- a/indexer/src/lib.rs +++ b/indexer/src/lib.rs @@ -22,6 +22,7 @@ unused_import_braces )] +mod blocks_chunks; mod identities; mod tx; mod utxos; @@ -75,6 +76,7 @@ pub fn apply_block(block: &DubpBlockV10, gva_db: &GvaV1Db) -> KvR hash: block.hash(), }; gva_db.write(|mut db| { + blocks_chunks::apply_block_blocks_chunk::(block, &mut db)?; db.blocks_by_common_time .upsert(U64BE(block.common_time()), block.number().0); db.blockchain_time @@ -102,6 +104,7 @@ pub fn apply_block(block: &DubpBlockV10, gva_db: &GvaV1Db) -> KvR pub fn revert_block(block: &DubpBlockV10, gva_db: &GvaV1Db) -> KvResult<()> { gva_db.write(|mut db| { + blocks_chunks::revert_block_blocks_chunk::(block, &mut db)?; db.blocks_by_common_time.remove(U64BE(block.common_time())); db.blockchain_time.remove(U32BE(block.number().0)); identities::revert_identities::(&block, &mut db.gva_identities)?;