1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
//! # Task management
//!
//! Task management built on top of the tokio runtime, used to organize
//! and control more finely the different modules (services) that make up
//! jormungandr.
//!

use futures::{prelude::*, stream::FuturesUnordered};
use std::{
    error,
    fmt::Debug,
    future::Future,
    sync::mpsc::Sender,
    time::{Duration, Instant},
};
use thiserror::Error;
use tokio::{
    runtime::{Handle, Runtime},
    task::JoinHandle,
};
use tracing::{span, Level, Span};
use tracing_futures::Instrument;

/// hold onto the different services created
///
/// Owns the tokio runtime that every service is spawned on, a bookkeeping
/// record for each spawned service, and the join handles used to detect when
/// any of them finishes.
pub struct Services {
    // one record (name + start instant) per spawned service
    services: Vec<Service>,
    // join handles of the spawned services, polled by `wait_any_finished`
    finish_listener: FuturesUnordered<JoinHandle<Result<(), Box<dyn error::Error + Send + Sync>>>>,
    // NOTE(review): struct fields drop in declaration order, so the runtime is
    // dropped after the join handles and records; keep it last unless that is
    // verified not to matter.
    runtime: Runtime,
}

/// Reason reported when the first service to finish did not complete
/// successfully.
#[derive(Debug, Error)]
pub enum ServiceError {
    /// The service task panicked; carries the panic message when it could be
    /// extracted from the panic payload.
    #[error(
        "service panicked: {}",
        .0.as_deref().unwrap_or("could not serialize the panic")
    )]
    Panic(Option<String>),
    /// The service task was cancelled before completing.
    #[error("service future cancelled")]
    Cancelled,
    /// The service future itself returned an error.
    // NOTE(review): unlike the errors collected from the services, this box is
    // not `Send + Sync`, so `ServiceError` cannot cross threads — confirm this
    // is intended.
    #[error("service error")]
    Service(#[source] Box<dyn error::Error>),
}

/// wrap up a service
///
/// Bookkeeping record kept for every service started through `Services`:
/// its name and the instant it was started. All services share the runtime
/// owned by `Services`; tasks spawned from within a service (through its
/// `TokioServiceInfo`) run on that same runtime.
#[derive(Debug, Clone)]
pub struct Service {
    /// this is the name of the service task, useful for logging and
    /// following activity of a given task within the app
    name: &'static str,

    /// provides us with information regarding the up time of the Service
    /// this will allow us to monitor if a service has been restarted
    /// without having to follow the log history of the service.
    up_time: Instant,
}

/// the current future service information
///
/// Handed to the closure passed to [`Services::spawn_future`],
/// [`Services::spawn_try_future`] and [`Services::block_on_task`]. It gives
/// access to the service name, its start instant, its tracing span and a
/// handle to the runtime the service runs on.
#[derive(Debug)]
pub struct TokioServiceInfo {
    /// name the service was spawned with
    name: &'static str,
    /// instant at which the service was spawned
    up_time: Instant,
    /// span of the service; tasks spawned through this info are its children
    span: tracing::Span,
    /// handle to the runtime the service runs on
    handle: Handle,
}

/// Handle used to send messages of type `Msg` to a task.
///
/// Thin wrapper around a [`std::sync::mpsc::Sender`].
#[derive(Debug)]
pub struct TaskMessageBox<Msg>(Sender<Msg>);

/// Input for the different tasks that consume an input stream
///
/// If `Shutdown` is passed on, it means either there are no more
/// inputs to read (the senders have been dropped), or the service
/// has been asked to shut down.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Input<Msg> {
    /// the service has been required to shutdown
    Shutdown,
    /// input for the task
    Input(Msg),
}

impl Services {
    /// Create an empty set of services; every service spawned through it
    /// runs on `runtime`.
    pub fn new(runtime: tokio::runtime::Runtime) -> Self {
        Services {
            services: Vec::new(),
            finish_listener: FuturesUnordered::new(),
            runtime,
        }
    }

    /// Spawn the given future as a service on the shared runtime.
    ///
    /// Inside the spawned task, `f` is called with the [`TokioServiceInfo`]
    /// of the new service and the future it returns is awaited, instrumented
    /// with a `service` span. The completion of the service is observed by
    /// [`Services::wait_any_finished`].
    pub fn spawn_future<F, T>(&mut self, name: &'static str, f: F)
    where
        F: FnOnce(TokioServiceInfo) -> T,
        F: Send + 'static,
        T: Future<Output = ()> + Send + 'static,
    {
        let handle = self.runtime.handle().clone();
        let now = Instant::now();
        let tracing_span = span!(Level::TRACE, "service", kind = name);
        let future_service_info = TokioServiceInfo {
            name,
            up_time: now,
            span: tracing_span,
            handle,
        };
        let span_parent = future_service_info.span.clone();
        let handle = self.runtime.spawn(
            async move {
                f(future_service_info).await;
                tracing::info!("service `{}` finished", name);
                Ok::<_, std::convert::Infallible>(()).map_err(Into::into)
            }
            .instrument(span!(
                parent: span_parent,
                Level::TRACE,
                "service",
                kind = name
            )),
        );
        self.finish_listener.push(handle);

        let task = Service::new(name, now);
        self.services.push(task);
    }

    /// Spawn the given fallible future as a service on the shared runtime.
    ///
    /// Same as [`Services::spawn_future`], except that the future returns a
    /// `Result`. An `Err` is logged at error level and becomes the result of
    /// the service, reported by [`Services::wait_any_finished`] as
    /// [`ServiceError::Service`].
    pub fn spawn_try_future<F, T, E>(&mut self, name: &'static str, f: F)
    where
        F: FnOnce(TokioServiceInfo) -> T,
        F: Send + 'static,
        T: Future<Output = Result<(), E>> + Send + 'static,
        E: error::Error + Send + Sync + 'static,
    {
        let handle = self.runtime.handle().clone();
        let now = Instant::now();
        let tracing_span = span!(Level::TRACE, "service", kind = name);

        let future_service_info = TokioServiceInfo {
            name,
            up_time: now,
            span: tracing_span,
            handle,
        };
        let parent_span = future_service_info.span.clone();
        let handle = self.runtime.spawn(
            async move {
                let res = f(future_service_info).await;
                if let Err(err) = &res {
                    tracing::error!(reason = %err.to_string(), "service finished with error");
                } else {
                    tracing::info!("service `{}` finished successfully", name);
                }
                res.map_err(Into::into)
            }
            .instrument(span!(
                parent: parent_span,
                Level::TRACE,
                "service",
                kind = name
            )),
        );
        self.finish_listener.push(handle);

        let task = Service::new(name, now);
        self.services.push(task);
    }

    /// Block the current thread until the first spawned service finishes,
    /// and report how it finished.
    ///
    /// Consumes `self`; the runtime is dropped when this returns.
    ///
    /// # Errors
    ///
    /// * [`ServiceError::Service`] if the first service to finish returned an error;
    /// * [`ServiceError::Panic`] if it panicked (with its message when the
    ///   panic payload is a `String` or a `&'static str`);
    /// * [`ServiceError::Cancelled`] if its task was cancelled.
    ///
    /// Returns `Ok(())` when no service was spawned or the first one to finish
    /// completed successfully.
    pub fn wait_any_finished(self) -> Result<(), ServiceError> {
        let finish_listener = self.finish_listener;
        let result = self
            .runtime
            .block_on(async move { finish_listener.into_future().await.0 });
        match result {
            // No services were started or some service exited successfully
            None | Some(Ok(Ok(()))) => Ok(()),
            // Error produced by a service
            Some(Ok(Err(service_error))) => Err(ServiceError::Service(service_error)),
            // A service panicked or was cancelled by the environment
            Some(Err(join_error)) => {
                if join_error.is_cancelled() {
                    Err(ServiceError::Cancelled)
                } else if join_error.is_panic() {
                    let payload = join_error.into_panic();
                    // `panic!` with format arguments carries a `String`
                    // payload, while `panic!("literal")` carries a
                    // `&'static str`; recover the message in both cases.
                    let desc = payload
                        .downcast_ref::<String>()
                        .cloned()
                        .or_else(|| {
                            payload
                                .downcast_ref::<&'static str>()
                                .map(|msg| (*msg).to_string())
                        });
                    Err(ServiceError::Panic(desc))
                } else {
                    unreachable!("JoinError is either Cancelled or Panic")
                }
            }
        }
    }

    /// Run the future produced by `f` to completion on the runtime, blocking
    /// the current thread, and return its output.
    ///
    /// Unlike the `spawn_*` methods, the task is not registered as a service.
    pub fn block_on_task<F, Fut, T>(&mut self, name: &'static str, f: F) -> T
    where
        F: FnOnce(TokioServiceInfo) -> Fut,
        Fut: Future<Output = T>,
    {
        let handle = self.runtime.handle().clone();
        let now = Instant::now();
        let parent_span = span!(Level::TRACE, "service", kind = name);
        let future_service_info = TokioServiceInfo {
            name,
            up_time: now,
            span: parent_span.clone(),
            handle,
        };
        parent_span.in_scope(|| {
            self.runtime
                .block_on(f(future_service_info).instrument(span!(
                    Level::TRACE,
                    "service",
                    kind = name
                )))
        })
    }
}

impl TokioServiceInfo {
    /// get the time this service has been running since
    #[inline]
    pub fn up_time(&self) -> Duration {
        Instant::now().duration_since(self.up_time)
    }

    /// get the name of this Service
    #[inline]
    pub fn name(&self) -> &'static str {
        self.name
    }

    /// Access the service's handle
    #[inline]
    pub fn runtime_handle(&self) -> &Handle {
        &self.handle
    }

    /// Access the parent service span
    #[inline]
    pub fn span(&self) -> &Span {
        &self.span
    }

    /// spawn a std::future within the service's tokio handle
    pub fn spawn<F>(&self, name: &'static str, future: F)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        tracing::trace!("service `{}` spawning task `{}`", self.name, name);
        self.handle
            .spawn(future.instrument(span!(parent: &self.span, Level::TRACE, "task", kind = name)));
    }

    /// just like spawn but instead log an error on Result::Err
    pub fn spawn_fallible<F, E>(&self, name: &'static str, future: F)
    where
        F: Send + 'static,
        E: Debug,
        F: Future<Output = Result<(), E>>,
    {
        tracing::trace!("service `{}` spawning task `{}`", self.name, name);
        self.handle.spawn(
            async move {
                match future.await {
                    Ok(()) => tracing::trace!("task {} finished successfully", name),
                    Err(e) => {
                        tracing::error!(reason = ?e, "task {} finished with error", name)
                    }
                }
            }
            .instrument(span!(parent: &self.span, Level::TRACE, "task", kind = name)),
        );
    }

    /// just like spawn but add a timeout
    pub fn timeout_spawn<F>(&self, name: &'static str, timeout: Duration, future: F)
    where
        F: Future<Output = ()> + Send + 'static,
    {
        tracing::trace!("spawning {}", name);
        self.handle.spawn(
            async move {
                if tokio::time::timeout(timeout, future).await.is_err() {
                    tracing::error!("task {} timed out", name)
                }
            }
            .instrument(span!(parent: &self.span, Level::TRACE, "task", kind = name)),
        );
    }

    /// just like spawn_failable but add a timeout
    pub fn timeout_spawn_fallible<F, E>(&self, name: &'static str, timeout: Duration, future: F)
    where
        F: Send + 'static,
        E: Debug,
        F: Future<Output = Result<(), E>>,
    {
        tracing::trace!("spawning {}", name);
        self.handle.spawn(
            async move {
                match tokio::time::timeout(timeout, future).await {
                    Err(_) => tracing::error!("task {} timed out", name),
                    Ok(Err(e)) => tracing::error!(reason = ?e, "task {} finished with error", name),
                    Ok(Ok(())) => {}
                };
            }
            .instrument(span!(parent: &self.span, Level::TRACE, "task", kind = name)),
        );
    }

    // Run the closure with the specified period on the handle
    // and execute the resulting closure.
    pub fn run_periodic<F, U>(&self, name: &'static str, period: Duration, mut f: F)
    where
        F: FnMut() -> U,
        F: Send + 'static,
        U: Future<Output = ()> + Send + 'static,
    {
        self.spawn(
            name,
            async move {
                let mut interval = tokio::time::interval(period);
                loop {
                    let t_now = Instant::now();
                    interval.tick().await;
                    let t_last = Instant::now();
                    let elapsed = t_last.duration_since(t_now);
                    if elapsed > period * 2 {
                        tracing::warn!(
                            period = ?period,
                            elapsed = ?elapsed,
                            "periodic task `{}` started late", name
                        );
                    }
                    f().await;
                    tracing::trace!(
                        triggered_at = ?t_now,
                        "periodic task `{}` finished successfully",
                        name
                    );
                }
            }
            .instrument(span!(parent: &self.span, Level::TRACE, "task", kind = name)),
        );
    }

    // Run the closure with the specified period on the handle
    // and execute the resulting fallible async closure.
    // If the closure returns an Err, log it.
    pub fn run_periodic_fallible<F, U, E>(&self, name: &'static str, period: Duration, mut f: F)
    where
        F: FnMut() -> U,
        F: Send + 'static,
        E: Debug,
        U: Future<Output = Result<(), E>> + Send + 'static,
    {
        self.spawn(
            name,
            async move {
                let mut interval = tokio::time::interval(period);
                loop {
                    let t_now = Instant::now();
                    interval.tick().await;
                    let t_last = Instant::now();
                    let elapsed = t_last.duration_since(t_now);
                    if elapsed > period * 2 {
                        tracing::warn!(
                            period = ?period,
                            elapsed = ?elapsed,
                            "periodic task `{}` started late", name
                        );
                    }
                    match f().await {
                        Ok(()) => {
                            tracing::trace!(
                                triggered_at = ?t_now,
                                "periodic task `{}` finished successfully",
                                name,
                            );
                        }
                        Err(e) => {
                            tracing::error!(
                                triggered_at = ?t_now,
                                error = ?e,
                                "periodic task `{}` failed", name
                            );
                        }
                    };
                }
            }
            .instrument(span!(parent: &self.span, Level::TRACE, "task", kind = name)),
        );
    }
}

impl Service {
    /// get the time this service has been running since
    #[inline]
    pub fn up_time(&self) -> Duration {
        Instant::now().duration_since(self.up_time)
    }

    /// get the name of this Service
    #[inline]
    pub fn name(&self) -> &'static str {
        self.name
    }

    #[inline]
    fn new(name: &'static str, now: Instant) -> Self {
        Service { name, up_time: now }
    }
}

impl<Msg> Clone for TaskMessageBox<Msg> {
    fn clone(&self) -> Self {
        TaskMessageBox(self.0.clone())
    }
}

impl<Msg> TaskMessageBox<Msg> {
    /// Send `a` to the task holding the receiving end of this message box.
    ///
    /// # Panics
    ///
    /// Panics if the receiving end has been dropped, i.e. the task is no
    /// longer listening for messages.
    pub fn send_to(&self, a: Msg) {
        self.0
            .send(a)
            .expect("task message box: the receiving task has been dropped")
    }
}