From 2069d9104c8ad3bf4ae84c76d9699f9f92593e93 Mon Sep 17 00:00:00 2001 From: erius Date: Sun, 1 Dec 2024 04:46:54 +0300 Subject: [PATCH] Added id field to all model's structs Implemented YandexRequester summary method (TODO: test it) Added BotData struct that will replace the Settings struct in the future Implemented update methods for some structs' tables in BotData (TODO: implement update_requester and new methods) Moved migrations to root directory Note: use sqlx-cli from cargo to manage migrations from now on Added *.db files to .gitignore Added serde_json to Cargo.toml to parse json in YandexRequester --- .gitignore | 1 + Cargo.lock | 5 +- Cargo.toml | 1 + .../20241201000918_schema.sql | 4 +- src/main.rs | 2 +- src/model.rs | 156 ++++++++++++++++++ src/model/proxy.rs | 2 + src/model/requester.rs | 84 +++++++++- src/model/user.rs | 2 + 9 files changed, 244 insertions(+), 13 deletions(-) rename db/migrations/schema.sql => migrations/20241201000918_schema.sql (84%) diff --git a/.gitignore b/.gitignore index fedaa2b..baca5ce 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /target .env +*.db diff --git a/Cargo.lock b/Cargo.lock index 82f3944..73dad04 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1805,9 +1805,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.132" +version = "1.0.133" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d726bfaff4b320266d395898905d0eba0345aae23b54aee3a737e260fd46db03" +checksum = "c7fceb2473b9166b2294ef05efcb65a3db80803f0b03ef86a5fc88a2b85ee377" dependencies = [ "itoa", "memchr", @@ -2963,6 +2963,7 @@ dependencies = [ "pretty_env_logger", "reqwest 0.12.9", "serde", + "serde_json", "sqlx", "teloxide", "thiserror 2.0.3", diff --git a/Cargo.toml b/Cargo.toml index d602cf5..bb0334d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,3 +14,4 @@ dotenvy = "0.15.7" reqwest = { version = "0.12.9", features = ["json"] } bitflags = "2.6.0" sqlx = { version = "0.8.2", features = ["runtime-tokio", "tls-rustls", "sqlite"] } +serde_json = "1.0.133" diff --git a/db/migrations/schema.sql b/migrations/20241201000918_schema.sql similarity index 84% rename from db/migrations/schema.sql rename to migrations/20241201000918_schema.sql index f45e702..3bd337a 100644 --- a/db/migrations/schema.sql +++ b/migrations/20241201000918_schema.sql @@ -6,7 +6,7 @@ CREATE TABLE IF NOT EXISTS users ( CREATE TABLE IF NOT EXISTS proxies ( id INTEGER PRIMARY KEY, - ip INTEGER NOT NULL, + ip BLOB NOT NULL, port INTEGER NOT NULL, user TEXT NOT NULL, password TEXT NOT NULL @@ -17,5 +17,5 @@ CREATE TABLE IF NOT EXISTS requesters ( name TEXT NOT NULL, api TEXT NOT NULL, proxy_id INTEGER, - FOREIGN_KEY (proxy_id) REFERENCES proxies (id) + FOREIGN KEY (proxy_id) REFERENCES proxies (id) ); diff --git a/src/main.rs b/src/main.rs index fb6e8da..f4d826f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -26,5 +26,5 @@ async fn main() -> ExitCode { log::info!("starting telegram bot..."); let bot = Bot::new(settings.get_bot_token()); - return ExitCode::SUCCESS; + ExitCode::SUCCESS } diff --git a/src/model.rs b/src/model.rs index 625feb6..7684ad4 100644 --- a/src/model.rs +++ b/src/model.rs @@ -1,3 +1,159 @@ pub mod proxy; pub mod requester; pub mod user; + +use std::{collections::HashMap, net::IpAddr}; + +use proxy::Socks5Proxy; +use requester::YandexRequester; +use sqlx::SqlitePool; +use user::User; + +pub struct BotData { + token: String, + // map username to id to get users faster by name + user_ids: HashMap, + users: HashMap, + proxies: HashMap, + requesters: HashMap, + pool: SqlitePool, +} + +#[derive(thiserror::Error, Debug)] +pub enum BotError { + #[error("user with id {0} does not exist")] + UserNotFound(i32), + #[error("failed to update user with id {0}: {1}")] + UserUpdateFailed(i32, sqlx::Error), + #[error("failed to update user with id {0}: user exists in memory but not in database")] + UserNotFoundInDatabase(i32), + #[error("proxy with id {0} does not exist")] + ProxyNotFound(i32), + #[error("failed to update proxy with id {0}: {1}")] + ProxyUpdateFailed(i32, sqlx::Error), + #[error("failed to update proxy with id {0}: proxy exists in memory but not in database")] + ProxyNotFoundInDatabase(i32), + #[error("requester with id {0} does not exist")] + RequesterNotFound(i32), + #[error("failed to open db connection: {0}")] + ConnectionFailed(sqlx::Error), +} + +impl BotData { + pub fn new(token: String, pool: SqlitePool) -> Self { + todo!() + } + + pub fn get_token(&self) -> &str { + &self.token + } + + pub fn get_user_by_name(&self, username: &str) -> Option<&User> { + let id = self.user_ids.get(username)?; + self.users.get(id) + } + + pub fn get_user(&self, id: &i32) -> Option<&User> { + self.users.get(id) + } + + pub async fn update_user(&mut self, id: &i32, update_actions: F) -> Result<(), BotError> + where + F: FnOnce(&mut User), + { + let Some(user) = self.users.get_mut(id) else { + return Err(BotError::UserNotFound(*id)); + }; + // old copy of a user - used to rollback user changes in memory + // in case the database update fails + let old_user = user.clone(); + update_actions(user); + // return if no changes are applied + if *user == old_user { + return Ok(()); + } + let (username, permissions) = (user.get_username(), user.get_permissions().bits()); + let query_result = sqlx::query!( + r#" +UPDATE users +SET username = ?1, permissions = ?2 +WHERE id = ?3 + "#, + username, + permissions, + id + ) + .execute(&self.pool) + .await; + let Ok(query_result) = query_result else { + self.users.insert(*id, old_user); + return Err(BotError::UserUpdateFailed(*id, query_result.unwrap_err())); + }; + if query_result.rows_affected() <= 0 { + self.users.insert(*id, old_user); + return Err(BotError::UserNotFoundInDatabase(*id)); + } + Ok(()) + } + + pub fn get_proxy(&self, id: &i32) -> Option<&Socks5Proxy> { + self.proxies.get(id) + } + + pub async fn update_proxy(&mut self, id: &i32, update_actions: F) -> Result<(), BotError> + where + F: FnOnce(&mut Socks5Proxy), + { + let Some(proxy) = self.proxies.get_mut(id) else { + return Err(BotError::ProxyNotFound(*id)); + }; + let old_proxy = proxy.clone(); + update_actions(proxy); + if *proxy == old_proxy { + return Ok(()); + } + let ip = Self::ip_to_blob(proxy.get_ip()); + let (port, user, password) = (proxy.get_port(), proxy.get_user(), proxy.get_password()); + let query_result = sqlx::query!( + r#" +UPDATE proxies +SET ip = ?1, port = ?2, user = ?3, password = ?4 +WHERE id = ?5 + "#, + ip, + port, + user, + password, + id + ) + .execute(&self.pool) + .await; + let Ok(query_result) = query_result else { + self.proxies.insert(*id, old_proxy); + return Err(BotError::ProxyUpdateFailed(*id, query_result.unwrap_err())); + }; + if query_result.rows_affected() <= 0 { + self.proxies.insert(*id, old_proxy); + return Err(BotError::ProxyNotFoundInDatabase(*id)); + } + Ok(()) + } + + pub fn get_requester(&self, id: &i32) -> Option<&YandexRequester> { + self.requesters.get(id) + } + + pub async fn update_requester(&mut self, id: &i32, update_actions: F) -> Result<(), BotError> + where + F: FnOnce(&mut YandexRequester), + { + todo!() + } + + fn ip_to_blob(ip: IpAddr) -> Vec { + match ip { + IpAddr::V4(ipv4) => ipv4.octets().to_vec(), + IpAddr::V6(ipv6) => ipv6.octets().to_vec(), + } + } +} diff --git a/src/model/proxy.rs b/src/model/proxy.rs index 99f66c9..4906d2f 100644 --- a/src/model/proxy.rs +++ b/src/model/proxy.rs @@ -2,6 +2,7 @@ use std::{fmt::Display, net::IpAddr}; #[derive(Debug, Clone, PartialEq, Eq, sqlx::FromRow)] pub struct Socks5Proxy { + pub id: i32, ip: IpAddr, port: u16, user: String, @@ -11,6 +12,7 @@ pub struct Socks5Proxy { impl Socks5Proxy { pub fn new(ip: IpAddr, port: u16, user: String, password: String) -> Self { Self { + id: 0, ip, port, user, diff --git a/src/model/requester.rs b/src/model/requester.rs index ac18e8a..23cad9e 100644 --- a/src/model/requester.rs +++ b/src/model/requester.rs @@ -1,6 +1,6 @@ -use reqwest::header::AUTHORIZATION; - use super::proxy::Socks5Proxy; +use reqwest::header::AUTHORIZATION; +use serde_json::{Map, Value}; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum Period { @@ -12,6 +12,7 @@ pub enum Period { #[derive(Debug, Clone, PartialEq, Eq, sqlx::FromRow)] pub struct YandexRequester { + pub id: i32, token: String, name: String, proxy: Option, @@ -25,6 +26,8 @@ pub enum RequestError { ClientBuildFailed(reqwest::Error), #[error("get request failed: {0}")] GetRequestFailed(reqwest::Error), + #[error("response missing json field: {0}")] + MissingJSONField(&'static str), } impl YandexRequester { @@ -39,6 +42,7 @@ impl YandexRequester { pub fn new_with_proxy(token: String, name: String, proxy: Socks5Proxy) -> Self { Self { + id: 0, token, name, proxy: Some(proxy), @@ -47,6 +51,7 @@ impl YandexRequester { pub fn new(token: String, name: String) -> Self { Self { + id: 0, token, name, proxy: None, @@ -65,10 +70,11 @@ impl YandexRequester { self.proxy.as_ref() } - pub async fn summary(&self, period: Period) -> Result<(), RequestError> { - let period = Self::period_as_param(period); + pub async fn summary(&self, period: Period) -> Result { + log::info!("requester '{}' attempts to request data...", self.name); let mut builder = reqwest::Client::builder(); if let Some(proxy) = &self.proxy { + log::info!("requester '{}' uses proxy '{}'", self.name, proxy.url()); let proxy = reqwest::Proxy::all(proxy.url()).map_err(|e| RequestError::InvalidProxy(e))?; builder = builder.proxy(proxy); @@ -77,15 +83,69 @@ impl YandexRequester { .build() .map_err(|e| RequestError::ClientBuildFailed(e))?; // TODO: deserialize response to json and return something already - let response = client + let json_data = client .get(Self::YANDEX_PARTNER_URL) .header(AUTHORIZATION, self.token.clone()) .query(&Self::YANDEX_PARTNER_DEFAULT_QUERY) - .query(&("period", &period)) + .query(&("period", &Self::period_as_param(period))) .send() .await - .map_err(|e| RequestError::ClientBuildFailed(e))?; - Ok(()) + .map_err(|e| RequestError::GetRequestFailed(e))? + .json::>() + .await + .map_err(|e| RequestError::GetRequestFailed(e))?; + log::info!( + "requester '{}' successfully got json response, trying to parse it...", + self.name + ); + let response = Self::json_to_response(json_data)?; + log::info!("requester '{}' json successfully parsed", self.name); + Ok(response) + } + + fn json_to_response(json: Map) -> Result { + let fields = Self::get_json_relevant_fields(&json)?; + let resposne = YandexResponse { + shows: fields["shows"] + .as_u64() + .ok_or(RequestError::MissingJSONField("shows"))?, + hits_render: fields["hits_render"] + .as_u64() + .ok_or(RequestError::MissingJSONField("hits_render"))?, + partner_wo_nds: fields["partner_wo_nds"] + .as_f64() + .ok_or(RequestError::MissingJSONField("partner_wo_nds"))?, + cpmv_partner_wo_nds: fields["cpmv_partner_wo_nds"] + .as_f64() + .ok_or(RequestError::MissingJSONField("cpmv_partner_wo_nds"))?, + }; + Ok(resposne) + } + + fn get_json_relevant_fields( + json: &Map, + ) -> Result<&Map, RequestError> { + let data = Self::get_field(json, "data")?; + let totals = Self::get_field(data, "totals")?; + let field2 = totals["2"] + .as_array() + .ok_or(RequestError::MissingJSONField("2"))?; + if field2.len() < 1 { + return Err(RequestError::MissingJSONField("array element 0")); + } + let fields = field2[0] + .as_object() + .ok_or(RequestError::MissingJSONField("array element 0"))?; + Ok(fields) + } + + fn get_field<'a>( + json: &'a Map, + field: &'static str, + ) -> Result<&'a Map, RequestError> { + json[field] + .as_object() + .ok_or(RequestError::MissingJSONField(field)) } fn period_as_param(period: Period) -> String { @@ -98,3 +158,11 @@ impl YandexRequester { .to_string() } } + +#[derive(Debug, Clone, PartialEq, Default)] +pub struct YandexResponse { + shows: u64, + hits_render: u64, + partner_wo_nds: f64, + cpmv_partner_wo_nds: f64, +} diff --git a/src/model/user.rs b/src/model/user.rs index 1fb069f..a34dfed 100644 --- a/src/model/user.rs +++ b/src/model/user.rs @@ -22,6 +22,7 @@ impl Display for Permissions { #[derive(Debug, Clone, PartialEq, Eq, sqlx::FromRow)] pub struct User { + pub id: i32, username: String, permissions: Permissions, } @@ -29,6 +30,7 @@ pub struct User { impl User { pub fn new_with_perms(username: String, permissions: Permissions) -> Self { Self { + id: 0, username, permissions, }