// message_queue.anubis
//
// Declares the MessageResult and MessageQueue types and the constructor
// create_MessageQueue, together with the two private helpers that drain the
// receive-side and send-side message lists.
read muscle.anubis
read tools/basis.anubis
read system/string.anubis
// Outcome of asking a MessageQueue for its next message (received or to send).
public type MessageResult:
// No message was available. The receive side returns this once its deadline has
// passed; the send side returns it immediately when nothing is pending.
timeout,
// The queue's connection-closed flag is set.
closed,
// A message was dequeued; it is carried as the payload.
msg(Message).
// Handle on a message queue, represented as a record of functions.
// Values of this type are built by create_MessageQueue, whose closures share
// the underlying mutable state. The components are positional: their order
// here must match the order of the closures given to `message_queue`.
public type MessageQueue:
message_queue
(
// Appends a message on the receive side.
Message -> One add_received_Message,
// Returns the next received message. The Int is a delay that the
// implementation adds to `now` to obtain the deadline.
Int -> MessageResult get_next_received_Message,
// Appends a message on the send side.
Message -> One add_Message_to_send,
// Returns the next message to send, without waiting.
One -> MessageResult get_next_Message_to_send,
// True when at least one received message is still held by the queue.
One -> Bool has_received_Message,
// Marks the connection as closed.
One -> One connection_closed,
// Marks the connection as closed and records that quitting was requested.
One -> One quit,
// Tells whether `quit` has been called.
One -> Bool quit_requested,
// Returns the queue's current name.
One -> String get_name,
// Replaces the queue's name.
String -> One set_name
).
// Returns the next received message, polling until a message is available, the
// connection is flagged as closed, or the deadline has passed.
//
// Parameters:
//   _timeout    deadline, compared directly against `now` (the caller in this
//               file passes `delay + now`)
//   input_flip  first producer-side list; producers prepend to it while `flip`
//               is true
//   input_flap  second producer-side list; producers prepend to it while `flip`
//               is false
//   output      consumer-side list, already in delivery order
//   flip        selects which producer-side list is currently being filled
//   conn_closed set once the connection has been closed
//
// Returns:
//   msg(h)   the head of `output`, which is removed from the list
//   closed   when nothing is deliverable and `conn_closed` is set
//   timeout  when nothing is deliverable and `now` is past `_timeout`
//
// NOTE(review): no locking is visible here (a `protect` is commented out at the
// call site). If producers can run concurrently with this function, a message
// prepended between the read of the list and its reset to [] would be lost or
// left in the inactive list -- TODO confirm the scheduling model.
define MessageResult
get_next_received_Message
(
Int _timeout,
Var(List(Message)) input_flip,
Var(List(Message)) input_flap,
Var(List(Message)) output,
Var(Bool) flip,
Var(Bool) conn_closed
)=
// println("get_next_Message function "+"["+virtual_machine_id + "]");
if *output is
{
// Nothing ready for delivery: try to refill `output` from the active producer list.
[] then
if *flip = true & length(*input_flip) > 0 then
// Redirect producers to the other list before draining this one.
flip <- false; // switch the active input list
// Producers prepend, so reversing restores arrival order.
output <- reverse(*input_flip);
input_flip <- [];
// `output` is now non-empty; the recursive call delivers its head.
get_next_received_Message(_timeout, input_flip, input_flap, output, flip, conn_closed)
else if *flip = false & length(*input_flap) > 0 then
// Same swap-and-drain as above, for the other list.
flip <- true;
output <- reverse(*input_flap);
input_flap <- [];
get_next_received_Message(_timeout, input_flip, input_flap, output, flip, conn_closed)
else
// The active list is empty too. `closed` takes precedence over `timeout`.
if *conn_closed then
closed
else
if now > _timeout then
timeout
else
// Wait before polling again (presumably one second -- confirm the unit of `sleep`).
sleep(1);
get_next_received_Message(_timeout, input_flip, input_flap, output, flip, conn_closed)
,
// A message is ready: remove it from `output` and return it.
[h . t] then
output <- t;
msg(h)
}.
// Returns the next message waiting to be sent, without blocking.
//
// Parameters:
//   send_flip       first producer-side list; producers prepend to it while
//                   `send_flip_flag` is true
//   send_flap       second producer-side list; producers prepend to it while
//                   `send_flip_flag` is false
//   send_list       consumer-side list, already in sending order
//   send_flip_flag  selects which producer-side list is currently being filled
//   conn_closed     set once the connection has been closed
//
// Returns:
//   closed   as soon as `conn_closed` is set, even if messages are still pending
//   msg(h)   the head of `send_list`, which is removed from the list
//   timeout  when `send_list` and the active producer-side list are both empty
//            (no waiting is done here, unlike on the receive side)
define MessageResult
get_next_Message_to_send
(
Var(List(Message)) send_flip,
Var(List(Message)) send_flap,
Var(List(Message)) send_list,
Var(Bool) send_flip_flag,
Var(Bool) conn_closed
)=
// The closed flag is checked before anything else.
if *conn_closed then
closed
else
if *send_list is
{
// Nothing ready to send: try to refill `send_list` from the active producer list.
[] then
// println("send_list empty");
if *send_flip_flag = true & length(*send_flip) > 0 then
// println("send_flip_flag true and send_flip length " + length(*send_flip));
send_flip_flag <- false; // redirect producers to the other list
send_list <- reverse(*send_flip); // producers prepend; reverse restores arrival order
send_flip <- []; // empty the drained list
// `send_list` is now non-empty; the recursive call returns its head.
get_next_Message_to_send(send_flip, send_flap, send_list, send_flip_flag, conn_closed)
else if *send_flip_flag = false & length(*send_flap) > 0 then
// println("send_flip_flag false and send_flap length " + length(*send_flap));
// Same swap-and-drain as above, for the other list.
send_flip_flag <- true;
send_list <- reverse(*send_flap);
send_flap <- [];
get_next_Message_to_send(send_flip, send_flap, send_list, send_flip_flag, conn_closed)
else
// println("send_flip length " + length(*send_flip) + " send_flap length " + length(*send_flap) );
// Nothing pending: report `timeout` immediately.
timeout
,
// A message is ready: remove it from `send_list` and return it.
[h . t] then
send_list <- t;
// println("get_next_Message_to_send 1 message ");
msg(h)
}.
// Creates a new, empty MessageQueue named `g_name`.
//
// The queue's state lives in the mutable cells declared below and is shared by
// the closures packed into the returned `message_queue` value. Each direction
// (receive, send) has two producer-side lists, selected by a flag, and one
// consumer-side list. Producers prepend to the list the flag selects; the
// consumer-side helpers switch the flag, then reverse and drain that list.
//
// The closures are positional: their order matches the components declared in
// the MessageQueue type.
//
// NOTE(review): the `protect` keywords are commented out below, so no mutual
// exclusion is visible around these cells -- confirm whether the closures may
// be called concurrently.
public define MessageQueue
create_MessageQueue
(
String g_name
)=
// Receive side: two producer lists, the delivery list, and the selector flag
// (true means producers fill rcv_flip).
with rcv_flip = var((List(Message))[]),
rcv_flap = var((List(Message))[]),
rcv_list = var((List(Message))[]),
rcv_flip_flag = var((Bool) true),
// Send side: same layout as the receive side.
send_flip = var((List(Message))[]),
send_flap = var((List(Message))[]),
send_list = var((List(Message))[]),
send_flip_flag = var((Bool) true),
// Lifecycle flags and the queue's name.
conn_closed = var((Bool) false),
quit = var((Bool) false),
name = var((String) g_name),
message_queue
(
// add_received_Message: prepend to whichever receive list the flag selects.
(Message msg) |->
if *rcv_flip_flag then
// println("Add message in flip");
rcv_flip <- ([msg . *rcv_flip]);
unique
else
// println("Add message in flap");
rcv_flap <- ([msg . *rcv_flap]);
unique,
// get_next_received_Message: turn the given delay into a deadline, then poll.
(Int _timeout) |->
// println("get_next_Message "+"["+virtual_machine_id + "]");
// sleep(1);
// protect
// println("after protect "+"["+virtual_machine_id + "]");
with timeout = _timeout + now,
get_next_received_Message(timeout, rcv_flip, rcv_flap, rcv_list, rcv_flip_flag, conn_closed),
// add_Message_to_send: prepend to whichever send list the flag selects.
(Message msg) |->
if *send_flip_flag then
send_flip <- ([msg . *send_flip]);
// println("add_Message_to_send [0x"+zero_pad_n(8,to_hexa(*msg.what))+"] add in flip ["+*name+"]");
unique
else
send_flap <- ([msg . *send_flap]);
// println("add_Message_to_send [0x"+zero_pad_n(8,to_hexa(*msg.what))+"] add in flap ["+*name+"]");
unique,
// get_next_Message_to_send: non-waiting dequeue on the send side.
(One u) |->
// protect
// println("get_next_Message_to_send");
get_next_Message_to_send(send_flip, send_flap, send_list, send_flip_flag, conn_closed),
// has_received_Message: true when any of the three receive lists is non-empty.
(One u) |->
if length(*rcv_list) + length(*rcv_flip) + length(*rcv_flap) = 0 then
false
else
true,
// connection_closed: set the closed flag only.
(One u) |->
conn_closed <- true,
// quit: set the closed flag and record that quitting was requested.
(One u) |->
// println("MessageQueue ["+name+"] QUIT REQUESTED");
conn_closed <- true;
quit <- true,
// quit_requested: current value of the quit flag.
(One u) |->
*quit,
// get_name: current name of the queue.
(One u) |->
*name,
// set_name: replace the name of the queue.
(String new_name) |->
name <- new_name
).