// Copyright (C) 2020 Éloïs req_id: (), resp_type: ()SANCHEZ. // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU Affero General Public License as // published by the Free Software Foundation, either version 3 of the // License, or (at your option) any later version. // // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Affero General Public License for more details. // // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . #![deny( clippy::unwrap_used, missing_copy_implementations, trivial_casts, trivial_numeric_casts, unstable_features, unused_import_braces )] mod exec_req_type; const MAX_BATCH_SIZE: usize = 10; const RESP_MIN_SIZE: usize = 64; type RespBytes = SmallVec<[u8; RESP_MIN_SIZE]>; use crate::exec_req_type::ExecReqTypeError; #[cfg(test)] use crate::tests::AsyncAccessor; use arrayvec::ArrayVec; use async_bincode::AsyncBincodeReader; use async_io_stream::IoStream; use bincode::Options as _; use dubp::crypto::keys::{ed25519::Ed25519KeyPair, Signator}; use duniter_bca_types::{ amount::Amount, bincode_opts, identity::Identity, BcaReq, BcaReqExecError, BcaReqTypeV0, BcaResp, BcaRespTypeV0, BcaRespV0, }; pub use duniter_core::dbs::kv_typed::prelude::*; use duniter_core::dbs::{FileBackend, SharedDbs}; #[cfg(not(test))] use duniter_core::global::AsyncAccessor; use duniter_gva_dbs_reader::DbsReader; use futures::{prelude::stream::FuturesUnordered, StreamExt, TryStream, TryStreamExt}; use once_cell::sync::OnceCell; use smallvec::SmallVec; use tokio::task::JoinError; #[cfg(test)] use crate::tests::DbsReaderImpl; #[cfg(not(test))] use duniter_gva_dbs_reader::DbsReaderImpl; static BCA_EXECUTOR: OnceCell = OnceCell::new(); pub fn set_bca_executor( currency: String, cm_accessor: AsyncAccessor, dbs_pool: fast_threadpool::ThreadPoolAsyncHandler>, dbs_reader: DbsReaderImpl, self_keypair: Ed25519KeyPair, software_version: &'static str, txs_mempool: duniter_core::mempools::TxsMempool, ) { BCA_EXECUTOR .set(BcaExecutor { currency, cm_accessor, dbs_pool, dbs_reader, self_keypair, software_version, txs_mempool, }) .unwrap_or_else(|_| panic!("BCA_EXECUTOR already set !")) } #[cfg(not(test))] pub async fn execute(query_body_stream: S, is_whitelisted: bool) -> Vec where B: AsRef<[u8]>, S: 'static + TryStream + Send + Unpin, { unsafe { BCA_EXECUTOR .get_unchecked() .execute(query_body_stream, is_whitelisted) .await } } #[derive(Clone)] struct BcaExecutor { cm_accessor: AsyncAccessor, currency: String, dbs_pool: fast_threadpool::ThreadPoolAsyncHandler>, dbs_reader: DbsReaderImpl, self_keypair: Ed25519KeyPair, software_version: &'static str, txs_mempool: duniter_core::mempools::TxsMempool, } use uninit::extension_traits::VecCapacity; impl BcaExecutor { pub async fn execute(&self, query_body_stream: S, is_whitelisted: bool) -> Vec where B: AsRef<[u8]>, S: 'static + TryStream + Send + Unpin, { let async_bincode_reader = AsyncBincodeReader::, BcaReq>::from(IoStream::new(query_body_stream)); self.execute_inner(async_bincode_reader, is_whitelisted) .await .into_iter() .fold(Vec::new(), |mut vec, elem| { // Write resp len let out = vec.reserve_uninit(4); out.copy_from_slice(&u32::to_be_bytes(elem.len() as u32)[..]); unsafe { // # Safety // // - `.copy_from_slice()` contract guarantees initialization // of `out`, which, in turn, from `reserve_uninit`'s contract, // leads to the `vec` extra capacity having been initialized. vec.set_len(vec.len() + 4); } // Write resp content let out = vec.reserve_uninit(elem.len()); out.copy_from_slice(&elem[..]); unsafe { // # Safety // // - `.copy_from_slice()` contract guarantees initialization // of `out`, which, in turn, from `reserve_uninit`'s contract, // leads to the `vec` extra capacity having been initialized. vec.set_len(vec.len() + elem.len()); } vec }) } async fn execute_inner( &self, stream: impl TryStream, is_whitelisted: bool, ) -> Vec { match stream .map_ok(|req| { let self_clone = self.clone(); tokio::spawn(async move { self_clone.execute_req(req, is_whitelisted).await }) }) .take(MAX_BATCH_SIZE) .try_collect::>() .await { Ok(futures_unordered) => { futures_unordered .map(|req_res: Result| { let resp = match req_res { Ok(resp) => Ok(resp), Err(e) => Err(if e.is_cancelled() { BcaReqExecError::Cancelled } else if e.is_panic() { BcaReqExecError::Panic } else { BcaReqExecError::Unknown }), }; let mut resp_buffer = RespBytes::new(); bincode_opts() .serialize_into(&mut resp_buffer, &resp) .expect("unreachable"); resp_buffer }) .collect() .await } Err(e) => { let req_res: Result = Err(BcaReqExecError::InvalidReq(e.to_string())); let mut resp_buffer = RespBytes::new(); bincode_opts() .serialize_into(&mut resp_buffer, &req_res) .expect("unreachable"); vec![resp_buffer] } } } #[inline(always)] async fn execute_req(self, req: BcaReq, is_whitelisted: bool) -> BcaResp { match req { BcaReq::V0(req) => BcaResp::V0(BcaRespV0 { req_id: req.req_id, resp_type: match crate::exec_req_type::execute_req_type( &self, req.req_type, is_whitelisted, ) .await { Ok(resp_type) => resp_type, Err(e) => BcaRespTypeV0::Error(e.0), }, }), _ => BcaResp::UnsupportedVersion, } } } #[cfg(not(test))] impl BcaExecutor { #[inline(always)] pub fn dbs_reader(&self) -> DbsReaderImpl { self.dbs_reader } } #[cfg(test)] mod tests { use super::*; pub use dubp::{ block::prelude::*, crypto::{ hashs::Hash, keys::{ed25519::PublicKey, KeyPair, Seed32}, }, documents::transaction::TransactionInputV10, wallet::prelude::*, }; pub use duniter_bca_types::BcaReqV0; pub use duniter_core::dbs::databases::bc_v2::{BcV2DbReadable, BcV2DbRo}; pub use duniter_core::dbs::databases::cm_v1::{CmV1Db, CmV1DbReadable}; pub use duniter_core::dbs::databases::txs_mp_v2::{TxsMpV2Db, TxsMpV2DbReadable}; pub use duniter_core::dbs::BlockMetaV2; pub use duniter_core::global::{CurrentMeta, MockAsyncAccessor}; pub use duniter_gva_dbs_reader::MockDbsReader; pub use futures::TryStreamExt; pub type AsyncAccessor = duniter_core::dbs::kv_typed::prelude::Arc; pub type DbsReaderImpl = duniter_core::dbs::kv_typed::prelude::Arc; impl BcaExecutor { #[inline(always)] pub fn dbs_reader(&self) -> DbsReaderImpl { self.dbs_reader.clone() } } pub(crate) fn create_bca_executor( mock_cm: MockAsyncAccessor, mock_dbs_reader: MockDbsReader, ) -> KvResult { let dbs = SharedDbs::mem()?; let threadpool = fast_threadpool::ThreadPool::start(fast_threadpool::ThreadPoolConfig::low(), dbs); Ok(BcaExecutor { cm_accessor: duniter_core::dbs::kv_typed::prelude::Arc::new(mock_cm), currency: "g1".to_owned(), dbs_pool: threadpool.into_async_handler(), dbs_reader: duniter_core::dbs::kv_typed::prelude::Arc::new(mock_dbs_reader), self_keypair: Ed25519KeyPair::from_seed( Seed32::random().expect("fail to gen random seed"), ), software_version: "test", txs_mempool: duniter_core::mempools::TxsMempool::new(10), }) } pub(crate) fn io_stream>( bytes: B, ) -> impl TryStream { futures::stream::iter(std::iter::once(Ok(bytes))) } #[tokio::test] async fn test_one_req_ok() -> Result<(), bincode::Error> { let req = BcaReq::V0(BcaReqV0 { req_id: 42, req_type: BcaReqTypeV0::MembersCount, }); assert_eq!(bincode_opts().serialized_size(&req)?, 3); let mut bytes = [0u8; 7]; bincode_opts().serialize_into(&mut bytes[4..], &req)?; bytes[3] = 3; use bincode::Options; //println!("bytes_for_bincode={:?}", &bytes[4..]); assert_eq!(req, bincode_opts().deserialize(&bytes[4..])?); let mut mock_cm = MockAsyncAccessor::new(); mock_cm .expect_get_current_meta::() .times(1) .returning(|f| Some(f(&CurrentMeta::default()))); let bca_executor = create_bca_executor(mock_cm, MockDbsReader::new()) .expect("fail to create bca executor"); //println!("bytes={:?}", bytes); let bytes_res = bca_executor.execute(io_stream(bytes), false).await; //println!("bytes_res={:?}", bytes_res); let bca_res: Vec> = AsyncBincodeReader::<_, Result>::from(&bytes_res[..]) .try_collect::>() .await?; assert_eq!( bca_res, vec![Ok(BcaResp::V0(BcaRespV0 { req_id: 42, resp_type: BcaRespTypeV0::MembersCount(0) }))] ); Ok(()) } #[tokio::test] async fn test_one_req_invalid() -> Result<(), bincode::Error> { let req = BcaReq::V0(BcaReqV0 { req_id: 42, req_type: BcaReqTypeV0::MembersCount, }); assert_eq!(bincode_opts().serialized_size(&req)?, 3); let mut bytes = [0u8; 7]; bincode_opts().serialize_into(&mut bytes[4..], &req)?; bytes[3] = 2; use bincode::Options; //println!("bytes_for_bincode={:?}", &bytes[4..]); assert_eq!(req, bincode_opts().deserialize(&bytes[4..])?); let bca_executor = create_bca_executor(MockAsyncAccessor::new(), MockDbsReader::new()) .expect("fail to create bca executor"); //println!("bytes={:?}", bytes); let bytes_res = bca_executor.execute(io_stream(bytes), false).await; //println!("bytes_res={:?}", bytes_res); let bca_res: Vec> = AsyncBincodeReader::<_, Result>::from(&bytes_res[..]) .try_collect::>() .await?; assert_eq!( bca_res, vec![Err(BcaReqExecError::InvalidReq( "io error: unexpected end of file".to_owned() ))] ); Ok(()) } #[tokio::test] async fn test_two_reqs_ok() -> Result<(), bincode::Error> { let req1 = BcaReq::V0(BcaReqV0 { req_id: 42, req_type: BcaReqTypeV0::Ping, }); assert_eq!(bincode_opts().serialized_size(&req1)?, 3); let req2 = BcaReq::V0(BcaReqV0 { req_id: 57, req_type: BcaReqTypeV0::MembersCount, }); assert_eq!(bincode_opts().serialized_size(&req2)?, 3); let mut bytes = [0u8; 14]; bincode_opts().serialize_into(&mut bytes[4..], &req1)?; bytes[3] = 3; bincode_opts().serialize_into(&mut bytes[11..], &req2)?; bytes[10] = 3; let mut mock_cm = MockAsyncAccessor::new(); mock_cm .expect_get_current_meta::() .times(1) .returning(|f| Some(f(&CurrentMeta::default()))); let bca_executor = create_bca_executor(mock_cm, MockDbsReader::new()) .expect("fail to create bca executor"); //println!("bytes={:?}", bytes); let bytes_res = bca_executor.execute(io_stream(bytes), false).await; //println!("bytes_res={:?}", bytes_res); let bca_res: Vec> = AsyncBincodeReader::<_, Result>::from(&bytes_res[..]) .try_collect::>() .await?; assert_eq!( bca_res, vec![ Ok(BcaResp::V0(BcaRespV0 { req_id: 42, resp_type: BcaRespTypeV0::Pong })), Ok(BcaResp::V0(BcaRespV0 { req_id: 57, resp_type: BcaRespTypeV0::MembersCount(0) })) ] ); Ok(()) } }