1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
//use bs58::decode;
use libp2p::PeerId;

use crate::node::user_accounts::UserAccounts;
use crate::rpc::Rpc;
use crate::services::messaging;
use prost::Message;

/// Stateless namespace type for RTC messaging.
///
/// The struct has no fields; its functionality is exposed through associated
/// functions that do not take `self`.
pub struct RtcMessaging {}
impl RtcMessaging {
    /// Process an RTC message command received from the CLI.
    ///
    /// Looks up the session keyed by `req.group_id`, wraps `req.content` in an
    /// `RtcMessage`, and hands the encoded result to the messaging service,
    /// addressed to the peer whose id bytes are `req.group_id`.
    ///
    /// # Arguments
    ///
    /// * `my_user_id` - id used to look up the sending user account.
    /// * `req` - outgoing RTC request; `group_id` is used both as the session
    ///   key and as the receiver's peer id bytes.
    ///
    /// # Errors
    ///
    /// Returns `Err` with a description when
    /// - no session exists for `req.group_id`,
    /// - the session state is not `3`,
    /// - no user account is found for `my_user_id`, or
    /// - `req.group_id` cannot be parsed as a `PeerId`.
    ///
    /// NOTE(review): a failure reported by `pack_and_send_message` is only
    /// logged; the function still returns `Ok(true)` in that case. Confirm
    /// that this best-effort behaviour is intended.
    pub fn send_message(
        my_user_id: &PeerId,
        req: &super::proto_rpc::RtcOutgoing,
    ) -> Result<bool, String> {
        let session = super::Rtc::get_session_from_id(&req.group_id)
            .ok_or_else(|| "session does not exist".to_string())?;

        // Only sessions in state 3 are accepted for sending.
        if session.state != 3 {
            return Err("session is not established".to_string());
        }

        let user_account = UserAccounts::get_by_id(*my_user_id)
            .ok_or_else(|| "user account has problem".to_string())?;

        // `group_id` originates from the RPC request, so malformed bytes are
        // reported as an error instead of panicking.
        let receiver = PeerId::from_bytes(&req.group_id)
            .map_err(|_| "group id is not a valid peer id".to_string())?;

        // make message and send on the messaging service
        let proto_message = super::proto_net::RtcMessage {
            sequence: 0,
            content: req.content.clone(),
        };

        let send_message = messaging::proto::Messaging {
            message: Some(messaging::proto::messaging::Message::RtcStreamMessage(
                messaging::proto::RtcStreamMessage {
                    content: proto_message.encode_to_vec(),
                },
            )),
        };

        // The message is sent with an empty message id.
        let message_id: Vec<u8> = Vec::new();
        if let Err(e) = messaging::Messaging::pack_and_send_message(
            &user_account,
            &receiver,
            send_message.encode_to_vec(),
            messaging::MessagingServiceType::Unconfirmed,
            &message_id,
            false,
        ) {
            log::error!("error {}", e);
        }

        Ok(true)
    }

    /// Handle an RTC message that arrived from the network.
    ///
    /// When a session exists for the sender, the message content is wrapped in
    /// an `RtcIncoming` RPC message (with the sender's id bytes as `group_id`),
    /// encoded, and passed to `Rpc::send_message`. When no session exists, an
    /// error is logged and nothing is forwarded.
    ///
    /// `_receiver_id` and `_signature` are accepted but not used.
    #[allow(dead_code)]
    pub fn on_message(
        sender_id: &PeerId,
        _receiver_id: &PeerId,
        req: &super::proto_net::RtcMessage,
        _signature: Vec<u8>,
    ) {
        // Only the existence of a session matters here; its contents are not read.
        let session = super::Rtc::get_session_from_id(&sender_id.to_bytes());
        if session.is_none() {
            log::error!(
                "on_message session does not exist {}",
                sender_id.to_string()
            );
            return;
        }

        // build the rpc message that is handed back to the cli
        let incoming = super::proto_rpc::RtcIncoming {
            group_id: sender_id.to_bytes(),
            content: req.content.clone(),
        };
        let rpc_message = super::proto_rpc::RtcRpc {
            message: Some(super::proto_rpc::rtc_rpc::Message::RtcIncoming(incoming)),
        };

        // serialize into a buffer sized for the encoded message
        let mut encoded = Vec::with_capacity(rpc_message.encoded_len());
        rpc_message
            .encode(&mut encoded)
            .expect("Vec<u8> provides capacity as needed");

        // forward to the rpc layer
        Rpc::send_message(
            encoded,
            crate::rpc::proto::Modules::Rtc.into(),
            String::new(),
            Vec::new(),
        );
    }
}