Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
asm-flairstech committed Apr 8, 2024
1 parent 0070d35 commit 44a938d
Show file tree
Hide file tree
Showing 8 changed files with 6,693 additions and 42 deletions.
6,661 changes: 6,661 additions & 0 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,4 @@ run = "cargo run"
check = "cargo check"
build = "cargo build"
release = "cargo build --release"
features = "cargo tree --features"
features = "cargo tree --features"
4 changes: 4 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,7 @@ pub mod web;
pub use self::app::*;

mod app;

pub(crate) trait ThreadSafe: Send + Sync {}
pub(crate) trait ThreadClonable: ThreadSafe + Clone {}
pub(crate) trait ThreadStatic: ThreadClonable + 'static {}
16 changes: 6 additions & 10 deletions src/threading/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use tokio::{
};

use super::{cond::Mutcond, *};
use crate::{ThreadClonable, ThreadStatic};

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConsumerOptions {
Expand Down Expand Up @@ -72,7 +73,7 @@ impl ConsumerOptions {
}

#[derive(Clone, Debug)]
pub struct Consumer<T: Send + Sync + Clone + 'static> {
pub struct Consumer<T: ThreadStatic> {
options: ConsumerOptions,
items: Arc<Mutex<LinkedList<T>>>,
items_cond: Arc<Mutcond>,
Expand All @@ -87,7 +88,7 @@ pub struct Consumer<T: Send + Sync + Clone + 'static> {
running: Arc<AtomicUsize>,
}

impl<T: Send + Sync + Clone> Consumer<T> {
impl<T: ThreadClonable> Consumer<T> {
pub fn new() -> Self {
Consumer {
options: Default::default(),
Expand Down Expand Up @@ -207,10 +208,7 @@ impl<T: Send + Sync + Clone> Consumer<T> {
self.running.fetch_sub(1, Ordering::SeqCst);
}

pub fn start<TD: TaskDelegation<Consumer<T>, T> + Send + Sync + Clone + 'static>(
&self,
delegate: &TD,
) {
pub fn start<TD: TaskDelegation<Consumer<T>, T> + ThreadStatic>(&self, delegate: &TD) {
if self.is_cancelled() {
panic!("Queue is already cancelled.")
}
Expand Down Expand Up @@ -331,9 +329,7 @@ impl<T: Send + Sync + Clone> Consumer<T> {
}
}

pub async fn start_async<
TD: AsyncTaskDelegation<Consumer<T>, T> + Send + Sync + Clone + 'static,
>(
pub async fn start_async<TD: AsyncTaskDelegation<Consumer<T>, T> + ThreadStatic>(
&self,
delegate: &TD,
) {
Expand Down Expand Up @@ -571,7 +567,7 @@ impl<T: Send + Sync + Clone> Consumer<T> {
}
}

impl<T: Send + Sync + Clone> AwaitableConsumer for Consumer<T> {
impl<T: ThreadClonable> AwaitableConsumer for Consumer<T> {
fn is_cancelled(&self) -> bool {
Consumer::is_cancelled(self)
}
Expand Down
16 changes: 6 additions & 10 deletions src/threading/injector_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use tokio::{
};

use super::{cond::Mutcond, *};
use crate::{ThreadClonable, ThreadStatic};

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InjectorWorkerOptions {
Expand Down Expand Up @@ -70,7 +71,7 @@ impl InjectorWorkerOptions {
}

#[derive(Debug, Clone)]
pub struct InjectorWorker<T: Send + Sync + Clone + 'static> {
pub struct InjectorWorker<T: ThreadStatic> {
options: InjectorWorkerOptions,
injector: Arc<Injector<T>>,
stealers: Arc<Mutex<Vec<Stealer<T>>>>,
Expand All @@ -85,7 +86,7 @@ pub struct InjectorWorker<T: Send + Sync + Clone + 'static> {
running: Arc<AtomicUsize>,
}

impl<T: Send + Sync + Clone> InjectorWorker<T> {
impl<T: ThreadClonable> InjectorWorker<T> {
pub fn new() -> Self {
let options: InjectorWorkerOptions = Default::default();
InjectorWorker {
Expand Down Expand Up @@ -206,10 +207,7 @@ impl<T: Send + Sync + Clone> InjectorWorker<T> {
self.running.fetch_sub(1, Ordering::SeqCst);
}

pub fn start<TD: TaskDelegation<InjectorWorker<T>, T> + Send + Sync + Clone + 'static>(
&self,
delegate: &TD,
) {
pub fn start<TD: TaskDelegation<InjectorWorker<T>, T> + ThreadStatic>(&self, delegate: &TD) {
if self.is_cancelled() {
panic!("Queue is already cancelled.")
}
Expand Down Expand Up @@ -317,9 +315,7 @@ impl<T: Send + Sync + Clone> InjectorWorker<T> {
}
}

pub async fn start_async<
TD: AsyncTaskDelegation<InjectorWorker<T>, T> + Send + Sync + Clone + 'static,
>(
pub async fn start_async<TD: AsyncTaskDelegation<InjectorWorker<T>, T> + ThreadStatic>(
&self,
delegate: &TD,
) {
Expand Down Expand Up @@ -523,7 +519,7 @@ impl<T: Send + Sync + Clone> InjectorWorker<T> {
}
}

impl<T: Send + Sync + Clone> AwaitableConsumer for InjectorWorker<T> {
impl<T: ThreadClonable> AwaitableConsumer for InjectorWorker<T> {
fn is_cancelled(&self) -> bool {
self.is_cancelled()
}
Expand Down
9 changes: 4 additions & 5 deletions src/threading/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub use self::parallel_consumer::*;
pub use self::producer_consumer::*;

use super::error;
use crate::{ThreadClonable, ThreadStatic};

mod cond;
mod consumer;
Expand Down Expand Up @@ -74,20 +75,18 @@ impl fmt::Display for QueueBehavior {
}
}

pub trait TaskDelegationBase<TD: Send + Clone + 'static, T: Send + Clone + 'static> {
pub trait TaskDelegationBase<TD: ThreadStatic, T: ThreadStatic> {
fn on_started(&self, td: &TD);
fn on_completed(&self, td: &TD, item: &T, result: &TaskResult) -> bool;
fn on_cancelled(&self, td: &TD);
fn on_finished(&self, td: &TD);
}

pub trait TaskDelegation<TD: Send + Clone + 'static, T: Send + Clone + 'static>:
TaskDelegationBase<TD, T>
{
pub trait TaskDelegation<TD: ThreadStatic, T: ThreadStatic>: TaskDelegationBase<TD, T> {
fn process(&self, td: &TD, item: &T) -> Result<TaskResult>;
}

pub trait AsyncTaskDelegation<TD: Send + Clone + 'static, T: Send + Clone + 'static>:
pub trait AsyncTaskDelegation<TD: ThreadStatic, T: ThreadStatic>:
TaskDelegationBase<TD, T>
{
fn process(&self, td: &TD, item: &T) -> impl Future<Output = Result<TaskResult>> + Send;
Expand Down
8 changes: 4 additions & 4 deletions src/threading/parallel_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl ParallelOptions {
}

#[derive(Clone, Debug)]
pub struct Parallel<T: Send + Sync + Clone + 'static> {
pub struct Parallel<T: ThreadStatic> {
options: ParallelOptions,
started: Arc<Mutex<bool>>,
finished: Arc<AtomicBool>,
Expand All @@ -124,7 +124,7 @@ pub struct Parallel<T: Send + Sync + Clone + 'static> {
_marker: PhantomData<T>,
}

impl<T: Send + Sync + Clone> Parallel<T> {
impl<T: ThreadClonable> Parallel<T> {
pub fn new() -> Self {
let options: ParallelOptions = Default::default();
Parallel {
Expand Down Expand Up @@ -212,7 +212,7 @@ impl<T: Send + Sync + Clone> Parallel<T> {

pub fn start<
I: IntoParallelIterator<Item = T> + Len + Send + 'static,
TD: TaskDelegation<Parallel<T>, T> + Send + Sync + Clone + 'static,
TD: TaskDelegation<Parallel<T>, T> + ThreadStatic,
>(
&self,
collection: I,
Expand Down Expand Up @@ -334,7 +334,7 @@ impl<T: Send + Sync + Clone> Parallel<T> {
}
}

impl<T: Send + Sync + Clone> AwaitableConsumer for Parallel<T> {
impl<T: ThreadClonable> AwaitableConsumer for Parallel<T> {
fn is_cancelled(&self) -> bool {
Parallel::is_cancelled(self)
}
Expand Down
19 changes: 7 additions & 12 deletions src/threading/producer_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,12 @@ impl ProducerConsumerOptions {
}

#[derive(Clone, Debug)]
pub struct Producer<T: Send + Sync + Clone + 'static> {
pub struct Producer<T: ThreadStatic> {
pc: Arc<ProducerConsumer<T>>,
sender: Arc<channel::Sender<T>>,
}

impl<T: Send + Sync + Clone> Producer<T> {
impl<T: ThreadClonable> Producer<T> {
fn new(pc: &ProducerConsumer<T>, sender: &channel::Sender<T>) -> Self {
Producer {
pc: Arc::new(pc.clone()),
Expand Down Expand Up @@ -105,7 +105,7 @@ impl<T: Send + Sync + Clone> Producer<T> {
}

#[derive(Clone, Debug)]
pub struct ProducerConsumer<T: Send + Sync + Clone + 'static> {
pub struct ProducerConsumer<T: ThreadStatic> {
options: ProducerConsumerOptions,
started: Arc<Mutex<bool>>,
finished: Arc<AtomicBool>,
Expand All @@ -120,7 +120,7 @@ pub struct ProducerConsumer<T: Send + Sync + Clone + 'static> {
receiver: channel::Receiver<T>,
}

impl<T: Send + Sync + Clone> ProducerConsumer<T> {
impl<T: ThreadClonable> ProducerConsumer<T> {
pub fn new() -> Self {
let options: ProducerConsumerOptions = Default::default();
let (sender, receiver) = channel::bounded::<T>(options.capacity);
Expand Down Expand Up @@ -255,10 +255,7 @@ impl<T: Send + Sync + Clone> ProducerConsumer<T> {
Producer::new(self, &self.sender)
}

pub fn start<TD: TaskDelegation<ProducerConsumer<T>, T> + Send + Sync + Clone + 'static>(
&self,
delegate: &TD,
) {
pub fn start<TD: TaskDelegation<ProducerConsumer<T>, T> + ThreadStatic>(&self, delegate: &TD) {
if self.is_cancelled() {
panic!("Queue is already cancelled.")
}
Expand Down Expand Up @@ -375,9 +372,7 @@ impl<T: Send + Sync + Clone> ProducerConsumer<T> {
}
}

pub async fn start_async<
TD: AsyncTaskDelegation<ProducerConsumer<T>, T> + Send + Sync + Clone + 'static,
>(
pub async fn start_async<TD: AsyncTaskDelegation<ProducerConsumer<T>, T> + ThreadStatic>(
&self,
delegate: &TD,
) {
Expand Down Expand Up @@ -538,7 +533,7 @@ impl<T: Send + Sync + Clone> ProducerConsumer<T> {
}
}

impl<T: Send + Sync + Clone> AwaitableConsumer for ProducerConsumer<T> {
impl<T: ThreadClonable> AwaitableConsumer for ProducerConsumer<T> {
fn is_cancelled(&self) -> bool {
ProducerConsumer::is_cancelled(self)
}
Expand Down

0 comments on commit 44a938d

Please sign in to comment.