1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
use libp2p::PeerId;
use prost::Message;
use crate::router;
use crate::utilities::qaul_id::QaulId;
use crate::utilities::timestamp::Timestamp;
/// Stateless holder for the message retransmission logic.
///
/// The struct has no fields; its functionality is exposed through
/// associated functions that take no `self`.
pub struct MessagingRetransmit {}
impl MessagingRetransmit {
pub fn process() {
let unconfirmed = super::UNCONFIRMED.get().write().unwrap();
if unconfirmed.unconfirmed.len() == 0 {
return;
}
let online_users = router::table::RoutingTable::get_online_users();
let mut updated = false;
let cur_time = Timestamp::get_timestamp();
for entry in unconfirmed.unconfirmed.iter() {
if let Ok((signature, mut unconfirmed_message)) = entry {
if cur_time < (unconfirmed_message.last_sent + 3000) {
continue;
}
if unconfirmed_message.scheduled_dtn {
continue;
}
let qaul_id = QaulId::bytes_to_q8id(unconfirmed_message.receiver_id.clone());
if let Some(_hc) = online_users.get(&qaul_id) {
let mut timeout: u64 = 0;
if unconfirmed_message.scheduled {
timeout = 20 * 1000;
}
if cur_time > (timeout + unconfirmed_message.last_sent) {
if let Ok(container) =
super::proto::Container::decode(&unconfirmed_message.container[..])
{
let receiver =
PeerId::from_bytes(&unconfirmed_message.receiver_id).unwrap();
log::error!(
"retrans message, signature: {}",
bs58::encode(container.signature.clone()).into_string()
);
super::Messaging::schedule_message(
receiver.clone(),
container.clone(),
true,
false,
unconfirmed_message.scheduled_dtn,
unconfirmed_message.is_dtn,
);
let mut new_retry = unconfirmed_message.retry;
if new_retry > 10 {
new_retry = 1;
}
unconfirmed_message.retry = new_retry;
unconfirmed_message.last_sent = cur_time;
if let Err(_e) = unconfirmed
.unconfirmed
.insert(signature, unconfirmed_message.clone())
{
log::error!("updating unconfirmed table error!");
} else {
updated = true;
}
}
}
}
}
}
if updated {
if let Err(_e) = unconfirmed.unconfirmed.flush() {
log::error!("updating unconfirmed table error!");
}
}
}
}