// Copyright (C) 2020 Éloïs SANCHEZ. // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU Affero General Public License as // published by the Free Software Foundation, either version 3 of the // License, or (at your option) any later version. // // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Affero General Public License for more details. // // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . #![deny( clippy::unwrap_used, missing_copy_implementations, trivial_casts, trivial_numeric_casts, unstable_features, unused_import_braces )] mod anti_spam; mod warp_; use async_graphql::http::GraphQLPlaygroundConfig; use duniter_core::common::{currency_params::CurrencyParameters, prelude::*}; use duniter_core::dbs::databases::txs_mp_v2::TxsMpV2DbReadable; use duniter_core::dbs::prelude::*; use duniter_core::dbs::{kv_typed::prelude::*, FileBackend}; use duniter_core::documents::transaction::TransactionDocumentV10; use duniter_core::global::AsyncAccessor; use duniter_core::mempools::Mempools; #[cfg(not(test))] use duniter_core::module::public_ips::get_public_ips; use duniter_core::{block::DubpBlockV10, crypto::hashs::Hash}; use duniter_core::{ common::crypto::keys::{ed25519::PublicKey, KeyPair as _}, crypto::keys::ed25519::Ed25519KeyPair, }; use duniter_core::{conf::DuniterMode, module::Endpoint}; use duniter_gva_conf::GvaConf; use duniter_gva_db::*; use duniter_gva_gql::{GvaSchema, QueryContext}; use duniter_gva_indexer::{get_gva_db_ro, get_gva_db_rw}; use futures::{StreamExt, TryStreamExt}; use std::{convert::Infallible, path::Path}; #[cfg(test)] use tests::get_public_ips; use warp::{http::Response as HttpResponse, Filter as _, Rejection}; const PLAYGROUND_SUB_PATH: &str = "playground"; const SUBSCRIPTION_SUB_PATH: &str = "subscription"; #[derive(Debug)] pub struct GvaModule { conf: GvaConf, currency: String, dbs_pool: fast_threadpool::ThreadPoolAsyncHandler>, gva_db_ro: &'static GvaV1DbRo, mempools: Mempools, mode: DuniterMode, self_keypair: Ed25519KeyPair, software_version: &'static str, } #[async_trait::async_trait] impl duniter_core::module::DuniterModule for GvaModule { const INDEX_BLOCKS: bool = true; const MODULE_NAME: &'static str = "gva"; type Conf = GvaConf; fn apply_block( block: &DubpBlockV10, _conf: &duniter_core::conf::DuniterCoreConf, currency_params: CurrencyParameters, profile_path_opt: Option<&Path>, ) -> KvResult<()> { if let Some(profile_path) = profile_path_opt { let gva_db = get_gva_db_rw(profile_path_opt); duniter_gva_indexer::apply_block_blocks_chunk(&block, &gva_db, profile_path)?; duniter_gva_indexer::apply_block(&block, currency_params, gva_db) } else { let gva_db = get_gva_db_rw(profile_path_opt); duniter_gva_indexer::apply_block(&block, currency_params, gva_db) } } fn revert_block( block: &DubpBlockV10, _conf: &duniter_core::conf::DuniterCoreConf, currency_params: CurrencyParameters, profile_path_opt: Option<&Path>, ) -> KvResult<()> { if let Some(profile_path) = profile_path_opt { let gva_db = get_gva_db_rw(profile_path_opt); duniter_gva_indexer::revert_block_blocks_chunk(&block, &gva_db, profile_path)?; duniter_gva_indexer::revert_block(&block, currency_params, gva_db) } else { let gva_db = get_gva_db_rw(profile_path_opt); duniter_gva_indexer::revert_block(&block, currency_params, gva_db) } } async fn init( conf: Self::Conf, core_conf: &duniter_core::conf::DuniterCoreConf, currency: &str, dbs_pool: &fast_threadpool::ThreadPoolAsyncHandler>, mempools: Mempools, mode: duniter_core::conf::DuniterMode, profile_path_opt: Option<&Path>, software_version: &'static str, ) -> anyhow::Result<(Self, Vec)> { let self_keypair = core_conf.self_key_pair.clone(); let endpoints = Self::gen_endpoints(&conf).await?; Ok(( GvaModule { conf, currency: currency.to_owned(), dbs_pool: dbs_pool.to_owned(), gva_db_ro: get_gva_db_ro(profile_path_opt), mempools, mode, self_keypair, software_version, }, endpoints, )) } async fn start(self) -> anyhow::Result<()> { // Do not start GVA server on js tests if std::env::var_os("DUNITER_JS_TESTS") != Some("yes".into()) { let GvaModule { conf, currency, dbs_pool, gva_db_ro, mempools, mode, self_keypair, software_version, } = self; if let DuniterMode::Start = mode { if conf.enabled { GvaModule::start_inner( conf, currency, dbs_pool, gva_db_ro, mempools, self_keypair, software_version, ) .await } } } Ok(()) } // Needed for BMA only, will be removed when the migration is complete. fn get_transactions_history_for_bma( dbs_pool: &fast_threadpool::ThreadPoolSyncHandler>, profile_path_opt: Option<&Path>, pubkey: PublicKey, ) -> KvResult> { let gva_db = get_gva_db_ro(profile_path_opt); let duniter_gva_dbs_reader::txs_history::TxsHistory { sent, received, sending, pending, } = dbs_pool .execute(move |dbs| { duniter_gva_dbs_reader::txs_history::get_transactions_history_for_bma( gva_db, &dbs.txs_mp_db, pubkey, ) }) .expect("dbs pool disconnected")?; Ok(Some(duniter_core::module::TxsHistoryForBma { sent: sent .into_iter() .map( |GvaTxDbV1 { tx, written_block, written_time, }| (tx, written_block, written_time), ) .collect(), received: received .into_iter() .map( |GvaTxDbV1 { tx, written_block, written_time, }| (tx, written_block, written_time), ) .collect(), sending, pending, })) } // Needed for BMA only, will be removed when the migration is complete. fn get_tx_by_hash( dbs_pool: &fast_threadpool::ThreadPoolSyncHandler>, hash: Hash, profile_path_opt: Option<&Path>, ) -> KvResult)>> { let gva_db = get_gva_db_ro(profile_path_opt); dbs_pool .execute(move |dbs| { if let Some(tx) = dbs .txs_mp_db .txs() .get(&duniter_core::dbs::HashKeyV2(hash))? { Ok(Some((tx.doc, None))) } else if let Some(tx_db) = gva_db.txs().get(&duniter_core::dbs::HashKeyV2(hash))? { Ok(Some((tx_db.tx, Some(tx_db.written_block.number)))) } else { Ok(None) } }) .expect("dbs pool disconnected") } } impl GvaModule { async fn start_inner( conf: GvaConf, currency: String, dbs_pool: fast_threadpool::ThreadPoolAsyncHandler>, gva_db_ro: &'static GvaV1DbRo, mempools: Mempools, self_keypair: Ed25519KeyPair, software_version: &'static str, ) { log::info!("GvaServer::start: conf={:?}", conf); // Create BdaExecutor and GvaSchema let self_pubkey = self_keypair.public_key(); duniter_bda::set_bda_executor( currency.clone(), AsyncAccessor::new(), dbs_pool.clone(), duniter_gva_dbs_reader::create_dbs_reader(gva_db_ro), self_keypair, software_version, mempools.txs, ); let gva_schema = duniter_gva_gql::build_schema_with_data( duniter_gva_gql::GvaSchemaData { cm_accessor: AsyncAccessor::new(), dbs_reader: duniter_gva_dbs_reader::create_dbs_reader(gva_db_ro), dbs_pool, server_meta_data: duniter_gva_gql::ServerMetaData { currency, self_pubkey, software_version, }, txs_mempool: mempools.txs, }, true, ); // Create warp server routes let gva_route = warp_::gva_route( &conf, gva_schema.clone(), async_graphql::http::MultipartOptions::default(), ); let gva_playground_route = warp_::gva_playground_route(&conf); let gva_subscription_route = warp_::gva_subscription_route(&conf, gva_schema.clone()); // Define recover function let recover_func = |err: Rejection| async move { if let Some(warp_::BadRequest(err)) = err.find() { return Ok::<_, Infallible>(warp::reply::with_status( err.to_string(), http::StatusCode::BAD_REQUEST, )); } Ok(warp::reply::with_status( "INTERNAL_SERVER_ERROR".to_string(), http::StatusCode::INTERNAL_SERVER_ERROR, )) }; // Start warp server if conf.playground { Self::run_warp_server( &conf, gva_route .or(gva_subscription_route) .or(gva_playground_route) .recover(recover_func), ) .await } else { Self::run_warp_server( &conf, gva_route.or(gva_subscription_route).recover(recover_func), ) .await } } async fn run_warp_server(conf: &GvaConf, routes: F) where F: warp::Filter + Clone + Send + Sync + 'static, F::Extract: warp::Reply, { log::info!( "GVA server listen on http://{}:{}/{}", conf.ip4, conf.port, &conf.path ); if let Some(ip6) = conf.ip6 { log::info!( "GVA server listen on http://[{}]:{}/{}", ip6, conf.port, &conf.path ); futures::future::join( warp::serve(routes.clone()).run((conf.ip4, conf.port)), warp::serve(routes).run((ip6, conf.port)), ) .await; } else { warp::serve(routes).run((conf.ip4, conf.port)).await; } log::warn!("GVA server stopped"); } async fn gen_endpoints(conf: &GvaConf) -> anyhow::Result> { let mut endpoints = Vec::new(); if conf.enabled { // Fill remote hosh let remote_hosh = if let Some(remote_host) = conf.remote_host.clone() { remote_host } else { let public_ips = get_public_ips().await; let mut remote_hosh = String::new(); if let Some(ip4) = public_ips.public_ip4_opt { remote_hosh.push_str(ip4.to_string().as_str()); remote_hosh.push(' '); } if let Some(ip6) = public_ips.public_ip6_opt { remote_hosh.push_str(&format!("[{}]", ip6.to_string())); } else if !remote_hosh.is_empty() { remote_hosh.pop(); } else { return Err(anyhow::Error::msg( "Fail to found public IPs, please configure remote_host manually", )); } remote_hosh }; // Fill remote port let remote_port = conf.get_remote_port(); // Push endpoints endpoints.push(format!( "GVA {}{} {} {}", if remote_port == 443 || conf.remote_tls.unwrap_or_default() { "S " } else { "" }, remote_hosh, remote_port, conf.get_remote_path(), )); } Ok(endpoints) } } #[cfg(test)] mod tests { use super::*; use duniter_core::mempools::Mempools; use duniter_core::module::DuniterModule; use duniter_core::{conf::DuniterCoreConf, module::public_ips::PublicIPs}; use fast_threadpool::{ThreadPool, ThreadPoolConfig}; use std::net::{Ipv4Addr, Ipv6Addr}; use unwrap::unwrap; static PUBLIC_IPS_MOCK: async_mutex::Mutex> = async_mutex::Mutex::new(None); pub async fn get_public_ips() -> PublicIPs { let public_ips = *PUBLIC_IPS_MOCK.lock().await; public_ips.unwrap_or(PublicIPs { public_ip4_opt: None, public_ip6_opt: None, }) } async fn test_gen_endpoints( conf: &GvaConf, public_ips: PublicIPs, ) -> anyhow::Result> { PUBLIC_IPS_MOCK.lock().await.replace(public_ips); GvaModule::gen_endpoints(&conf).await } #[tokio::test] async fn gen_endpoints() -> anyhow::Result<()> { let conf = GvaConf { enabled: true, ..Default::default() }; // ip4 and ip6 find let endpoints = test_gen_endpoints( &conf, PublicIPs { public_ip4_opt: Some(Ipv4Addr::UNSPECIFIED), public_ip6_opt: Some(Ipv6Addr::UNSPECIFIED), }, ) .await?; assert_eq!(endpoints, vec!["GVA 0.0.0.0 [::] 30901 gva".to_owned(),]); // only ip4 find let endpoints = test_gen_endpoints( &conf, PublicIPs { public_ip4_opt: Some(Ipv4Addr::UNSPECIFIED), public_ip6_opt: None, }, ) .await?; assert_eq!(endpoints, vec!["GVA 0.0.0.0 30901 gva".to_owned(),]); // only ip6 find let endpoints = test_gen_endpoints( &conf, PublicIPs { public_ip4_opt: None, public_ip6_opt: Some(Ipv6Addr::UNSPECIFIED), }, ) .await?; assert_eq!(endpoints, vec!["GVA [::] 30901 gva".to_owned(),]); // No ips find assert!(test_gen_endpoints( &conf, PublicIPs { public_ip4_opt: None, public_ip6_opt: None, }, ) .await .is_err()); Ok(()) } #[tokio::test] #[ignore] async fn launch_mem_gva() -> anyhow::Result<()> { let dbs = unwrap!(SharedDbs::mem()); let threadpool = ThreadPool::start(ThreadPoolConfig::default(), dbs); GvaModule::init( GvaConf::default(), &DuniterCoreConf::default(), "", &threadpool.into_async_handler(), Mempools::default(), duniter_core::conf::DuniterMode::Start, None, "test", ) .await? .0 .start() .await?; Ok(()) } }