Added id field to all model's structs

Implemented YandexRequester summary method (TODO: test it)
Added BotData struct that will replace the Settings struct in the future
Implemented update methods for some structs' tables in BotData (TODO: implement update_requester and new methods)
Moved migrations to root directory
Note: use sqlx-cli from cargo to manage migrations from now on
Added *.db files to .gitignore
Added serde_json to Cargo.toml to parse json in YandexRequester
This commit is contained in:
Egor 2024-12-01 04:46:54 +03:00
parent 79f21425b1
commit 2069d9104c
9 changed files with 244 additions and 13 deletions

1
.gitignore vendored
View file

@ -1,2 +1,3 @@
/target /target
.env .env
*.db

5
Cargo.lock generated
View file

@ -1805,9 +1805,9 @@ dependencies = [
[[package]] [[package]]
name = "serde_json" name = "serde_json"
version = "1.0.132" version = "1.0.133"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d726bfaff4b320266d395898905d0eba0345aae23b54aee3a737e260fd46db03" checksum = "c7fceb2473b9166b2294ef05efcb65a3db80803f0b03ef86a5fc88a2b85ee377"
dependencies = [ dependencies = [
"itoa", "itoa",
"memchr", "memchr",
@ -2963,6 +2963,7 @@ dependencies = [
"pretty_env_logger", "pretty_env_logger",
"reqwest 0.12.9", "reqwest 0.12.9",
"serde", "serde",
"serde_json",
"sqlx", "sqlx",
"teloxide", "teloxide",
"thiserror 2.0.3", "thiserror 2.0.3",

View file

@ -14,3 +14,4 @@ dotenvy = "0.15.7"
reqwest = { version = "0.12.9", features = ["json"] } reqwest = { version = "0.12.9", features = ["json"] }
bitflags = "2.6.0" bitflags = "2.6.0"
sqlx = { version = "0.8.2", features = ["runtime-tokio", "tls-rustls", "sqlite"] } sqlx = { version = "0.8.2", features = ["runtime-tokio", "tls-rustls", "sqlite"] }
serde_json = "1.0.133"

View file

@ -6,7 +6,7 @@ CREATE TABLE IF NOT EXISTS users (
CREATE TABLE IF NOT EXISTS proxies ( CREATE TABLE IF NOT EXISTS proxies (
id INTEGER PRIMARY KEY, id INTEGER PRIMARY KEY,
ip INTEGER NOT NULL, ip BLOB NOT NULL,
port INTEGER NOT NULL, port INTEGER NOT NULL,
user TEXT NOT NULL, user TEXT NOT NULL,
password TEXT NOT NULL password TEXT NOT NULL
@ -17,5 +17,5 @@ CREATE TABLE IF NOT EXISTS requesters (
name TEXT NOT NULL, name TEXT NOT NULL,
api TEXT NOT NULL, api TEXT NOT NULL,
proxy_id INTEGER, proxy_id INTEGER,
FOREIGN_KEY (proxy_id) REFERENCES proxies (id) FOREIGN KEY (proxy_id) REFERENCES proxies (id)
); );

View file

@ -26,5 +26,5 @@ async fn main() -> ExitCode {
log::info!("starting telegram bot..."); log::info!("starting telegram bot...");
let bot = Bot::new(settings.get_bot_token()); let bot = Bot::new(settings.get_bot_token());
return ExitCode::SUCCESS; ExitCode::SUCCESS
} }

View file

@ -1,3 +1,159 @@
pub mod proxy; pub mod proxy;
pub mod requester; pub mod requester;
pub mod user; pub mod user;
use std::{collections::HashMap, net::IpAddr};
use proxy::Socks5Proxy;
use requester::YandexRequester;
use sqlx::SqlitePool;
use user::User;
pub struct BotData {
token: String,
// map username to id to get users faster by name
user_ids: HashMap<String, i32>,
users: HashMap<i32, User>,
proxies: HashMap<i32, Socks5Proxy>,
requesters: HashMap<i32, YandexRequester>,
pool: SqlitePool,
}
#[derive(thiserror::Error, Debug)]
pub enum BotError {
#[error("user with id {0} does not exist")]
UserNotFound(i32),
#[error("failed to update user with id {0}: {1}")]
UserUpdateFailed(i32, sqlx::Error),
#[error("failed to update user with id {0}: user exists in memory but not in database")]
UserNotFoundInDatabase(i32),
#[error("proxy with id {0} does not exist")]
ProxyNotFound(i32),
#[error("failed to update proxy with id {0}: {1}")]
ProxyUpdateFailed(i32, sqlx::Error),
#[error("failed to update proxy with id {0}: proxy exists in memory but not in database")]
ProxyNotFoundInDatabase(i32),
#[error("requester with id {0} does not exist")]
RequesterNotFound(i32),
#[error("failed to open db connection: {0}")]
ConnectionFailed(sqlx::Error),
}
impl BotData {
pub fn new(token: String, pool: SqlitePool) -> Self {
todo!()
}
pub fn get_token(&self) -> &str {
&self.token
}
pub fn get_user_by_name(&self, username: &str) -> Option<&User> {
let id = self.user_ids.get(username)?;
self.users.get(id)
}
pub fn get_user(&self, id: &i32) -> Option<&User> {
self.users.get(id)
}
pub async fn update_user<F>(&mut self, id: &i32, update_actions: F) -> Result<(), BotError>
where
F: FnOnce(&mut User),
{
let Some(user) = self.users.get_mut(id) else {
return Err(BotError::UserNotFound(*id));
};
// old copy of a user - used to rollback user changes in memory
// in case the database update fails
let old_user = user.clone();
update_actions(user);
// return if no changes are applied
if *user == old_user {
return Ok(());
}
let (username, permissions) = (user.get_username(), user.get_permissions().bits());
let query_result = sqlx::query!(
r#"
UPDATE users
SET username = ?1, permissions = ?2
WHERE id = ?3
"#,
username,
permissions,
id
)
.execute(&self.pool)
.await;
let Ok(query_result) = query_result else {
self.users.insert(*id, old_user);
return Err(BotError::UserUpdateFailed(*id, query_result.unwrap_err()));
};
if query_result.rows_affected() <= 0 {
self.users.insert(*id, old_user);
return Err(BotError::UserNotFoundInDatabase(*id));
}
Ok(())
}
pub fn get_proxy(&self, id: &i32) -> Option<&Socks5Proxy> {
self.proxies.get(id)
}
pub async fn update_proxy<F>(&mut self, id: &i32, update_actions: F) -> Result<(), BotError>
where
F: FnOnce(&mut Socks5Proxy),
{
let Some(proxy) = self.proxies.get_mut(id) else {
return Err(BotError::ProxyNotFound(*id));
};
let old_proxy = proxy.clone();
update_actions(proxy);
if *proxy == old_proxy {
return Ok(());
}
let ip = Self::ip_to_blob(proxy.get_ip());
let (port, user, password) = (proxy.get_port(), proxy.get_user(), proxy.get_password());
let query_result = sqlx::query!(
r#"
UPDATE proxies
SET ip = ?1, port = ?2, user = ?3, password = ?4
WHERE id = ?5
"#,
ip,
port,
user,
password,
id
)
.execute(&self.pool)
.await;
let Ok(query_result) = query_result else {
self.proxies.insert(*id, old_proxy);
return Err(BotError::ProxyUpdateFailed(*id, query_result.unwrap_err()));
};
if query_result.rows_affected() <= 0 {
self.proxies.insert(*id, old_proxy);
return Err(BotError::ProxyNotFoundInDatabase(*id));
}
Ok(())
}
pub fn get_requester(&self, id: &i32) -> Option<&YandexRequester> {
self.requesters.get(id)
}
pub async fn update_requester<F>(&mut self, id: &i32, update_actions: F) -> Result<(), BotError>
where
F: FnOnce(&mut YandexRequester),
{
todo!()
}
fn ip_to_blob(ip: IpAddr) -> Vec<u8> {
match ip {
IpAddr::V4(ipv4) => ipv4.octets().to_vec(),
IpAddr::V6(ipv6) => ipv6.octets().to_vec(),
}
}
}

View file

@ -2,6 +2,7 @@ use std::{fmt::Display, net::IpAddr};
#[derive(Debug, Clone, PartialEq, Eq, sqlx::FromRow)] #[derive(Debug, Clone, PartialEq, Eq, sqlx::FromRow)]
pub struct Socks5Proxy { pub struct Socks5Proxy {
pub id: i32,
ip: IpAddr, ip: IpAddr,
port: u16, port: u16,
user: String, user: String,
@ -11,6 +12,7 @@ pub struct Socks5Proxy {
impl Socks5Proxy { impl Socks5Proxy {
pub fn new(ip: IpAddr, port: u16, user: String, password: String) -> Self { pub fn new(ip: IpAddr, port: u16, user: String, password: String) -> Self {
Self { Self {
id: 0,
ip, ip,
port, port,
user, user,

View file

@ -1,6 +1,6 @@
use reqwest::header::AUTHORIZATION;
use super::proxy::Socks5Proxy; use super::proxy::Socks5Proxy;
use reqwest::header::AUTHORIZATION;
use serde_json::{Map, Value};
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Period { pub enum Period {
@ -12,6 +12,7 @@ pub enum Period {
#[derive(Debug, Clone, PartialEq, Eq, sqlx::FromRow)] #[derive(Debug, Clone, PartialEq, Eq, sqlx::FromRow)]
pub struct YandexRequester { pub struct YandexRequester {
pub id: i32,
token: String, token: String,
name: String, name: String,
proxy: Option<Socks5Proxy>, proxy: Option<Socks5Proxy>,
@ -25,6 +26,8 @@ pub enum RequestError {
ClientBuildFailed(reqwest::Error), ClientBuildFailed(reqwest::Error),
#[error("get request failed: {0}")] #[error("get request failed: {0}")]
GetRequestFailed(reqwest::Error), GetRequestFailed(reqwest::Error),
#[error("response missing json field: {0}")]
MissingJSONField(&'static str),
} }
impl YandexRequester { impl YandexRequester {
@ -39,6 +42,7 @@ impl YandexRequester {
pub fn new_with_proxy(token: String, name: String, proxy: Socks5Proxy) -> Self { pub fn new_with_proxy(token: String, name: String, proxy: Socks5Proxy) -> Self {
Self { Self {
id: 0,
token, token,
name, name,
proxy: Some(proxy), proxy: Some(proxy),
@ -47,6 +51,7 @@ impl YandexRequester {
pub fn new(token: String, name: String) -> Self { pub fn new(token: String, name: String) -> Self {
Self { Self {
id: 0,
token, token,
name, name,
proxy: None, proxy: None,
@ -65,10 +70,11 @@ impl YandexRequester {
self.proxy.as_ref() self.proxy.as_ref()
} }
pub async fn summary(&self, period: Period) -> Result<(), RequestError> { pub async fn summary(&self, period: Period) -> Result<YandexResponse, RequestError> {
let period = Self::period_as_param(period); log::info!("requester '{}' attempts to request data...", self.name);
let mut builder = reqwest::Client::builder(); let mut builder = reqwest::Client::builder();
if let Some(proxy) = &self.proxy { if let Some(proxy) = &self.proxy {
log::info!("requester '{}' uses proxy '{}'", self.name, proxy.url());
let proxy = let proxy =
reqwest::Proxy::all(proxy.url()).map_err(|e| RequestError::InvalidProxy(e))?; reqwest::Proxy::all(proxy.url()).map_err(|e| RequestError::InvalidProxy(e))?;
builder = builder.proxy(proxy); builder = builder.proxy(proxy);
@ -77,15 +83,69 @@ impl YandexRequester {
.build() .build()
.map_err(|e| RequestError::ClientBuildFailed(e))?; .map_err(|e| RequestError::ClientBuildFailed(e))?;
// TODO: deserialize response to json and return something already // TODO: deserialize response to json and return something already
let response = client let json_data = client
.get(Self::YANDEX_PARTNER_URL) .get(Self::YANDEX_PARTNER_URL)
.header(AUTHORIZATION, self.token.clone()) .header(AUTHORIZATION, self.token.clone())
.query(&Self::YANDEX_PARTNER_DEFAULT_QUERY) .query(&Self::YANDEX_PARTNER_DEFAULT_QUERY)
.query(&("period", &period)) .query(&("period", &Self::period_as_param(period)))
.send() .send()
.await .await
.map_err(|e| RequestError::ClientBuildFailed(e))?; .map_err(|e| RequestError::GetRequestFailed(e))?
Ok(()) .json::<Map<String, Value>>()
.await
.map_err(|e| RequestError::GetRequestFailed(e))?;
log::info!(
"requester '{}' successfully got json response, trying to parse it...",
self.name
);
let response = Self::json_to_response(json_data)?;
log::info!("requester '{}' json successfully parsed", self.name);
Ok(response)
}
fn json_to_response(json: Map<String, Value>) -> Result<YandexResponse, RequestError> {
let fields = Self::get_json_relevant_fields(&json)?;
let resposne = YandexResponse {
shows: fields["shows"]
.as_u64()
.ok_or(RequestError::MissingJSONField("shows"))?,
hits_render: fields["hits_render"]
.as_u64()
.ok_or(RequestError::MissingJSONField("hits_render"))?,
partner_wo_nds: fields["partner_wo_nds"]
.as_f64()
.ok_or(RequestError::MissingJSONField("partner_wo_nds"))?,
cpmv_partner_wo_nds: fields["cpmv_partner_wo_nds"]
.as_f64()
.ok_or(RequestError::MissingJSONField("cpmv_partner_wo_nds"))?,
};
Ok(resposne)
}
fn get_json_relevant_fields(
json: &Map<String, Value>,
) -> Result<&Map<String, Value>, RequestError> {
let data = Self::get_field(json, "data")?;
let totals = Self::get_field(data, "totals")?;
let field2 = totals["2"]
.as_array()
.ok_or(RequestError::MissingJSONField("2"))?;
if field2.len() < 1 {
return Err(RequestError::MissingJSONField("array element 0"));
}
let fields = field2[0]
.as_object()
.ok_or(RequestError::MissingJSONField("array element 0"))?;
Ok(fields)
}
fn get_field<'a>(
json: &'a Map<String, Value>,
field: &'static str,
) -> Result<&'a Map<String, Value>, RequestError> {
json[field]
.as_object()
.ok_or(RequestError::MissingJSONField(field))
} }
fn period_as_param(period: Period) -> String { fn period_as_param(period: Period) -> String {
@ -98,3 +158,11 @@ impl YandexRequester {
.to_string() .to_string()
} }
} }
#[derive(Debug, Clone, PartialEq, Default)]
pub struct YandexResponse {
shows: u64,
hits_render: u64,
partner_wo_nds: f64,
cpmv_partner_wo_nds: f64,
}

View file

@ -22,6 +22,7 @@ impl Display for Permissions {
#[derive(Debug, Clone, PartialEq, Eq, sqlx::FromRow)] #[derive(Debug, Clone, PartialEq, Eq, sqlx::FromRow)]
pub struct User { pub struct User {
pub id: i32,
username: String, username: String,
permissions: Permissions, permissions: Permissions,
} }
@ -29,6 +30,7 @@ pub struct User {
impl User { impl User {
pub fn new_with_perms(username: String, permissions: Permissions) -> Self { pub fn new_with_perms(username: String, permissions: Permissions) -> Self {
Self { Self {
id: 0,
username, username,
permissions, permissions,
} }