use futures::{prelude::*, stream::FuturesUnordered};
use std::{
error,
fmt::Debug,
future::Future,
sync::mpsc::Sender,
time::{Duration, Instant},
};
use thiserror::Error;
use tokio::{
runtime::{Handle, Runtime},
task::JoinHandle,
};
use tracing::{span, Level, Span};
use tracing_futures::Instrument;
/// Owns a Tokio runtime together with the services that were spawned on it.
///
/// Services are started through the `spawn_*` methods and their completion is
/// observed through `wait_any_finished`.
pub struct Services {
/// One bookkeeping entry (name and start instant) per spawned service.
services: Vec<Service>,
/// Join handles of the spawned service tasks; polled to find the first one that finishes.
finish_listener: FuturesUnordered<JoinHandle<Result<(), Box<dyn error::Error + Send + Sync>>>>,
/// Runtime on which every service future is spawned or blocked on.
runtime: Runtime,
}
/// Failure reported when the first finished service did not complete successfully.
#[derive(Debug, Error)]
pub enum ServiceError {
/// The service task panicked. Holds the panic message when one could be
/// recovered from the panic payload, `None` otherwise (the display text then
/// falls back to a fixed placeholder).
#[error(
"service panicked: {}",
.0
.as_ref()
.map(|reason| reason.as_ref())
.unwrap_or("could not serialize the panic"),
)]
Panic(Option<String>),
/// The service task was cancelled before it ran to completion.
#[error("service future cancelled")]
Cancelled,
/// The service future completed with an error, exposed as this error's source.
#[error("service error")]
Service(#[source] Box<dyn error::Error>),
}
/// Bookkeeping record kept for every service spawned through `Services`.
pub struct Service {
/// Static name the service was registered under.
name: &'static str,
/// Instant captured when the service was spawned; `up_time()` measures from here.
up_time: Instant,
}
/// Context handed to a service when it is started: its identity, its tracing
/// span and a handle to the runtime so it can spawn further tasks.
pub struct TokioServiceInfo {
/// Static name of the service.
name: &'static str,
/// Instant captured when the service was started.
up_time: Instant,
/// Tracing span of the service; used as the parent of the spans of tasks it spawns.
span: tracing::Span,
/// Handle of the runtime the service runs on; sub-tasks are spawned through it.
handle: Handle,
}
/// Thin wrapper around the sending half of a `std::sync::mpsc` channel carrying `Msg` values.
pub struct TaskMessageBox<Msg>(Sender<Msg>);
/// Input delivered to a task: either a request to stop or a message to process.
///
/// NOTE(review): nothing in the visible part of this file constructs or matches
/// on this type; presumably it is consumed by task message loops elsewhere — confirm.
pub enum Input<Msg> {
/// Signals that the receiver should shut down.
Shutdown,
/// Carries one message of type `Msg`.
Input(Msg),
}
impl Services {
pub fn new(runtime: tokio::runtime::Runtime) -> Self {
Services {
services: Vec::new(),
finish_listener: FuturesUnordered::new(),
runtime,
}
}
pub fn spawn_future<F, T>(&mut self, name: &'static str, f: F)
where
F: FnOnce(TokioServiceInfo) -> T,
F: Send + 'static,
T: Future<Output = ()> + Send + 'static,
{
let handle = self.runtime.handle().clone();
let now = Instant::now();
let tracing_span = span!(Level::TRACE, "service", kind = name);
let future_service_info = TokioServiceInfo {
name,
up_time: now,
span: tracing_span,
handle,
};
let span_parent = future_service_info.span.clone();
let handle = self.runtime.spawn(
async move {
f(future_service_info).await;
tracing::info!("service `{}` finished", name);
Ok::<_, std::convert::Infallible>(()).map_err(Into::into)
}
.instrument(span!(
parent: span_parent,
Level::TRACE,
"service",
kind = name
)),
);
self.finish_listener.push(handle);
let task = Service::new(name, now);
self.services.push(task);
}
pub fn spawn_try_future<F, T, E>(&mut self, name: &'static str, f: F)
where
F: FnOnce(TokioServiceInfo) -> T,
F: Send + 'static,
T: Future<Output = Result<(), E>> + Send + 'static,
E: error::Error + Send + Sync + 'static,
{
let handle = self.runtime.handle().clone();
let now = Instant::now();
let tracing_span = span!(Level::TRACE, "service", kind = name);
let future_service_info = TokioServiceInfo {
name,
up_time: now,
span: tracing_span,
handle,
};
let parent_span = future_service_info.span.clone();
let handle = self.runtime.spawn(
async move {
let res = f(future_service_info).await;
if let Err(err) = &res {
tracing::error!(reason = %err.to_string(), "service finished with error");
} else {
tracing::info!("service `{}` finished successfully", name);
}
res.map_err(Into::into)
}
.instrument(span!(
parent: parent_span,
Level::TRACE,
"service",
kind = name
)),
);
self.finish_listener.push(handle);
let task = Service::new(name, now);
self.services.push(task);
}
pub fn wait_any_finished(self) -> Result<(), ServiceError> {
let finish_listener = self.finish_listener;
let result = self
.runtime
.block_on(async move { finish_listener.into_future().await.0 });
match result {
None | Some(Ok(Ok(()))) => Ok(()),
Some(Ok(Err(service_error))) => Err(ServiceError::Service(service_error)),
Some(Err(join_error)) => {
if join_error.is_cancelled() {
Err(ServiceError::Cancelled)
} else if join_error.is_panic() {
let desc = join_error.into_panic().downcast_ref::<String>().cloned();
Err(ServiceError::Panic(desc))
} else {
unreachable!("JoinError is either Cancelled or Panic")
}
}
}
}
pub fn block_on_task<F, Fut, T>(&mut self, name: &'static str, f: F) -> T
where
F: FnOnce(TokioServiceInfo) -> Fut,
Fut: Future<Output = T>,
{
let handle = self.runtime.handle().clone();
let now = Instant::now();
let parent_span = span!(Level::TRACE, "service", kind = name);
let future_service_info = TokioServiceInfo {
name,
up_time: now,
span: parent_span.clone(),
handle,
};
parent_span.in_scope(|| {
self.runtime
.block_on(f(future_service_info).instrument(span!(
Level::TRACE,
"service",
kind = name
)))
})
}
}
impl TokioServiceInfo {
#[inline]
pub fn up_time(&self) -> Duration {
Instant::now().duration_since(self.up_time)
}
#[inline]
pub fn name(&self) -> &'static str {
self.name
}
#[inline]
pub fn runtime_handle(&self) -> &Handle {
&self.handle
}
#[inline]
pub fn span(&self) -> &Span {
&self.span
}
pub fn spawn<F>(&self, name: &'static str, future: F)
where
F: Future<Output = ()> + Send + 'static,
{
tracing::trace!("service `{}` spawning task `{}`", self.name, name);
self.handle
.spawn(future.instrument(span!(parent: &self.span, Level::TRACE, "task", kind = name)));
}
pub fn spawn_fallible<F, E>(&self, name: &'static str, future: F)
where
F: Send + 'static,
E: Debug,
F: Future<Output = Result<(), E>>,
{
tracing::trace!("service `{}` spawning task `{}`", self.name, name);
self.handle.spawn(
async move {
match future.await {
Ok(()) => tracing::trace!("task {} finished successfully", name),
Err(e) => {
tracing::error!(reason = ?e, "task {} finished with error", name)
}
}
}
.instrument(span!(parent: &self.span, Level::TRACE, "task", kind = name)),
);
}
pub fn timeout_spawn<F>(&self, name: &'static str, timeout: Duration, future: F)
where
F: Future<Output = ()> + Send + 'static,
{
tracing::trace!("spawning {}", name);
self.handle.spawn(
async move {
if tokio::time::timeout(timeout, future).await.is_err() {
tracing::error!("task {} timed out", name)
}
}
.instrument(span!(parent: &self.span, Level::TRACE, "task", kind = name)),
);
}
pub fn timeout_spawn_fallible<F, E>(&self, name: &'static str, timeout: Duration, future: F)
where
F: Send + 'static,
E: Debug,
F: Future<Output = Result<(), E>>,
{
tracing::trace!("spawning {}", name);
self.handle.spawn(
async move {
match tokio::time::timeout(timeout, future).await {
Err(_) => tracing::error!("task {} timed out", name),
Ok(Err(e)) => tracing::error!(reason = ?e, "task {} finished with error", name),
Ok(Ok(())) => {}
};
}
.instrument(span!(parent: &self.span, Level::TRACE, "task", kind = name)),
);
}
pub fn run_periodic<F, U>(&self, name: &'static str, period: Duration, mut f: F)
where
F: FnMut() -> U,
F: Send + 'static,
U: Future<Output = ()> + Send + 'static,
{
self.spawn(
name,
async move {
let mut interval = tokio::time::interval(period);
loop {
let t_now = Instant::now();
interval.tick().await;
let t_last = Instant::now();
let elapsed = t_last.duration_since(t_now);
if elapsed > period * 2 {
tracing::warn!(
period = ?period,
elapsed = ?elapsed,
"periodic task `{}` started late", name
);
}
f().await;
tracing::trace!(
triggered_at = ?t_now,
"periodic task `{}` finished successfully",
name
);
}
}
.instrument(span!(parent: &self.span, Level::TRACE, "task", kind = name)),
);
}
pub fn run_periodic_fallible<F, U, E>(&self, name: &'static str, period: Duration, mut f: F)
where
F: FnMut() -> U,
F: Send + 'static,
E: Debug,
U: Future<Output = Result<(), E>> + Send + 'static,
{
self.spawn(
name,
async move {
let mut interval = tokio::time::interval(period);
loop {
let t_now = Instant::now();
interval.tick().await;
let t_last = Instant::now();
let elapsed = t_last.duration_since(t_now);
if elapsed > period * 2 {
tracing::warn!(
period = ?period,
elapsed = ?elapsed,
"periodic task `{}` started late", name
);
}
match f().await {
Ok(()) => {
tracing::trace!(
triggered_at = ?t_now,
"periodic task `{}` finished successfully",
name,
);
}
Err(e) => {
tracing::error!(
triggered_at = ?t_now,
error = ?e,
"periodic task `{}` failed", name
);
}
};
}
}
.instrument(span!(parent: &self.span, Level::TRACE, "task", kind = name)),
);
}
}
impl Service {
#[inline]
pub fn up_time(&self) -> Duration {
Instant::now().duration_since(self.up_time)
}
#[inline]
pub fn name(&self) -> &'static str {
self.name
}
#[inline]
fn new(name: &'static str, now: Instant) -> Self {
Service { name, up_time: now }
}
}
impl<Msg> Clone for TaskMessageBox<Msg> {
fn clone(&self) -> Self {
TaskMessageBox(self.0.clone())
}
}
impl<Msg> TaskMessageBox<Msg> {
pub fn send_to(&self, a: Msg) {
self.0.send(a).unwrap()
}
}