use crate::{
    blockcfg::{Block, Header, HeaderHash},
    blockchain::{Storage, Tip},
    intercom::{ClientMsg, Error, ReplySendError, ReplyStreamHandle},
    utils::{async_msg::MessageQueue, task::TokioServiceInfo},
};
use futures::prelude::*;
use std::{convert::identity, time::Duration};
use tokio::time::timeout;
use tracing::{span, Level};
use tracing_futures::Instrument;
// Per-request processing deadlines. Every value is a number of seconds: each
// one that is used below is passed to `Duration::from_secs` in `handle_input`.

/// Deadline, in seconds, for answering a `ClientMsg::GetBlockTip` request.
const PROCESS_TIMEOUT_GET_BLOCK_TIP: u64 = 5;
// Not referenced anywhere in this part of the file, hence the lint exemption.
// NOTE(review): presumably reserved for a peers request — confirm or remove.
#[allow(dead_code)]
const PROCESS_TIMEOUT_GET_PEERS: u64 = 10;
/// Deadline, in seconds, for a `ClientMsg::GetHeaders` request (5 minutes).
const PROCESS_TIMEOUT_GET_HEADERS: u64 = 5 * 60;
/// Deadline, in seconds, for a `ClientMsg::PullHeaders` request (5 minutes).
const PROCESS_TIMEOUT_GET_HEADERS_RANGE: u64 = 5 * 60;
/// Deadline, in seconds, for a `ClientMsg::GetBlocks` request (10 minutes).
const PROCESS_TIMEOUT_GET_BLOCKS: u64 = 10 * 60;
/// Deadline, in seconds, for a `ClientMsg::PullBlocks` request (1 hour).
const PROCESS_TIMEOUT_PULL_BLOCKS: u64 = 60 * 60;
/// Deadline, in seconds, for a `ClientMsg::PullBlocksToTip` request (1 hour).
const PROCESS_TIMEOUT_PULL_BLOCKS_TO_TIP: u64 = 60 * 60;
/// State owned by the client task for its whole lifetime.
///
/// `handle_input` clones the relevant field(s) for every request it spawns,
/// so each sub-task works on its own handle.
pub struct TaskData {
    /// Block storage handle used to look up blocks and stream block ranges.
    pub storage: Storage,
    /// Handle to the current blockchain tip, read when a request needs the
    /// tip header.
    pub blockchain_tip: Tip,
}
/// Entry point of the client task.
///
/// Pulls [`ClientMsg`] requests off `input` one at a time and hands each of
/// them to `handle_input`. The function returns once the queue yields `None`.
pub async fn start(
    info: TokioServiceInfo,
    mut task_data: TaskData,
    mut input: MessageQueue<ClientMsg>,
) {
    // `handle_input` is synchronous: it only spawns the work for a request,
    // so this loop goes straight back to waiting for the next message.
    while let Some(request) = input.next().await {
        handle_input(&info, &mut task_data, request);
    }
}
/// Dispatches one client request onto its own spawned sub-task.
///
/// Each arm clones the parts of `task_data` the request needs, builds the
/// future that produces the reply, and spawns it through `info` together with
/// the matching `PROCESS_TIMEOUT_*` deadline (expressed in seconds).
///
/// `GetBlockTip` wires its deadline by hand with `tokio::time::timeout` and
/// logs the timeout error itself; every other request goes through
/// `TokioServiceInfo::timeout_spawn_fallible`.
fn handle_input(info: &TokioServiceInfo, task_data: &mut TaskData, input: ClientMsg) {
    match input {
        ClientMsg::GetBlockTip(handle) => {
            let tip_handle = task_data.blockchain_tip.clone();
            let deadline = Duration::from_secs(PROCESS_TIMEOUT_GET_BLOCK_TIP);
            // The reply future is lazy: nothing is read until the task runs.
            let reply_with_tip = async move {
                let header = get_block_tip(tip_handle).await;
                handle.reply_ok(header);
            };
            let request_span =
                span!(parent: info.span(), Level::TRACE, "sub_task", request = "GetBlockTip");
            let guarded = timeout(deadline, reply_with_tip)
                .map_err(|e| {
                    tracing::error!(
                        error = ?e,
                        "request timed out or failed unexpectedly"
                    );
                })
                .instrument(request_span);
            info.spawn_fallible("get block tip", guarded);
        }
        ClientMsg::GetHeaders(ids, handle) => {
            let task = handle_get_headers(task_data.storage.clone(), ids, handle);
            info.timeout_spawn_fallible(
                "GetHeaders",
                Duration::from_secs(PROCESS_TIMEOUT_GET_HEADERS),
                task,
            );
        }
        ClientMsg::PullHeaders(from, to, handle) => {
            let task = handle_get_headers_range(task_data.storage.clone(), from, to, handle);
            info.timeout_spawn_fallible(
                "PullHeaders",
                Duration::from_secs(PROCESS_TIMEOUT_GET_HEADERS_RANGE),
                task,
            );
        }
        ClientMsg::GetBlocks(ids, handle) => {
            let task = handle_get_blocks(task_data.storage.clone(), ids, handle);
            info.timeout_spawn_fallible(
                "get blocks",
                Duration::from_secs(PROCESS_TIMEOUT_GET_BLOCKS),
                task,
            );
        }
        ClientMsg::PullBlocks(from, to, handle) => {
            let task = handle_pull_blocks(task_data.storage.clone(), from, to, handle);
            info.timeout_spawn_fallible(
                "PullBlocks",
                Duration::from_secs(PROCESS_TIMEOUT_PULL_BLOCKS),
                task,
            );
        }
        ClientMsg::PullBlocksToTip(from, handle) => {
            let task = handle_pull_blocks_to_tip(
                task_data.storage.clone(),
                task_data.blockchain_tip.clone(),
                from,
                handle,
            );
            info.timeout_spawn_fallible(
                "PullBlocksToTip",
                Duration::from_secs(PROCESS_TIMEOUT_PULL_BLOCKS_TO_TIP),
                task,
            );
        }
    }
}
/// Returns an owned copy of the header of the current blockchain tip.
///
/// The tip reference obtained from `blockchain_tip` is only borrowed long
/// enough to clone its header.
async fn get_block_tip(blockchain_tip: Tip) -> Header {
    blockchain_tip.get_ref().await.header().clone()
}
fn get_block_from_storage(storage: &Storage, id: HeaderHash) -> Result<Block, Error> {
    match storage.get(id) {
        Ok(Some(block)) => Ok(block),
        Ok(None) => Err(Error::not_found(format!(
            "block {} is not known to this node",
            id
        ))),
        Err(e) => Err(e.into()),
    }
}
/// Streams `items` to the requester through `reply_handle`.
///
/// Items are fed into the reply sink in iteration order. The first `Err` item
/// is still fed to the sink, but nothing after it is: iteration stops there.
/// The sink is then closed.
///
/// # Errors
///
/// Returns the [`ReplySendError`] from feeding or closing the sink. A failed
/// feed returns immediately, without closing the sink explicitly.
async fn fuse_send_items<T, V>(
    items: T,
    reply_handle: ReplyStreamHandle<V>,
) -> Result<(), ReplySendError>
where
    T: IntoIterator<Item = Result<V, Error>>,
{
    let mut sink = reply_handle.start_sending();
    for entry in items {
        // Record the outcome before the entry is moved into the sink.
        let keep_going = entry.is_ok();
        sink.feed(entry).await?;
        if !keep_going {
            break;
        }
    }
    sink.close().await
}
async fn send_range_from_storage<T, F>(
    storage: Storage,
    from: Vec<HeaderHash>,
    to: HeaderHash,
    f: F,
    handle: ReplyStreamHandle<T>,
) -> Result<(), ReplySendError>
where
    F: FnMut(Block) -> T,
    F: Send + 'static,
    T: Send + 'static,
{
    let closest_ancestor = storage
        .find_closest_ancestor(from, to)
        .map_err(Into::into)
        .and_then(move |maybe_ancestor| {
            maybe_ancestor
                .map(|ancestor| (to, ancestor.distance))
                .ok_or_else(|| Error::not_found("Could not find a known block in `from`"))
        });
    match closest_ancestor {
        Ok((to, depth)) => storage.send_branch_with(to, Some(depth), handle, f).await,
        Err(e) => {
            handle.reply_error(e);
            Ok(())
        }
    }
}
/// Answers a `GetBlocks` request: streams the blocks for `ids`, in order.
///
/// Blocks are fetched lazily, one per streamed item. The stream ends after
/// the first lookup error has been sent (see `fuse_send_items`).
async fn handle_get_blocks(
    storage: Storage,
    ids: Vec<HeaderHash>,
    handle: ReplyStreamHandle<Block>,
) -> Result<(), ReplySendError> {
    let blocks = ids
        .into_iter()
        .map(|id| get_block_from_storage(&storage, id));
    fuse_send_items(blocks, handle).await
}
/// Answers a `GetHeaders` request: streams the headers for `ids`, in order.
///
/// Each header is cloned out of the block fetched from `storage`. Lookups are
/// lazy, and the stream ends after the first lookup error has been sent (see
/// `fuse_send_items`).
async fn handle_get_headers(
    storage: Storage,
    ids: Vec<HeaderHash>,
    handle: ReplyStreamHandle<Header>,
) -> Result<(), ReplySendError> {
    let headers = ids
        .into_iter()
        .map(|id| get_block_from_storage(&storage, id))
        .map(|fetched| fetched.map(|block| block.header().clone()));
    fuse_send_items(headers, handle).await
}
async fn handle_get_headers_range(
    storage: Storage,
    from: Vec<HeaderHash>,
    to: HeaderHash,
    handle: ReplyStreamHandle<Header>,
) -> Result<(), ReplySendError> {
    send_range_from_storage(storage, from, to, |block| block.header().clone(), handle).await
}
/// Answers a `PullBlocks` request.
///
/// Delegates to `send_range_from_storage` with `identity` as the projection,
/// so the blocks of the range are sent unchanged.
async fn handle_pull_blocks(
    storage: Storage,
    from: Vec<HeaderHash>,
    to: HeaderHash,
    handle: ReplyStreamHandle<Block>,
) -> Result<(), ReplySendError> {
    send_range_from_storage(storage, from, to, identity, handle).await
}
/// Answers a `PullBlocksToTip` request.
///
/// Reads the id of the current tip header and then delegates to
/// `send_range_from_storage` with that id as the end of the range. Blocks
/// are sent unchanged (`identity` projection).
async fn handle_pull_blocks_to_tip(
    storage: Storage,
    blockchain_tip: Tip,
    from: Vec<HeaderHash>,
    handle: ReplyStreamHandle<Block>,
) -> Result<(), ReplySendError> {
    // Only the id is needed; the header copy is dropped before streaming.
    let tip_id = {
        let tip_header = get_block_tip(blockchain_tip).await;
        tip_header.id()
    };
    send_range_from_storage(storage, from, tip_id, identity, handle).await
}