message_transceiver.anubis 5.42 KB

read network/tools.anubis
read system/data_io.anubis
read system/muscle.anubis
read system/message_queue.anubis
read system/string.anubis
read tools/basis.anubis

/********************** TESTING SERVER *************************/

// Two-state connection status.
// NOTE(review): no definition in the visible part of this file refers to this
// type -- confirm whether it is still needed.
type ConnStatus:
  closed,
  ok.
  
// Outcome of reading and decoding one message payload (see receive_data).
type DataResult:
  failure,        // the payload was read but could not be unflattened
  closed,         // reading from the stream reported an error
  ok(Message).    // the payload was read and decoded into a Message
  
// Reads `len_left` more bytes from `conn`, appending every fragment to
// `current`, then unflattens the accumulated buffer into a Message.
//
//   conn      stream the payload is read from
//   len_left  number of payload bytes still to be read
//   current   payload bytes accumulated so far
//
// Result:
//   ok(msg)  the whole payload was read and unflattened,
//   failure  `len_left` is negative, or the payload could not be unflattened,
//   closed   reading from the stream reported an error.
define DataResult
  receive_data
  (
    RWStream  conn,
    Int32     len_left,
    ByteArray current
  )=
  if len_left < 0 then
    // The length comes from the wire. A negative count can never reach 0 by
    // subtracting fragment lengths, so reject it instead of reading forever.
    failure
  else if len_left = 0 then
    if unflatten_message(make_data_io(current)) is
    {
      failure       then 
        print("can't unflatten any message\n");
        failure,
      success(msg)  then 
        ok(msg)
    }
  else
    if read(weaken(conn), len_left, 1) is      //by default the time_out is 1 second
    {
      error     then closed,
      timeout   then 
        //TODO manage the timeout (currently: retry with the same state)
        receive_data(conn, len_left, current),
      ok(bytes) then 
        // A read may deliver only part of what was asked for: account for
        // what actually arrived and keep reading the remainder.
        with len_fragment = length(bytes),
             new_buffer = current + bytes,
             receive_data(conn, len_left - len_fragment, new_buffer)
    }
  .
// Tells whether `current_buffer` holds a message header: the Int32 decoded
// (via lendian_to_host_Int32) from extract(current_buffer, 4, 8) must equal
// _MUSCLE_MESSAGE_ENCODING_DEFAULT.
// The caller in this file only passes buffers of length 8.
define Bool
  is_header
  (
    ByteArray current_buffer
  )=
  with marker = lendian_to_host_Int32(extract(current_buffer, 4, 8)),
    if marker = _MUSCLE_MESSAGE_ENCODING_DEFAULT then true else false
  .
  
// Receive loop for one connection.
//
// Accumulates bytes in `current_buffer` until it holds 8 bytes, then checks
// them with is_header. On a valid header, the Int32 decoded from
// extract(current_buffer, 0, 4) is taken as the payload length, the payload
// is read with receive_data, and the decoded message is handed to
// queue.add_received_Message. On an invalid header, the buffer is replaced by
// extract(current_buffer, 1, 8) (presumably dropping the first byte to
// resynchronise -- confirm against extract's contract) and the loop goes on.
//
//   conn            stream to read from
//   queue           queue that receives decoded messages and close notifications
//   current_buffer  header bytes accumulated so far
//
// The loop ends when queue.quit_requested(unique) is true, or when the stream
// fails; in the latter case queue.connection_closed(unique) is called, whether
// the failure happened while reading the header or while reading the payload.
define One 
  message_receiver
  (
    RWStream      conn,
    MessageQueue  queue,
    ByteArray     current_buffer
  ) =
  if queue.quit_requested(unique) then
    unique
  else
  with buffer_len = length(current_buffer),
  if buffer_len = 8 then
    if is_header(current_buffer) then
      with message_len = lendian_to_host_Int32(extract(current_buffer, 0, 4)),
      if receive_data(conn, message_len, constant_byte_array(0,0)) is
      {
        failure then 
          // Undecodable payload: drop it and wait for the next header.
          message_receiver(conn, queue, constant_byte_array(0,0)),
        closed  then 
          // Stream failed in the middle of a payload: notify the queue exactly
          // as for a failure while reading the header.
          queue.connection_closed(unique),
        ok(msg) then
          queue.add_received_Message(msg);
          message_receiver(conn, queue, constant_byte_array(0,0))
      }
    else
      message_receiver(conn, queue, extract(current_buffer, 1, 8))
  else
    if read(weaken(conn), 8 - buffer_len, 1) is      //by default the time_out is 1 second
    {
      error     then 
        queue.connection_closed(unique),
      timeout   then 
        //TODO manage the timeout (currently: retry; quit_requested is re-checked on entry)
        message_receiver(conn, queue, current_buffer),
      ok(bytes) then 
        with new_buffer = current_buffer + bytes,
        message_receiver(conn, queue, new_buffer)
    }
  .
  
// Starts the receive loop for `conn` with a fresh header buffer built by
// constant_byte_array(0,0) (presumably an empty byte array -- confirm).
// The commented-out println calls are disabled start/end traces.
define One   
  message_receiver_starter
  (
    RWStream      conn,
    MessageQueue  queue
  ) =
//  println("** message receiver ["+virtual_machine_id+"|"+queue.get_name(unique)+"] started");
  with fresh_buffer = constant_byte_array(0,0),
  message_receiver(conn, queue, fresh_buffer).
//  println("// message receiver ["+virtual_machine_id+"|"+queue.get_name(unique)+"] ended").

// Send loop for one connection.
//
// Repeatedly asks `queue` for the next outgoing message:
//   timeout        nothing to send: call sleep(1), then ask again,
//   closed         the loop ends,
//   msg(outgoing)  the message is sent with send_message_by_Stream; a flatten
//                  or network error is only reported with println, and the
//                  loop goes on in every case.
define One
  message_sender
  (
    RWStream      conn,
    MessageQueue  queue
  )=
  if queue.get_next_Message_to_send(unique) is
  {
    timeout        then
      sleep(1); 
      message_sender(conn, queue),
    closed         then
      unique,
    msg(outgoing)  then
      if send_message_by_Stream(conn, outgoing) is
      {
          flatten_error then println("Send message flatten_error"),
          network_error then println("Send message network_error"),
          ok            then unique
      };
      message_sender(conn, queue)
  }.
  
// Starts the send loop for `conn` and `queue`.
// The commented-out println calls are disabled start/end traces.
define One
  message_sender_starter(RWStream conn, MessageQueue queue) =
  //println("++ message sender ["+virtual_machine_id+"|"+queue.get_name(unique)+"] started");
  message_sender(conn, queue).
  //println("-- message sender ["+virtual_machine_id+"|"+queue.get_name(unique)+"] ended").
  
// Starts both halves of a connection: the send loop first, then the receive
// loop, each launched with `delegate`, and returns `unique` without waiting
// for either of them.
//
//   conn   stream shared by the two loops
//   queue  message queue shared by the two loops
public define One
  message_transceiver(RWStream conn, MessageQueue queue) =
  // Original note: "now managing the AUTHORIZATION state".
  // NOTE(review): nothing in this definition deals with authorization -- confirm intent.
  delegate message_sender_starter(conn, queue),
  delegate message_receiver_starter(conn, queue),
  unique.
  
  
 // Builds the connection handler given to start_server: for each accepted
 // stream it prints the peer address and starts message_transceiver on it
 // with the given queue. The Server argument is not used.
 // NOTE(review): this paragraph's keyword is indented by one column -- confirm
 // whether the compiler still treats it as an active paragraph.
 define Server -> (RWStream) -> One
  muscle_handler
  (
    MessageQueue  queue
  ) = 
  (Server server) |-> (RWStream conn) |->
  if remote_IP_address_and_port(conn) is (peer_ip,_) then 
  // Render the client's numeric IP address as text for the log line.
  with peer_name = ip_addr_to_string(peer_ip), 
  print("Accepting connection with "+peer_name+"\n"); 
  message_transceiver(conn, queue)
  .

 // Starts the server on port 44701 with muscle_handler(queue) as connection
 // handler, and prints one line telling whether the start succeeded or which
 // step failed.
 // NOTE(review): the meaning of the first argument (0) and of the last
 // argument of start_server is not visible here -- confirm against its
 // declaration.
 // NOTE(review): this paragraph's keyword is indented by one column -- confirm
 // whether the compiler still treats it as an active paragraph.
 public define One
  start_muscle_server
  (
    MessageQueue  queue
  ) =
   if start_server(0, 44701, muscle_handler(queue), (One ignored) |-> unique) is 
     {
       cannot_create_the_socket then
         print("Cannot create the listening socket.\n"), 
       cannot_bind_to_port      then
         print("Cannot bind to port 44701\n"), 
       cannot_listen_on_port    then
         print("Cannot listen on port 44701\n"),
       ok(started)              then
         print("Muscle Server started on port 44701 \n")
     }.

 // Program entry: creates the message queue named "Muscle Message Test",
 // starts the server on it, then prints the program banner.
 // `args` is not used.
 // NOTE(review): this paragraph's keyword is indented by one column -- confirm
 // whether the compiler still treats it as an active paragraph.
 global define One
  muscle
  (
    List(String)  args
  ) =
  with test_queue = create_MessageQueue("Muscle Message Test"),
  start_muscle_server(test_queue);
  print("Muscle Message Test\n")
  .