1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
use crate::server::socket_parser::SocketParser;

use bytes::{BufMut, BytesMut};
use futures::stream::Stream;
use std::time::Instant;
use tokio::io;
use tokio::net::TcpStream;
use tokio::prelude::*;

/// A TCP connection bundled with its read buffer, write buffer and message parser.
pub struct BaseSocket {
    /// Underlying TCP stream that bytes are read from and written to.
    socket: TcpStream,
    /// Bytes read from `socket` that the parser has not consumed yet.
    rd: BytesMut,
    /// Outgoing bytes queued by `buffer` and drained by `poll_flush`.
    wr: BytesMut,
    /// Parser used by the `Stream` implementation to turn buffered bytes into messages.
    parser: SocketParser,
    /// Timestamp set at construction and refreshed by the `Stream` implementation
    /// as incoming data is observed; compared against a 60 second limit there.
    last_received: Instant,
}
impl BaseSocket {
    pub fn new(socket: TcpStream) -> Self {
        Self {
            socket,
            rd: BytesMut::new(),
            wr: BytesMut::new(),
            parser: SocketParser::new(),
            last_received: Instant::now(),
        }
    }
    pub fn buffer(&mut self, line: &[u8]) {
        self.wr.reserve(line.len());
        self.wr.put(line)
    }

    pub fn poll_flush(&mut self) -> Poll<(), io::Error> {
        while !self.wr.is_empty() {
            let n = try_ready!(self.socket.poll_write(&self.wr));
            assert!(n > 0);
            let _ = self.wr.split_to(n);
        }
        Ok(Async::Ready(()))
    }

    pub fn fill_read_buf(&mut self) -> Poll<(), io::Error> {
        loop {
            self.rd.reserve(1024 * 1024);
            let n = try_ready!(self.socket.read_buf(&mut self.rd));
            if n == 0 || self.rd.len() > 0 {
                return Ok(Async::Ready(()));
            }
        }
    }

    pub fn get_socket(&self) -> &TcpStream {
        &self.socket
    }

    pub fn get_parser_mut(&mut self) -> &mut SocketParser {
        &mut self.parser
    }
}

impl Stream for BaseSocket {
    type Item = (Vec<(BytesMut, u32)>);
    type Error = std::io::Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        let sock_closed = self.fill_read_buf()?.is_ready();
        if self.rd.len() > 0 {
            self.last_received = Instant::now();
            let (parse_result, parsed) = self.parser.parse(&mut self.rd.to_vec()).unwrap();
            match parse_result {
                Some(msg) => {
                    self.rd.split_to(parsed);
                    let mut ret = Vec::with_capacity(msg.len());
                    for (buf, route) in msg {
                        ret.push((BytesMut::from(buf), route));
                    }
                    return Ok(Async::Ready(Some(ret)));
                }
                None => {
                    self.rd.split_to(parsed);
                }
            }
        }
        if sock_closed {
            if Instant::now().duration_since(self.last_received).as_secs() > 60 {
                println!("Socket disconnecting: time since last message > 60 secs");
                Ok(Async::Ready(None))
            } else {
                Ok(Async::NotReady)
            }
        } else {
            Ok(Async::NotReady)
        }
    }
}