// RTC messaging: sends outgoing RTC stream messages to the peer of a session
// and forwards incoming RTC messages to the RPC layer.
use libp2p::PeerId;
use crate::node::user_accounts::UserAccounts;
use crate::rpc::Rpc;
use crate::services::messaging;
use prost::Message;
/// Stateless holder for the RTC messaging functions.
///
/// The struct has no fields; `send_message` and `on_message` are associated
/// functions that do not take `self`.
pub struct RtcMessaging {}
impl RtcMessaging {
pub fn send_message(
my_user_id: &PeerId,
req: &super::proto_rpc::RtcOutgoing,
) -> Result<bool, String> {
match super::Rtc::get_session_from_id(&req.group_id) {
Some(session) => {
if session.state != 3 {
return Err("session is not established".to_string());
}
let proto_message = super::proto_net::RtcMessage {
sequence: 0,
content: req.content.clone(),
};
let send_message = messaging::proto::Messaging {
message: Some(messaging::proto::messaging::Message::RtcStreamMessage(
messaging::proto::RtcStreamMessage {
content: proto_message.encode_to_vec(),
},
)),
};
let message_id: Vec<u8> = Vec::new();
if let Some(user_account) = UserAccounts::get_by_id(*my_user_id) {
let receiver = PeerId::from_bytes(&req.group_id).unwrap();
if let Err(e) = messaging::Messaging::pack_and_send_message(
&user_account,
&receiver,
send_message.encode_to_vec(),
messaging::MessagingServiceType::Unconfirmed,
&message_id,
false,
) {
log::error!("error {}", e);
}
} else {
return Err("user account has problem".to_string());
}
return Ok(true);
}
None => {
return Err("session does not exist".to_string());
}
}
}
#[allow(dead_code)]
pub fn on_message(
sender_id: &PeerId,
_receiver_id: &PeerId,
req: &super::proto_net::RtcMessage,
_signature: Vec<u8>,
) {
match super::Rtc::get_session_from_id(&sender_id.to_bytes()) {
None => {
log::error!(
"on_message session does not exist {}",
sender_id.to_string()
);
}
Some(_session) => {
let proto_message = super::proto_rpc::RtcRpc {
message: Some(super::proto_rpc::rtc_rpc::Message::RtcIncoming(
super::proto_rpc::RtcIncoming {
group_id: sender_id.to_bytes(),
content: req.content.clone(),
},
)),
};
let mut buf = Vec::with_capacity(proto_message.encoded_len());
proto_message
.encode(&mut buf)
.expect("Vec<u8> provides capacity as needed");
Rpc::send_message(
buf,
crate::rpc::proto::Modules::Rtc.into(),
"".to_string(),
Vec::new(),
);
}
}
}
}