message_queue.anubis 5.59 KB


read muscle.anubis
read tools/basis.anubis
read system/string.anubis

// Outcome of asking a queue for its next message (returned by
// get_next_received_Message and get_next_Message_to_send in this file).
public type MessageResult:
  // No message was available: the receive side returns this once `now` has
  // passed its deadline; the send side returns it whenever nothing is queued.
  timeout,
  // The queue's connection-closed flag was set.
  closed,
  // A message was taken from the queue.
  msg(Message).
  
// A message queue presented as a record of functions. The per-field notes
// below describe the instance built by create_MessageQueue in this file.
public type MessageQueue:
  message_queue
  (
    // Stores a received message in the receive buffer currently selected for
    // writing.
    Message -> One          add_received_Message,
    // Waits for the next received message. The Int is added to `now` to form
    // the deadline; the result is msg(..), closed or timeout.
    Int -> MessageResult    get_next_received_Message,
    // Stores an outgoing message in the send buffer currently selected for
    // writing.
    Message ->  One         add_Message_to_send,
    // Returns the next outgoing message without waiting: closed if the
    // connection-closed flag is set, timeout if nothing is pending.
    One -> MessageResult    get_next_Message_to_send,
    // True when at least one of the three receive-side lists is non-empty.
    One -> Bool             has_received_Message,
    // Sets the connection-closed flag.
    One -> One              connection_closed,
    // Sets both the connection-closed flag and the quit flag.
    One -> One              quit,
    // Reads the quit flag.
    One -> Bool             quit_requested,
    // Reads the queue's name.
    One -> String           get_name,
    // Replaces the queue's name.
    String -> One           set_name
  ).
  

// Takes the next received message, polling until one arrives, the connection
// is flagged closed, or the deadline passes.
//
//   _timeout     deadline compared against `now` (an absolute time, not a
//                duration; create_MessageQueue passes `_timeout + now`)
//   input_flip   first input buffer
//   input_flap   second input buffer
//   output       list currently being drained, head first
//   flip         selects the input buffer examined when `output` is empty:
//                true -> input_flip, false -> input_flap
//   conn_closed  connection-closed flag
//
// Returns msg(m) for the head of `output`; closed when nothing could be taken
// and *conn_closed is true; timeout when nothing could be taken and
// now > _timeout.
//
// NOTE(review): only the buffer selected by `flip` is examined; the other one
// is not looked at until the flag is toggled by a successful swap.
define MessageResult 
  get_next_received_Message
  (
    Int                 _timeout,
    Var(List(Message))  input_flip,
    Var(List(Message))  input_flap,
    Var(List(Message))  output,
    Var(Bool)           flip,
    Var(Bool)           conn_closed
  )=
//  println("get_next_Message function "+"["+virtual_machine_id + "]");

  if *output is
  {
    []      then
      // Nothing left to drain: try to refill `output` from the selected buffer.
      if *flip = true & length(*input_flip) > 0 then
        // Toggle the flag first; create_MessageQueue's add function tests the
        // same flag, so later additions go to input_flap.
        flip <- false;
        // The add function prepends, so reversing puts the oldest message first.
        output <- reverse(*input_flip);
        input_flip <- [];
        // Retry: `output` is now non-empty unless something else changed it.
        get_next_received_Message(_timeout, input_flip, input_flap, output, flip, conn_closed)
      else if *flip = false & length(*input_flap) > 0 then
        // Mirror image of the branch above, for the other buffer.
        flip <- true;
        output <- reverse(*input_flap);
        input_flap <- [];
        
        get_next_received_Message(_timeout, input_flip, input_flap, output, flip, conn_closed)
      else
        // Selected buffer is empty as well: closed takes precedence over timeout.
        if *conn_closed then
          closed
        else        
        if now > _timeout   then
          timeout
        else
          // Wait briefly and poll again.
          // NOTE(review): the unit of sleep's argument is not visible here.
          sleep(1);
          get_next_received_Message(_timeout, input_flip, input_flap, output, flip, conn_closed)
      ,
    [h . t] then
      // Pop the head of the drained list and hand it to the caller.
      output <- t;
      msg(h)
  }.
  
// Takes the next message waiting to be sent, without waiting.
//
//   send_flip       first buffer of outgoing messages
//   send_flap       second buffer of outgoing messages
//   send_list       list currently being drained, head first
//   send_flip_flag  selects the buffer examined when `send_list` is empty:
//                   true -> send_flip, false -> send_flap
//   conn_closed     connection-closed flag
//
// Returns closed whenever *conn_closed is true (checked before anything else,
// so pending messages are not returned in that case), msg(m) for the head of
// `send_list`, or timeout when there is nothing to take.
define MessageResult 
  get_next_Message_to_send
  (
    Var(List(Message))  send_flip,
    Var(List(Message))  send_flap,
    Var(List(Message))  send_list,
    Var(Bool)           send_flip_flag,
    Var(Bool)           conn_closed
  )=
  if *conn_closed then
    closed
  else
    if *send_list is
    {
      []      then
  //      println("send_list empty");
        // Nothing left to drain: try to refill `send_list` from the selected buffer.
        if *send_flip_flag = true & length(*send_flip) > 0 then
  //        println("send_flip_flag true and send_flip length " + length(*send_flip));
          // Toggle the flag first; create_MessageQueue's add function tests
          // the same flag, so later additions go to send_flap.
          send_flip_flag <- false;
          // The add function prepends, so reversing puts the oldest message first.
          send_list <- reverse(*send_flip);
          send_flip <- [];          // empty the buffer that was just copied
            get_next_Message_to_send(send_flip, send_flap, send_list, send_flip_flag, conn_closed)
        else if *send_flip_flag = false & length(*send_flap) > 0 then
  //        println("send_flip_flag false and send_flap length " + length(*send_flap));
          // Mirror image of the branch above, for the other buffer.
          send_flip_flag <- true;
          send_list <- reverse(*send_flap);
          send_flap <- [];
            get_next_Message_to_send(send_flip, send_flap, send_list, send_flip_flag, conn_closed)
        else
  //        println("send_flip length " + length(*send_flip) + " send_flap length " + length(*send_flap) );
          // Selected buffer is empty: report "nothing to send" immediately.
          timeout
        ,
      [h . t] then
        // Pop the head of the drained list and hand it to the caller.
        send_list <- t;
  //      println("get_next_Message_to_send 1 message ");
        msg(h)
    }.

// Builds a MessageQueue named `g_name`.
//
// The queue owns two independent pipelines (received messages and messages to
// send). Each pipeline has two buffers that the add function prepends to, a
// flag choosing which buffer the add function writes to, and a list that the
// get function drains. All state lives in the dynamic variables created below
// and is reachable only through the returned functions.
public define MessageQueue
  create_MessageQueue
  (
    String g_name
  )=
  with inbox_flip      = var((List(Message))[]),
       inbox_flap      = var((List(Message))[]),
       inbox_ready     = var((List(Message))[]),
       inbox_on_flip   = var((Bool) true),
       outbox_flip     = var((List(Message))[]),
       outbox_flap     = var((List(Message))[]),
       outbox_ready    = var((List(Message))[]),
       outbox_on_flip  = var((Bool) true),
       is_closed       = var((Bool) false),
       wants_quit      = var((Bool) false),
       queue_name      = var((String) g_name),
  message_queue
  (
    // add_received_Message: prepend to whichever receive buffer the flag selects.
    (Message incoming) |->
      if *inbox_on_flip then
        inbox_flip <- ([incoming . *inbox_flip])
      else
        inbox_flap <- ([incoming . *inbox_flap]),

    // get_next_received_Message: the argument is turned into a deadline by
    // adding the current time.
    (Int delay) |->
      get_next_received_Message(delay + now, inbox_flip, inbox_flap, inbox_ready, inbox_on_flip, is_closed),

    // add_Message_to_send: prepend to whichever send buffer the flag selects.
    (Message outgoing) |->
      if *outbox_on_flip then
        outbox_flip <- ([outgoing . *outbox_flip])
      else
        outbox_flap <- ([outgoing . *outbox_flap]),

    // get_next_Message_to_send
    (One ignored) |->
      get_next_Message_to_send(outbox_flip, outbox_flap, outbox_ready, outbox_on_flip, is_closed),

    // has_received_Message: true as soon as any receive-side list holds something.
    (One ignored) |->
      (length(*inbox_ready) + length(*inbox_flip) + length(*inbox_flap)) > 0,

    // connection_closed
    (One ignored) |->
      is_closed <- true,

    // quit: a quit request also marks the connection as closed.
    (One ignored) |->
      is_closed <- true;
      wants_quit <- true,

    // quit_requested
    (One ignored) |->
      *wants_quit,

    // get_name
    (One ignored) |->
      *queue_name,

    // set_name
    (String replacement) |->
      queue_name <- replacement
  ).