PDF coordinator
This commit is contained in:
parent
0d91994a4c
commit
a922896fb8
@ -1,9 +1,10 @@
|
|||||||
use std::{sync::{Arc}, time::Duration};
|
use std::{sync::Arc, time::Duration};
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use async_mutex::Mutex;
|
use async_mutex::Mutex;
|
||||||
use headless_chrome::{Browser, LaunchOptions};
|
use headless_chrome::{Browser, LaunchOptions};
|
||||||
use log::{warn, error};
|
use log::{error, info};
|
||||||
use rocket::{fairing::{Fairing, self}, Rocket, Build, catcher::Result, Request, Response, request::{FromRequest, Outcome}};
|
use rocket::{fairing::{Fairing, self}, Rocket, Build, Request, request::{FromRequest, Outcome}, futures::pin_mut, http::Status};
|
||||||
|
use tokio::time::sleep;
|
||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
pub struct Chromium {
|
pub struct Chromium {
|
||||||
@ -32,7 +33,6 @@ impl Fairing for Chromium {
|
|||||||
async fn on_ignite(&self, rocket: Rocket<Build>) -> fairing::Result {
|
async fn on_ignite(&self, rocket: Rocket<Build>) -> fairing::Result {
|
||||||
let new_rocket = rocket.manage(ChromiumCoordinator::new().await);
|
let new_rocket = rocket.manage(ChromiumCoordinator::new().await);
|
||||||
let coordinator = new_rocket.state::<ChromiumCoordinator>().unwrap();
|
let coordinator = new_rocket.state::<ChromiumCoordinator>().unwrap();
|
||||||
error!("{:p}", &coordinator);
|
|
||||||
Ok(new_rocket)
|
Ok(new_rocket)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -41,61 +41,60 @@ pub struct ChromiumCoordinator {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl ChromiumCoordinator {
|
impl ChromiumCoordinator {
|
||||||
const NUMBER_OF_INSTANCES: usize = 1;
|
const NUMBER_OF_INSTANCES: usize = 10;
|
||||||
|
|
||||||
pub async fn new() -> Self {
|
pub async fn new() -> Self {
|
||||||
error!("ChromiumCoordinator::new()");
|
|
||||||
let instances: Arc<Mutex<Vec<BrowserHolder>>> = Arc::new(Mutex::new(Vec::with_capacity(Self::NUMBER_OF_INSTANCES)));
|
let instances: Arc<Mutex<Vec<BrowserHolder>>> = Arc::new(Mutex::new(Vec::with_capacity(Self::NUMBER_OF_INSTANCES)));
|
||||||
while instances.lock().await.len() < Self::NUMBER_OF_INSTANCES {
|
while instances.lock().await.len() < Self::NUMBER_OF_INSTANCES {
|
||||||
instances.lock().await.push(BrowserHolder { browser: Browser::new(LaunchOptions {
|
instances.lock().await.push(Self::create_browser_instance());
|
||||||
idle_browser_timeout: Duration::from_secs(1), // Wait inifinity for commands
|
|
||||||
..LaunchOptions::default()
|
|
||||||
}).unwrap() });
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Self { instances }
|
Self { instances }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn create_browser_instance() -> BrowserHolder {
|
||||||
|
BrowserHolder { browser: Browser::new(LaunchOptions {
|
||||||
|
idle_browser_timeout: Duration::MAX, // Wait inifinity for commands
|
||||||
|
..LaunchOptions::default()
|
||||||
|
}).unwrap() }
|
||||||
|
}
|
||||||
|
|
||||||
fn spawn_browser(&self) {
|
fn spawn_browser(&self) {
|
||||||
let instances = self.instances.clone();
|
let instances = self.instances.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
error!("Spawn new instance of browser");
|
info!("Spawn new instance of browser");
|
||||||
// Create new instance
|
// Create new instance
|
||||||
let browser = Browser::new(LaunchOptions::default()).unwrap();
|
instances.lock().await.push(Self::create_browser_instance());
|
||||||
instances.lock().await.push( BrowserHolder {browser });
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
fn try_get_instance(&self) -> Option<BrowserHolder> {
|
async fn try_get_instance(&self) -> Option<BrowserHolder> {
|
||||||
let mut instances = self.instances.lock().await;
|
let mut instances = self.instances.lock().await;
|
||||||
if instances.len() == 0 {
|
if instances.len() == 0 {
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
|
self.spawn_browser();
|
||||||
Some(instances.remove(0))
|
Some(instances.remove(0))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn try_get_browser(&self) -> std::result::Result<BrowserHolder, ()> {
|
async fn loop_get_instance(&self) -> BrowserHolder {
|
||||||
let instances = self.instances.clone();
|
loop {
|
||||||
|
match self.try_get_instance().await {
|
||||||
let mut loop_count = 0;
|
Some(instance) => {
|
||||||
|
return instance
|
||||||
while instances.lock().await.len() == 0 {
|
}
|
||||||
warn!("Waiting for Chromium instances to start...");
|
None => {
|
||||||
std::thread::sleep(std::time::Duration::from_millis(100));
|
sleep(std::time::Duration::from_millis(100)).await;
|
||||||
if loop_count > 100 * 10 * 60 {
|
|
||||||
panic!("Can't start Chromium instances");
|
|
||||||
}
|
}
|
||||||
loop_count += 1;
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let browser = self.instances.lock().await.remove(0);
|
pub async fn try_get_browser(&self) -> std::result::Result<BrowserHolder, ()> {
|
||||||
self.spawn_browser();
|
let browser = self.loop_get_instance().await;
|
||||||
// test connection state
|
|
||||||
match browser.browser.get_version() {
|
match browser.browser.get_version() {
|
||||||
Ok(_) => Ok(browser),
|
Ok(_) => Ok(browser),
|
||||||
Err(_) => {
|
Err(_) => Err(()),
|
||||||
Err(())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -103,7 +102,9 @@ impl ChromiumCoordinator {
|
|||||||
loop {
|
loop {
|
||||||
match self.try_get_browser().await {
|
match self.try_get_browser().await {
|
||||||
Ok(browser) => return Ok(browser),
|
Ok(browser) => return Ok(browser),
|
||||||
Err(_) => {} // all instances may be dead ... we must wait for new instance
|
Err(_) => {
|
||||||
|
// all instances may be dead ... we must wait for new instance
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -119,9 +120,23 @@ impl<'r> FromRequest<'r> for BrowserHolder {
|
|||||||
|
|
||||||
async fn from_request(request: &'r Request<'_>) -> Outcome<Self, ()> {
|
async fn from_request(request: &'r Request<'_>) -> Outcome<Self, ()> {
|
||||||
let coordinator = request.rocket().state::<ChromiumCoordinator>().unwrap();
|
let coordinator = request.rocket().state::<ChromiumCoordinator>().unwrap();
|
||||||
error!("{:p}", &coordinator);
|
let get_instance = coordinator.get_browser();
|
||||||
let browser = coordinator.get_browser().await.unwrap();
|
pin_mut!(get_instance); // TODO: wth?
|
||||||
Outcome::Success(browser)
|
let result = match tokio::time::timeout(std::time::Duration::from_secs(10), &mut get_instance).await {
|
||||||
|
Ok(maybebrowser) => match maybebrowser {
|
||||||
|
Ok(browser) => Outcome::<BrowserHolder, ()>::Success(browser),
|
||||||
|
Err(_) => {
|
||||||
|
error!("Can't create new instance of browser");
|
||||||
|
Outcome::<BrowserHolder, ()>::Error((Status::InternalServerError, ()))
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(_) => {
|
||||||
|
error!("Can't create new instance of browser (timeout)");
|
||||||
|
Outcome::<BrowserHolder, ()>::Error((Status::InternalServerError, ()))
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
result
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user