generated from OBJNULL/Dockerized-Rust
Imported Updater
This commit is contained in:
parent
398708bd30
commit
82c1a96a06
2 changed files with 395 additions and 0 deletions
187
project/src/updater.rs
Normal file
187
project/src/updater.rs
Normal file
|
|
@ -0,0 +1,187 @@
|
|||
// Libraries
|
||||
use std::error::Error;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use chrono::{self, NaiveDateTime, Utc};
|
||||
|
||||
mod puller;
|
||||
|
||||
use crate::base::database::Database;
|
||||
use crate::base::conf::Conf;
|
||||
use puller::Puller;
|
||||
|
||||
// Structures
|
||||
pub struct Updater {
|
||||
database: Arc<Mutex<Database>>,
|
||||
puller: Puller
|
||||
}
|
||||
|
||||
// Implementations
|
||||
impl Updater {
|
||||
// Constructors
|
||||
pub fn init(conf: &Conf, database: Arc<Mutex<Database>>) -> Result<Updater, Box<dyn Error>> {
|
||||
// Getting the Market API
|
||||
let binding = conf.get_string("general.api")?;
|
||||
let market_api: &str = binding.as_str();
|
||||
|
||||
// Creating a Puller
|
||||
let puller: Puller = Puller::init(
|
||||
conf.get_string(format!("updater.{}.key", market_api).as_str())?
|
||||
);
|
||||
|
||||
// Returning a Updater
|
||||
return Ok(Updater {
|
||||
database: database,
|
||||
puller: puller
|
||||
});
|
||||
}
|
||||
|
||||
// Functions
|
||||
fn get_mark_api(&self, conf: &Conf) -> Result<puller::MarketAPI, Box<dyn Error>> {
|
||||
return Ok(match conf.get_string("general.api")?.as_str() {
|
||||
"marketstack" => puller::MarketAPI::MarketStack,
|
||||
"alphavantage" => puller::MarketAPI::AlphaVantage,
|
||||
_ => puller::MarketAPI::MarketStack
|
||||
});
|
||||
}
|
||||
fn get_mark_mth(&self, conf: &Conf) -> Result<puller::MarketMethod, Box<dyn Error>> {
|
||||
return Ok(match conf.get_string("general.method")?.as_str() {
|
||||
"eod" => puller::MarketMethod::EndOfDay,
|
||||
"intraday" => puller::MarketMethod::Intraday,
|
||||
"historical" => puller::MarketMethod::Historical {
|
||||
from: conf.get_string("general.historical.from")?,
|
||||
to: conf.get_string("general.historical.to")?
|
||||
},
|
||||
_ => puller::MarketMethod::EndOfDay
|
||||
});
|
||||
}
|
||||
fn get_time_int(&self, conf: &Conf) -> Result<puller::TimeInterval, Box<dyn Error>> {
|
||||
return Ok(match conf.get_i32("general.interval")? {
|
||||
15 => puller::TimeInterval::M15,
|
||||
30 => puller::TimeInterval::M30,
|
||||
60 => puller::TimeInterval::M60,
|
||||
_ => puller::TimeInterval::M15
|
||||
});
|
||||
}
|
||||
fn get_time_dur(&self, row_last: String) -> Result<i64, Box<dyn Error>> {
|
||||
// Getting current time
|
||||
let now: chrono::DateTime<Utc> = Utc::now();
|
||||
|
||||
// How long ago was the last update?
|
||||
let parsed_time: NaiveDateTime = NaiveDateTime::parse_from_str(&row_last, "%Y-%m-%d %H:%M:%S")?;
|
||||
let parsed_utc_time: chrono::DateTime<Utc> = chrono::DateTime::<Utc>::from_naive_utc_and_offset(parsed_time, Utc);
|
||||
let duration: i64 = now.signed_duration_since(parsed_utc_time).num_minutes().abs();
|
||||
|
||||
// Return result
|
||||
return Ok(duration);
|
||||
}
|
||||
|
||||
pub fn needs_update(&mut self, conf: &Conf, symbol: &str) -> Result<bool, Box<dyn Error>> {
|
||||
// Getting database
|
||||
let database: &mut Database = {
|
||||
&mut *self.database.lock().unwrap()
|
||||
};
|
||||
|
||||
// Check if table exists
|
||||
database.inst_table("Updater", "symbol TEXT, last DATETIME")?;
|
||||
|
||||
// Have we updated before?
|
||||
if !database.exists_row("Updater", "symbol", &format!("'{}'", symbol))? {
|
||||
// Getting current date & time
|
||||
let now: chrono::DateTime<Utc> = Utc::now();
|
||||
let formatted: String = now.format("%Y-%m-%d %H:%M:%S").to_string();
|
||||
|
||||
// Append to Database
|
||||
database.insert(
|
||||
"Updater",
|
||||
"symbol, last",
|
||||
format!("'{}', '{}'", symbol, formatted).as_str()
|
||||
)?;
|
||||
|
||||
// Yes, we need to update
|
||||
return Ok(true);
|
||||
}
|
||||
|
||||
// Getting the row for our symbol
|
||||
let rows: Vec<mysql::Row> = database.get(
|
||||
"Updater",
|
||||
"*",
|
||||
format!(" WHERE symbol='{}'", symbol).as_str()
|
||||
)?;
|
||||
|
||||
// Getting the last update
|
||||
for row in rows {
|
||||
// Getting row info
|
||||
let row_symbol: String = row.get(0).unwrap();
|
||||
let row_last: String = row.get(1).unwrap();
|
||||
|
||||
// Is this the one we want?
|
||||
if row_symbol == symbol {
|
||||
// How long ago was the last update?
|
||||
let duration: i64 = self.get_time_dur(row_last)?;
|
||||
|
||||
// Getting minute duration from Config
|
||||
let minute_duration: i64 = conf.get_i32("updater.frequency")? as i64;
|
||||
|
||||
// Is the distance greater?
|
||||
if duration > minute_duration {
|
||||
// Getting current date & time
|
||||
let now: chrono::DateTime<Utc> = Utc::now();
|
||||
let formatted: String = now.format("%Y-%m-%d %H:%M:%S").to_string();
|
||||
|
||||
// Updating the row to have the New time
|
||||
// UPDATE - Need this line because I BURNT THROUGH 101 DAMN REQUESTS 😭
|
||||
database.update(
|
||||
"Updater",
|
||||
"last",
|
||||
&format!("'{}'", formatted),
|
||||
&format!("symbol='{}'", symbol)
|
||||
)?;
|
||||
|
||||
// We do need to update.
|
||||
return Ok(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Success
|
||||
return Ok(false);
|
||||
}
|
||||
pub fn pull_symbol(&mut self, conf: &Conf, symbol: &str) -> Result<(), Box<dyn Error>> {
|
||||
// Getting method, interval, and API
|
||||
let market_api: puller::MarketAPI = self.get_mark_api(conf)?;
|
||||
let market_method: puller::MarketMethod = self.get_mark_mth(conf)?;
|
||||
let space_interval: puller::TimeInterval = self.get_time_int(conf)?;
|
||||
|
||||
// Pulling data from it
|
||||
let data_points: Vec<puller::PulledData> = self.puller.get(market_api, symbol, market_method, space_interval)?;
|
||||
|
||||
// Getting database
|
||||
let database: &mut Database = {
|
||||
&mut *self.database.lock().unwrap()
|
||||
};
|
||||
|
||||
// Checking if the database has a table for this symbol
|
||||
database.inst_table(symbol, "date DATETIME, open FLOAT, high FLOAT, low FLOAT, close FLOAT")?;
|
||||
|
||||
// Going through data points and appending it to the database
|
||||
for point in data_points {
|
||||
// Checking if this row already exists
|
||||
if database.exists_row(
|
||||
symbol,
|
||||
"`date`",
|
||||
&format!("'{}'", point.date.clone())
|
||||
)? {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Appending data
|
||||
database.insert(symbol, "date, open, high, low, close", &format!(
|
||||
"'{}', {}, {}, {}, {}",
|
||||
point.date, point.open, point.high, point.low, point.close
|
||||
))?;
|
||||
}
|
||||
|
||||
// Success
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
208
project/src/updater/puller.rs
Normal file
208
project/src/updater/puller.rs
Normal file
|
|
@ -0,0 +1,208 @@
|
|||
// Libraries
|
||||
use std::error::Error;
|
||||
|
||||
use serde_json::Value;
|
||||
use ureq::{self, Response};
|
||||
|
||||
// Enums
|
||||
pub enum MarketAPI {
|
||||
AlphaVantage,
|
||||
MarketStack
|
||||
}
|
||||
pub enum TimeInterval {
|
||||
M15,
|
||||
M30,
|
||||
M60
|
||||
}
|
||||
pub enum MarketMethod {
|
||||
EndOfDay,
|
||||
Intraday,
|
||||
Historical {from: String, to: String}
|
||||
}
|
||||
|
||||
// Structures
|
||||
pub struct PulledData {
|
||||
pub date: String,
|
||||
pub open: f32,
|
||||
pub high: f32,
|
||||
pub low: f32,
|
||||
pub close: f32
|
||||
}
|
||||
pub struct Puller {
|
||||
key: String
|
||||
}
|
||||
|
||||
// Implementations
|
||||
impl Puller {
|
||||
// Constructors
|
||||
pub fn init(key: String) -> Puller {
|
||||
// Returning a puller
|
||||
return Puller {
|
||||
key: key
|
||||
};
|
||||
}
|
||||
|
||||
// Functions
|
||||
fn send_request(&self, url: String) -> Result<String, Box<dyn Error>> {
|
||||
let callback: Result<Response, ureq::Error> = ureq::get(&url).call();
|
||||
|
||||
// Getting the response code from the request
|
||||
match callback {
|
||||
Ok(res) => {
|
||||
return Ok(res.into_string()?);
|
||||
},
|
||||
Err(ureq::Error::Status(code, res)) => {
|
||||
eprintln!("Puller> Received HTTP status {} with message: {} ({})", code, res.status_text(), url);
|
||||
return Err(format!("HTTP error {}", code).into());
|
||||
},
|
||||
Err(e) => {
|
||||
eprintln!("Puller> Request failed: {} ({})", e, url);
|
||||
return Err(e.into());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_marketstack(&self, symbol: &str, method: MarketMethod, time_interval: &str) -> Result<Vec<PulledData>, Box<dyn Error>> {
|
||||
// Holding result
|
||||
let mut result: Vec<PulledData> = Vec::new();
|
||||
|
||||
// Getting market method
|
||||
let market_method: &str = match method {
|
||||
MarketMethod::EndOfDay => "eod?",
|
||||
MarketMethod::Intraday => "intraday?",
|
||||
MarketMethod::Historical { from, to } => {
|
||||
&format!("eod?date_from={}&date_to={}&", from, to)
|
||||
}
|
||||
};
|
||||
|
||||
// Getting response from API server
|
||||
let url: String = format!("http://api.marketstack.com/v1/{}access_key={}&symbols={}&interval={}", market_method, self.key, symbol, time_interval);
|
||||
let response: String;
|
||||
|
||||
// Handling request
|
||||
match self.send_request(url) {
|
||||
Ok(res) => {
|
||||
// Set response
|
||||
response = res;
|
||||
},
|
||||
Err(_e) => {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
}
|
||||
|
||||
// Parsing response json
|
||||
let data: Value = serde_json::from_str(&response)?;
|
||||
|
||||
// Going through the time series
|
||||
let time_series: &Vec<Value> = data["data"].as_array().unwrap();
|
||||
|
||||
// Going through each time series
|
||||
for value in time_series {
|
||||
// Getting data points
|
||||
let date: &str = value["date"].as_str().unwrap();
|
||||
let open: f32 = value["open"].as_f64().unwrap() as f32;
|
||||
let high: f32 = value["high"].as_f64().unwrap() as f32;
|
||||
let low: f32 = value["low"].as_f64().unwrap() as f32;
|
||||
let close: f32 = value["close"].as_f64().unwrap() as f32;
|
||||
|
||||
// Adjusting date
|
||||
let date_dmy: &str = date.split_once('T').unwrap().0;
|
||||
let date_time: &str = date.split_once('T').unwrap().1.split_once('+').unwrap().0;
|
||||
let final_date: String = format!("{} {}", date_dmy, date_time);
|
||||
|
||||
// Storing it in a new array
|
||||
result.push(PulledData {
|
||||
date: final_date,
|
||||
open: open,
|
||||
high: high,
|
||||
low: low,
|
||||
close: close
|
||||
});
|
||||
}
|
||||
|
||||
// Return result
|
||||
return Ok(result);
|
||||
}
|
||||
fn handle_alphavantage(&self, symbol: &str, method: MarketMethod, time_interval: &str) -> Result<Vec<PulledData>, Box<dyn Error>> {
|
||||
// Holding result
|
||||
let mut result: Vec<PulledData> = Vec::new();
|
||||
|
||||
let market_method: &str = match method {
|
||||
MarketMethod::EndOfDay => {
|
||||
panic!("Puller> `EndOfDay` for AlphaVantage not yet implemented!");
|
||||
},
|
||||
MarketMethod::Intraday => "TIME_SERIES_INTRADAY",
|
||||
MarketMethod::Historical { from: _, to: _ } => {
|
||||
panic!("Puller> `Historical` for AlphaVantage not yet implemented!");
|
||||
}
|
||||
};
|
||||
|
||||
// Getting response from API server
|
||||
let url: String = format!("https://www.alphavantage.co/query?function={}&symbol={}&apikey={}&interval={}", market_method, symbol, self.key, time_interval);
|
||||
let response: String;
|
||||
|
||||
// Handling request
|
||||
match self.send_request(url) {
|
||||
Ok(res) => {
|
||||
// Set response
|
||||
response = res;
|
||||
},
|
||||
Err(_e) => {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
}
|
||||
|
||||
// Parsing response json
|
||||
let data: Value = serde_json::from_str(&response)?;
|
||||
|
||||
// Are we limited?
|
||||
if data["Information"] != Value::Null {
|
||||
eprintln!("Puller> API Malfunction ({}).", response);
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
|
||||
// Going through the time series
|
||||
let time_series: &serde_json::Map<String, Value>= data[format!("Time Series ({})", time_interval)].as_object().unwrap();
|
||||
|
||||
// Going through each time series
|
||||
for (timestamp, value) in time_series {
|
||||
// Getting data points
|
||||
let date: String = timestamp.clone();
|
||||
let open: f32 = value["1. open"].as_str().unwrap().parse::<f32>().unwrap();
|
||||
let high: f32 = value["2. high"].as_str().unwrap().parse::<f32>().unwrap();
|
||||
let low: f32 = value["3. low"].as_str().unwrap().parse::<f32>().unwrap();
|
||||
let close: f32 = value["4. close"].as_str().unwrap().parse::<f32>().unwrap();
|
||||
|
||||
// Storing it in a new array
|
||||
result.push(PulledData {
|
||||
date: date,
|
||||
open: open,
|
||||
high: high,
|
||||
low: low,
|
||||
close: close
|
||||
});
|
||||
}
|
||||
|
||||
// Return result
|
||||
return Ok(result);
|
||||
}
|
||||
|
||||
pub fn get(&self, api: MarketAPI, symbol: &str, method: MarketMethod, time: TimeInterval) -> Result<Vec<PulledData>, Box<dyn Error>> {
|
||||
// Getting time interval wanted
|
||||
let time_interval: &str = match time {
|
||||
TimeInterval::M15 => "15min",
|
||||
TimeInterval::M30 => "30min",
|
||||
TimeInterval::M60 => "60min"
|
||||
};
|
||||
|
||||
// What API are we using?
|
||||
match api {
|
||||
MarketAPI::AlphaVantage => {
|
||||
return Ok(self.handle_alphavantage(symbol, method, time_interval)?);
|
||||
},
|
||||
MarketAPI::MarketStack => {
|
||||
return Ok(self.handle_marketstack(symbol, method, time_interval)?);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Reference in a new issue