From 82c1a96a060e18bce2957d0beb82c74ba5173dec Mon Sep 17 00:00:00 2001 From: Maddox Werts Date: Tue, 11 Feb 2025 12:03:04 -0500 Subject: [PATCH] Imported Updater --- project/src/updater.rs | 187 ++++++++++++++++++++++++++++++ project/src/updater/puller.rs | 208 ++++++++++++++++++++++++++++++++++ 2 files changed, 395 insertions(+) create mode 100644 project/src/updater.rs create mode 100644 project/src/updater/puller.rs diff --git a/project/src/updater.rs b/project/src/updater.rs new file mode 100644 index 0000000..c63a518 --- /dev/null +++ b/project/src/updater.rs @@ -0,0 +1,187 @@ +// Libraries +use std::error::Error; +use std::sync::{Arc, Mutex}; +use chrono::{self, NaiveDateTime, Utc}; + +mod puller; + +use crate::base::database::Database; +use crate::base::conf::Conf; +use puller::Puller; + +// Structures +pub struct Updater { + database: Arc>, + puller: Puller +} + +// Implementations +impl Updater { + // Constructors + pub fn init(conf: &Conf, database: Arc>) -> Result> { + // Getting the Market API + let binding = conf.get_string("general.api")?; + let market_api: &str = binding.as_str(); + + // Creating a Puller + let puller: Puller = Puller::init( + conf.get_string(format!("updater.{}.key", market_api).as_str())? + ); + + // Returning a Updater + return Ok(Updater { + database: database, + puller: puller + }); + } + + // Functions + fn get_mark_api(&self, conf: &Conf) -> Result> { + return Ok(match conf.get_string("general.api")?.as_str() { + "marketstack" => puller::MarketAPI::MarketStack, + "alphavantage" => puller::MarketAPI::AlphaVantage, + _ => puller::MarketAPI::MarketStack + }); + } + fn get_mark_mth(&self, conf: &Conf) -> Result> { + return Ok(match conf.get_string("general.method")?.as_str() { + "eod" => puller::MarketMethod::EndOfDay, + "intraday" => puller::MarketMethod::Intraday, + "historical" => puller::MarketMethod::Historical { + from: conf.get_string("general.historical.from")?, + to: conf.get_string("general.historical.to")? + }, + _ => puller::MarketMethod::EndOfDay + }); + } + fn get_time_int(&self, conf: &Conf) -> Result> { + return Ok(match conf.get_i32("general.interval")? { + 15 => puller::TimeInterval::M15, + 30 => puller::TimeInterval::M30, + 60 => puller::TimeInterval::M60, + _ => puller::TimeInterval::M15 + }); + } + fn get_time_dur(&self, row_last: String) -> Result> { + // Getting current time + let now: chrono::DateTime = Utc::now(); + + // How long ago was the last update? + let parsed_time: NaiveDateTime = NaiveDateTime::parse_from_str(&row_last, "%Y-%m-%d %H:%M:%S")?; + let parsed_utc_time: chrono::DateTime = chrono::DateTime::::from_naive_utc_and_offset(parsed_time, Utc); + let duration: i64 = now.signed_duration_since(parsed_utc_time).num_minutes().abs(); + + // Return result + return Ok(duration); + } + + pub fn needs_update(&mut self, conf: &Conf, symbol: &str) -> Result> { + // Getting database + let database: &mut Database = { + &mut *self.database.lock().unwrap() + }; + + // Check if table exists + database.inst_table("Updater", "symbol TEXT, last DATETIME")?; + + // Have we updated before? + if !database.exists_row("Updater", "symbol", &format!("'{}'", symbol))? { + // Getting current date & time + let now: chrono::DateTime = Utc::now(); + let formatted: String = now.format("%Y-%m-%d %H:%M:%S").to_string(); + + // Append to Database + database.insert( + "Updater", + "symbol, last", + format!("'{}', '{}'", symbol, formatted).as_str() + )?; + + // Yes, we need to update + return Ok(true); + } + + // Getting the row for our symbol + let rows: Vec = database.get( + "Updater", + "*", + format!(" WHERE symbol='{}'", symbol).as_str() + )?; + + // Getting the last update + for row in rows { + // Getting row info + let row_symbol: String = row.get(0).unwrap(); + let row_last: String = row.get(1).unwrap(); + + // Is this the one we want? + if row_symbol == symbol { + // How long ago was the last update? + let duration: i64 = self.get_time_dur(row_last)?; + + // Getting minute duration from Config + let minute_duration: i64 = conf.get_i32("updater.frequency")? as i64; + + // Is the distance greater? + if duration > minute_duration { + // Getting current date & time + let now: chrono::DateTime = Utc::now(); + let formatted: String = now.format("%Y-%m-%d %H:%M:%S").to_string(); + + // Updating the row to have the New time + // UPDATE - Need this line because I BURNT THROUGH 101 DAMN REQUESTS 😭 + database.update( + "Updater", + "last", + &format!("'{}'", formatted), + &format!("symbol='{}'", symbol) + )?; + + // We do need to update. + return Ok(true); + } + } + } + + // Success + return Ok(false); + } + pub fn pull_symbol(&mut self, conf: &Conf, symbol: &str) -> Result<(), Box> { + // Getting method, interval, and API + let market_api: puller::MarketAPI = self.get_mark_api(conf)?; + let market_method: puller::MarketMethod = self.get_mark_mth(conf)?; + let space_interval: puller::TimeInterval = self.get_time_int(conf)?; + + // Pulling data from it + let data_points: Vec = self.puller.get(market_api, symbol, market_method, space_interval)?; + + // Getting database + let database: &mut Database = { + &mut *self.database.lock().unwrap() + }; + + // Checking if the database has a table for this symbol + database.inst_table(symbol, "date DATETIME, open FLOAT, high FLOAT, low FLOAT, close FLOAT")?; + + // Going through data points and appending it to the database + for point in data_points { + // Checking if this row already exists + if database.exists_row( + symbol, + "`date`", + &format!("'{}'", point.date.clone()) + )? { + continue; + } + + // Appending data + database.insert(symbol, "date, open, high, low, close", &format!( + "'{}', {}, {}, {}, {}", + point.date, point.open, point.high, point.low, point.close + ))?; + } + + // Success + return Ok(()); + } +} \ No newline at end of file diff --git a/project/src/updater/puller.rs b/project/src/updater/puller.rs new file mode 100644 index 0000000..117160e --- /dev/null +++ b/project/src/updater/puller.rs @@ -0,0 +1,208 @@ +// Libraries +use std::error::Error; + +use serde_json::Value; +use ureq::{self, Response}; + +// Enums +pub enum MarketAPI { + AlphaVantage, + MarketStack +} +pub enum TimeInterval { + M15, + M30, + M60 +} +pub enum MarketMethod { + EndOfDay, + Intraday, + Historical {from: String, to: String} +} + +// Structures +pub struct PulledData { + pub date: String, + pub open: f32, + pub high: f32, + pub low: f32, + pub close: f32 +} +pub struct Puller { + key: String +} + +// Implementations +impl Puller { + // Constructors + pub fn init(key: String) -> Puller { + // Returning a puller + return Puller { + key: key + }; + } + + // Functions + fn send_request(&self, url: String) -> Result> { + let callback: Result = ureq::get(&url).call(); + + // Getting the response code from the request + match callback { + Ok(res) => { + return Ok(res.into_string()?); + }, + Err(ureq::Error::Status(code, res)) => { + eprintln!("Puller> Received HTTP status {} with message: {} ({})", code, res.status_text(), url); + return Err(format!("HTTP error {}", code).into()); + }, + Err(e) => { + eprintln!("Puller> Request failed: {} ({})", e, url); + return Err(e.into()); + } + } + } + + fn handle_marketstack(&self, symbol: &str, method: MarketMethod, time_interval: &str) -> Result, Box> { + // Holding result + let mut result: Vec = Vec::new(); + + // Getting market method + let market_method: &str = match method { + MarketMethod::EndOfDay => "eod?", + MarketMethod::Intraday => "intraday?", + MarketMethod::Historical { from, to } => { + &format!("eod?date_from={}&date_to={}&", from, to) + } + }; + + // Getting response from API server + let url: String = format!("http://api.marketstack.com/v1/{}access_key={}&symbols={}&interval={}", market_method, self.key, symbol, time_interval); + let response: String; + + // Handling request + match self.send_request(url) { + Ok(res) => { + // Set response + response = res; + }, + Err(_e) => { + return Ok(Vec::new()); + } + } + + // Parsing response json + let data: Value = serde_json::from_str(&response)?; + + // Going through the time series + let time_series: &Vec = data["data"].as_array().unwrap(); + + // Going through each time series + for value in time_series { + // Getting data points + let date: &str = value["date"].as_str().unwrap(); + let open: f32 = value["open"].as_f64().unwrap() as f32; + let high: f32 = value["high"].as_f64().unwrap() as f32; + let low: f32 = value["low"].as_f64().unwrap() as f32; + let close: f32 = value["close"].as_f64().unwrap() as f32; + + // Adjusting date + let date_dmy: &str = date.split_once('T').unwrap().0; + let date_time: &str = date.split_once('T').unwrap().1.split_once('+').unwrap().0; + let final_date: String = format!("{} {}", date_dmy, date_time); + + // Storing it in a new array + result.push(PulledData { + date: final_date, + open: open, + high: high, + low: low, + close: close + }); + } + + // Return result + return Ok(result); + } + fn handle_alphavantage(&self, symbol: &str, method: MarketMethod, time_interval: &str) -> Result, Box> { + // Holding result + let mut result: Vec = Vec::new(); + + let market_method: &str = match method { + MarketMethod::EndOfDay => { + panic!("Puller> `EndOfDay` for AlphaVantage not yet implemented!"); + }, + MarketMethod::Intraday => "TIME_SERIES_INTRADAY", + MarketMethod::Historical { from: _, to: _ } => { + panic!("Puller> `Historical` for AlphaVantage not yet implemented!"); + } + }; + + // Getting response from API server + let url: String = format!("https://www.alphavantage.co/query?function={}&symbol={}&apikey={}&interval={}", market_method, symbol, self.key, time_interval); + let response: String; + + // Handling request + match self.send_request(url) { + Ok(res) => { + // Set response + response = res; + }, + Err(_e) => { + return Ok(Vec::new()); + } + } + + // Parsing response json + let data: Value = serde_json::from_str(&response)?; + + // Are we limited? + if data["Information"] != Value::Null { + eprintln!("Puller> API Malfunction ({}).", response); + return Ok(Vec::new()); + } + + // Going through the time series + let time_series: &serde_json::Map= data[format!("Time Series ({})", time_interval)].as_object().unwrap(); + + // Going through each time series + for (timestamp, value) in time_series { + // Getting data points + let date: String = timestamp.clone(); + let open: f32 = value["1. open"].as_str().unwrap().parse::().unwrap(); + let high: f32 = value["2. high"].as_str().unwrap().parse::().unwrap(); + let low: f32 = value["3. low"].as_str().unwrap().parse::().unwrap(); + let close: f32 = value["4. close"].as_str().unwrap().parse::().unwrap(); + + // Storing it in a new array + result.push(PulledData { + date: date, + open: open, + high: high, + low: low, + close: close + }); + } + + // Return result + return Ok(result); + } + + pub fn get(&self, api: MarketAPI, symbol: &str, method: MarketMethod, time: TimeInterval) -> Result, Box> { + // Getting time interval wanted + let time_interval: &str = match time { + TimeInterval::M15 => "15min", + TimeInterval::M30 => "30min", + TimeInterval::M60 => "60min" + }; + + // What API are we using? + match api { + MarketAPI::AlphaVantage => { + return Ok(self.handle_alphavantage(symbol, method, time_interval)?); + }, + MarketAPI::MarketStack => { + return Ok(self.handle_marketstack(symbol, method, time_interval)?); + } + } + } +}