4) Utilizzo dei messaggi |
Vedremo ora come costruire, su uno stream
quale quello offerto da una connessione TCP, una comunicazione
basata su un flusso di messaggi anziché, come fatto finora,
su un flusso di byte.
Un messaggio è
una quantità arbitraria di informazioni (in genere tali
da avere una propria autosufficienza logica e funzionale) che
vengono corredate da informazioni di
controllo (CI)
e quindi inviate come un'unità a se stante che prende il
nome di pacchetto.
Si noti come, per una comunicazione di questo tipo, non sia strettamente
necessaria una connessione basata su TCP.
Anche UDP sarebbe adeguato, se non fosse per il fatto che non
è un protocollo affidabile. Di conseguenza, noi ricaveremo
il supporto alla gestione di messaggi a partire da stream affidabili
quali quelli offerti da TCP.
I principali benefici derivanti dall'uso di messaggi sono i seguenti:
Tipicamente i messaggi possono essere usati per:
4.1) Classi di base per la gestione di messaggi |
4.1.1) Classe MessageOutput |
E' la superclasse astratta da cui derivano
tutti gli stream per la gestione di messaggi (che indicheremo
col termine generico di message stream)
di output.
E' un FilterOutputStream
(e quindi si deve attaccare
a un OutputStream
) e, per convenienza, estende DataOutputStream
.
Ai metodi di quest'ultima classe aggiunge tre metodi send()
.
Su un message stream di output si opera in questo modo:
DataOutputStream
si
scrive un messaggio, finché esso non è completato
(tipicamente usando un buffer interno);
send()
lo stream effettua
le seguenti operazioni:
La definizione della classe è la seguente (dal libro "Java
Network Programming" di Merlin Hughes et al., Mannig Publications
Co, Greenwich CT, 1997).
/* Copyright (c) 1996, 1997 Prominence Dot Com, Inc. * * See the file legal.txt in the txt directory for details. */ package prominence.msg; import java.io.*; /** * The superclass for all message output streams. * <p>Extends <tt>DataOutputStream</tt> and adds <tt>send()</tt> * methods that send the current message to the attached channel. * * @version 1.0 1 Nov 1996 * @author Merlin Hughes * @see prominence.msg.MessageInput */ public abstract class MessageOutput extends DataOutputStream { /** * Creates a new <tt>MessageOutput</tt>. * @param out Stream to which to write message data. */ public MessageOutput (OutputStream out) { super (out); } /** * Sends the current message to the attached channel. * <p>Subclasses will extend this class and implement this * method as appropriate for a particular communications * channel. * @exception IOException Occurs if there is a problem sending * a message. */ public abstract void send () throws IOException; /** * Sends the current message to the attached channel with * a routing header that indicates a list of recipients. * <p>Subclasses that support this method will override it * with an appropriate implementation. The default implementation * is to throw an exception. * @param dst The list of intended recipients * @exception IOException Occurs if there is a problem sending * a message or this method is not supported. */ public void send (String[] dst) throws IOException { throw new IOException ("send[] not supported"); } /** * Sends the current message to the attached channel with * a routing header that indicates a single recipient. * <p>The default implementation of this method calls the * previous method with a single-element array. * @param dst The intended recipient * @exception IOException Occurs if there is a problem sending * a message or targeted sending is not supported. */ public void send (String dst) throws IOException { String[] dsts = { dst }; send (dsts); } } |
Note
DataOutputStream
, ereditando quindi
i suoi metodi.
OutputStream
a cui
attaccarsi e lo passa al costruttore della superclasse. I vari
metodi di scrittura della superclasse scriveranno direttamente
su tale stream. Esso, nelle classi derivate, tipicamente sarà
un ByteArrayOutputStream
, ossia uno stream che scrive
in un buffer (costituito da un array di byte) in memoria.
send()
trasmette materialmente il messaggio (dopo
averlo incapsulato) sul canale di comunicazione, cioé scrive
l'intero pacchetto sull'OutputStream
a cui è
attaccato.
send(
)
,
che noi non useremo, predispongono alla trasmissione multicast.
4.1.2) Classe MessageInput |
E' la superclasse astratta da cui derivano
tutti i message stream di input.
E' un FilterInputStream
(e quindi si deve attaccare
a un InputStream
) e, per convenienza, estende DataInputStream
.
Ai metodi di quest'ultima classe aggiunge un metodo receive()
.
Con un message stream di input opera in questo modo:
receive()
ci si blocca in attesa che
un pacchetto sia ricevuto dal canale di comunicazione. Quando
ciò accade, il messaggio viene estratto dal pacchetto e
diviene disponibile;
DataInputStream
si leggono i dati del messaggio;
receive()
si attende un
nuovo pacchetto, il cui contenuto sovrascrive il messaggio precedente
(anche se quest'ultimo non è stato completamente letto).
La definizione della classe è la seguente (dal libro "Java
Network Programming" di Merlin Hughes et al., Mannig Publications
Co, Greenwich CT, 1997).
/* Copyright (c) 1996, 1997 Prominence Dot Com, Inc. * * See the file legal.txt in the txt directory for details. */ package prominence.msg; import java.io.*; /** * The superclass for all message input streams. * <p>Extends <tt>DataInputStream</tt> and adds a <tt>receive()</tt> * method that receives a new message from the attached channel. * * @version 1.0 1 Nov 1996 * @author Merlin Hughes * @see prominence.msg.MessageOutput */ public abstract class MessageInput extends DataInputStream { /** * Creates a new <tt>MessageInput</tt>. * @param in Stream from which to read message data */ public MessageInput (InputStream in) { super (in); } /** * Receives a new message from the attached channel and makes * it available to read through the standard <tt>DataInputStream</tt> * methods. * <p>Subclasses will extend this class and implement this method * as appropriate for a particular communications channel. * @exception IOException Occurs if there is a problem receiving * a new message. */ public abstract void receive () throws IOException; } |
Note
DataInputStream
, ereditando i suoi
metodi.
InputStream
a cui attaccarsi
e lo passa al costruttore della superclasse. I vari metodi di
lettura della superclasse leggeranno direttamente da tale stream.
Esso, nelle classi derivate, sarà tipicamente uno stream
che legge da un buffer in memoria.
receive()
si blocca finché non arriva
un pacchetto, estrae il messaggio e lo rende disponibile nel buffer
di cui sopra.
4.1.3) Classe MessageOutputStream |
Questa è la prima implementazione di
MessageOutput
che vedremo, e si attacca a un OutputStream
.
Questa classe, in particolare, incapsula i messaggi in pacchetti
la cui Control Information è costituita dalla lunghezza
in byte del messaggio. Ciò consente al destinatario di
sapere in anticipo quanto è grande il messaggio, senza
doverne analizzare alcuna parte.
MessageOutputStream
Internamente, il meccanismo di buffering si ottiene per mezzo
di un ByteArrayOutputStream
.
MessageOutputStream
La definizione della classe è la seguente (dal libro "Java
Network Programming" di Merlin Hughes et al., Mannig Publications
Co, Greenwich CT, 1997).
/* Copyright (c) 1996, 1997 Prominence Dot Com, Inc. * * See the file legal.txt in the txt directory for details. */ package prominence.msg; import java.io.*; /** * A <tt>MessageOutput</tt> that writes messages to an attached * <tt>OutputStream</tt>. * * @version 1.0 1 Nov 1996 * @author Merlin Hughes * @see prominence.msg.MessageInputStream */ public class MessageOutputStream extends MessageOutput { /** * The attached <tt>OutputStream</tt>. */ protected OutputStream o; /** * A <tt>DataOutputStream</tt> attached to <tt>o</tt>. */ protected DataOutputStream dataO; /** * A <tt>ByteArrayOutputStream</tt> used to buffer the current message. */ protected ByteArrayOutputStream byteO; /** * Creates a new <tt>MessageOutputStream</tt>. Message data is * buffered internally until <tt>send()</tt> is called. * @param o The <tt>OutputStream</tt> to which to write messages */ public MessageOutputStream (OutputStream o) { super (new ByteArrayOutputStream ()); byteO = (ByteArrayOutputStream) out; this.o = o; dataO = new DataOutputStream (o); } /** * Sends a message to the attached stream. The message body length * is written as an <tt>int</tt>, followed by the message body itself; * the internal message buffer is then reset. * <p>This method synchronizes on the attached stream. * @exception IOException Occurs if there is a problem writing to * the attached stream. */ public void send () throws IOException { synchronized (o) { dataO.writeInt (byteO.size ()); byteO.writeTo (o); } byteO.reset (); o.flush (); } } |
Note
MessageOutput
e quindi DataOutputStream
;
DataOutputStream
scrivono nel buffer
interno;
send()
invia un pacchetto, formato dalla lunghezza
dati più il contenuto del buffer, sullo stream di output
a cui il MessageOutputStream
è attaccato.
Costruttore
MessageOutput
, il quale a sua
volta chiama quello di DataOutputStream
. Però,
al costruttore della superclasse
non viene passato
il vero OutputStream
a cui ci si deve attaccare,
ma un nuovo ByteArrayOutputStream
, creato per l'occasione,
che costituirà il buffer interno. Di conseguenza, la superclasse
sarà attaccata a tale buffer, e i suoi metodi di scrittura
opereranno su di esso.
FilterOutputStream
tiene in una
variabile out
una reference all'OutputStream
a cui è attaccato (cioè quello che gli viene passato
nel costruttore). Usiamo tale variabile per ricavare una reference
al buffer interno, byteO
, che serve per usare i metodi
di ByteArrayOutputStream
.
OutputStream
,
cioè ad o
, un DataOutputStream
(per poterne usare i metodi) e utilizziamo dataO
come reference ad esso.
close()
il MessageOutputStream,
che non è attaccato al vero OutputStream
,
non chiude quest'ultimo ma il buffer interno byteO
,
il che non ha alcun effetto.
Metodi
DataOutputStream
scriveranno
dunque dentro byteO
, il buffer interno.
send()
, invece, compie le seguenti
azioni (bloccandosi se necessario):
OutputStream
, garantendo
così una corretta gestione del canale di comunicazione
nel caso vi fossero attacati molteplici MessageOutputStream
;
OutputStream
un intero pari alla
dimensione corrente del buffer;
OutputStream
;
flush()
per inviare subito il pacchetto
(lunghezza più dati).
4.1.4) Classe MessageInputStream |
Questa classe, che estende MessageInput
,
si attacca a un InputStream
e va usata per leggere
i dati che provengono da un MessageOutputStream
.
Essa, col metodo receive()
, estrae il messaggio dal
pacchetto (eliminando la CI, costituita dalla dimensione del messaggio)
e quindi rende disponibile il contenuto del messaggio ai normali
metodi di lettura (quelli di DataInputStream
), memorizzandolo
in un buffer interno.
MessageInputStream
Internamente, si usa un ByteArrayInputStream per implementare
il meccanismo di buffering.
MessageInputStream
La definizione della classe è la seguente (dal libro "Java
Network Programming" di Merlin Hughes et al., Mannig Publications
Co, Greenwich CT, 1997).
/* Copyright (c) 1996, 1997 Prominence Dot Com, Inc. * * See the file legal.txt in the txt directory for details. */ package prominence.msg; import java.io.*; /** * A <tt>MessageInput</tt> that reads messages from an * attached <tt>InputStream</tt>. * * @version 1.0 1 Nov 1996 * @author Merlin Hughes * @see prominence.msg.MessageOutputStream */ public class MessageInputStream extends MessageInput { /** * The attached <tt>InputStream</tt>. */ protected InputStream i; /** * A <tt>DataInputStream</tt> attached to <tt>i</tt>. */ protected DataInputStream dataI; /** * Creates a new <tt>MessageInputStream</tt>. * @param i The <tt>InputStream</tt> from which to read messages. */ public MessageInputStream (InputStream i) { super (null); this.i = i; dataI = new DataInputStream (i); } /** * A buffer containing the most recently received message. * <p>This variable is exposed to permit potential optimizations. */ byte[] buffer; /** * Receives a message from the attached stream. An <tt>int</tt> * header is read, which indicates the length of the message body. * The message body is then read and made available through the * usual superclass <tt>read()</tt> methods. * <p>This method synchronizes on the attached stream. * @exception IOException Occurs if there is a problem reading * from the attached stream. */ public void receive () throws IOException { synchronized (i) { int n = dataI.readInt (); buffer = new byte[n]; dataI.readFully (buffer); } in = new ByteArrayInputStream (buffer); } } |
Note
MessageInput
e quindi DataInputStream
;
DataInputStream
leggeranno dal buffer
interno (se è stata fatta almeno una receive()
);
receive()
crea un buffer, lo riempie col messaggio
e lo attacca al MessageInputStream
.
Costruttore
MessageInput
, e quindi quello
di DataInputStream
, che però (come prima)
non vengono attaccati al vero InputStream
. In questo
caso anzi vengono attaccati a un null
, perché
non ha senso fare normali letture prima che sia arrivato un pacchetto:
in tal caso si genera un'eccezione.
dataI
un nuovo DatainputStream
attaccato al canale di comunicazione, per poterne usare i metodi.
Metodi
DataInputStream
leggeranno
da in
e quindi dal buffer interno.
receive()
compie le seguenti operazioni
(bloccandosi finché non arriva un messaggio):
ByteArrayInputStream
, attaccato
all'array appena riempito;
MessageInputStream
al ByteArrayInputStream
appena creato, assegnando quest'ultimo a in
(la variabile
di FilterInputStream
nella quale si memorizza l'InputStream
a cui il FilterInputStream
è attaccato).
4.2) Un'applicazione client-server per la gestione di transazioni |
Si deve notare che:
MessageOutputStream
possono essere attaccati
a un singolo OutputStream
MessageInputStream
possono essere attaccati
a un singolo InputStream
.
I message stream opereranno comunque correttamente, anche se pilotati
da corrispondenti thread in concorrenza, grazie all'incapsulamento
e alla sincronizzazione.
Ciò è utile, ad esempio, in un'applicazione per la gestione di transazioni, che si svolgono ciascuna secondo il seguente schema:
Pensiamo a un server multithreaded, in cui ogni thread gestisce
sequenzialmente una transazione dopo l'altra.
I vari thread operano in concorrenza, per cui mentre uno elabora
una transazione, un altro ne accetta una nuova, e così
via.
Naturalmente, molte richieste viaggiano assieme in una direzione,
e molte risposte viaggiano assieme nell'altra direzione.
Grazie alla struttura di messaggi, queste trasmissioni non si
disturbano a vicenda.
Il server:
HashTable
che mappa attributi in
valori;
Il client:
get()
) o modificare (con
put()
) il valore di un attributo (queste sono le
uniche due possibili transazioni).
4.2.1) Classe TransactionClient |
E' il cliente che si connette al corrispondente
server.
Usa MessageOutputStream
e MessageInputStream
per comunicare. Presenta all'utente due campi testo e due bottoni:
La definizione della classe è la seguente; si noti che
rispetto alla versione del libro ("Java Network Programming"
di Merlin Hughes et al., Mannig Publications Co, Greenwich CT,
1997) c'è una variazione sostanziale, in quanto la ricezione
dei dati del server avviene in un thread separato, altrimenti
sia il client che il server si bloccano su receive()
.
import java.awt.*; import java.io.*; import java.net.*; import prominence.msg.MessageInputStream; import prominence.msg.MessageOutputStream; public class TransactionClient extends Frame implements Runnable { protected MessageInputStream mI; protected MessageOutputStream mO; protected Button get, put; protected TextField attr, value; public TransactionClient (InputStream i, OutputStream o) { super ("Transaction Client"); mI = new MessageInputStream (i); mO = new MessageOutputStream (o); attr = new TextField (24); value = new TextField (24); get = new Button ("get"); put = new Button ("put"); setLayout (new GridLayout (2, 2)); add (attr); add (value); add (get); add (put); pack (); show (); } public void run () { while (true) { try { mI.receive (); System.out.print ("attr: " + mI.readUTF ()); System.out.println (" value: " + mI.readUTF ()); } catch (IOException e) { e.printStackTrace(); } } } public boolean handleEvent (Event e) { if ((e.id == e.ACTION_EVENT) && (e.target instanceof Button)) { try { if (e.target == get) { mO.writeUTF ("get"); mO.writeUTF (attr.getText ()); } else if (e.target == put) { mO.writeUTF ("put"); mO.writeUTF (attr.getText ()); mO.writeUTF (value.getText ()); } mO.send (); } catch (IOException ex) { ex.printStackTrace (); } } return super.handleEvent (e); } static public void main (String args[]) throws IOException { if (args.length != 2) throw new RuntimeException ("Syntax: TransactionClient <server> <port>"); Socket s = new Socket (args[0], Integer.parseInt (args[1])); InputStream i = s.getInputStream (); OutputStream o = s.getOutputStream (); TransactionClient c = new TransactionClient (i, o); //c.listen (); new Thread(c).start(); } } |
Note
main()
crea il socket per la connessione al
server, estrae i due stream, e crea il TransactionClient
.
Quindi avvia un thread separato per la ricezione dei dati.
run()
, e dunque il thread separato,
è un ciclo infinito che:
receive()
;
handleEvent()
gestisce la interazione
coll'utente, dando via via inizio alle transazioni:
4.2.2) Classe TransactionServer |
Questo è il server che gestisce le
transazioni. Anche lui usa gli stessi tipi di message stream del
client, e grazie ad essi può attivare molteplici thread
che lavorano in concorrenza per gestire molte transazioni contemporaneamente.
Un ritardo simula artificialmente il tempo necessario a gestire
una transazione.
Implementa l'interfaccia Runnable
per poter lanciare
molti thread sullo stesso oggetto.
Il costruttore riceve i due stream per la comunicazione e crea
una HashTable
(essenzialmente una classe che implementa
una memoria associativa, contenente coppie chiave-valore e dotata
di metodi per l'inserimento e la ricerca di elementi) per memorizzare
e ricercare le coppie attributo-valore.
La definizione della classe è la seguente (dal libro "Java
Network Programming" di Merlin Hughes et al., Mannig Publications
Co, Greenwich CT, 1997).
/* Copyright (c) 1996, 1997 Prominence Dot Com, Inc. * * See the file legal.txt in the txt directory for details. */ import java.io.*; import java.util.*; import java.net.*; import prominence.msg.MessageOutput; import prominence.msg.MessageInputStream; import prominence.msg.MessageOutputStream; public class TransactionServer implements Runnable { protected Hashtable h; protected InputStream i; protected OutputStream o; public TransactionServer (InputStream i, OutputStream o) { this.i = i; this.o = o; h = new Hashtable (); } public void run () { MessageInputStream mI = new MessageInputStream (i); MessageOutputStream mO = new MessageOutputStream (o); try { while (true) { mI.receive (); try { Thread.sleep (1000); } catch (InterruptedException ex) { } String cmd = mI.readUTF (); System.out.println (Thread.currentThread () + ": command " + cmd); if (cmd.equals ("get")) { get (mI, mO); } else if (cmd.equals ("put")) { put (mI); } } } catch (IOException ex) { ex.printStackTrace (); } } void get (DataInputStream dI, MessageOutput mO) throws IOException { String attr = dI.readUTF (); mO.writeUTF (attr); if (h.containsKey (attr)) mO.writeUTF ((String) h.get (attr)); else mO.writeUTF ("null"); mO.send (); } void put (DataInputStream dI) throws IOException { String attr = dI.readUTF (); String value = dI.readUTF (); h.put (attr, value); } static public void main (String args[]) throws IOException { if (args.length != 2) throw new RuntimeException ("Syntax: TransactionServer <port> <threads>"); ServerSocket server = new ServerSocket (Integer.parseInt (args[0])); Socket s = server.accept (); server.close (); InputStream i = s.getInputStream (); OutputStream o = s.getOutputStream (); TransactionServer t = new TransactionServer (i, o); int n = Integer.parseInt (args[1]); for (int j = 0; j < n; ++ j) new Thread (t).start (); } } |
Note
main()
opera come segue:
ServerSocket
;
ServerSocket
;
TransactionServer
;
run()
di ogni thread è un ciclo
infinito, in cui:
receive()
;
get()
o put()
):
put()
di HashTable
.
Lo scopo fondamentale di questa applicazione client-server è
mostrare come l'uso dei message stream ci consenta facilmente
di condividere un unico canale di comunicazione fra molti thread
concorrenti. L'incapsulamento ci protegge dal mescolamento dei
dati.
4.3) Accodamento di messaggi |
Avendo la possibilità di gestire stream di messaggi anziché di byte, è possibile crearsi ulteriori strumenti di utilità:
Ciò significa avere la possibilità di disaccoppiare fra loro:
Le principali conseguenze positive di questo disaccoppiamento sono:
4.3.1) Classe Queue |
E' una implementazione della coda
(intesa come struttura dati), i cui elementi sono messaggi.
Si noti che nella coda ci sono messaggi e non pacchetti, perché
la coda provvede automaticamente a tenere separati tali elementi
e quindi non c'è bisogno dell'informazione di controllo.
La definizione della classe è la seguente (dal libro "Java
Network Programming" di Merlin Hughes et al., Mannig Publications
Co, Greenwich CT, 1997).
/* Copyright (c) 1996, 1997 Prominence Dot Com, Inc. * * See the file legal.txt in the txt directory for details. */ package prominence.util; import java.util.Vector; /** * A FIFO (first in, first out) data-structure; the opposite of a <tt>Stack</tt>. * Objects are added to the front of the <tt>Queue</tt> and removed from the back. * <p>This implementation blocks the caller who attempts to remove an object from * an empty queue until the queue is non-empty again. * * @version 1.0 1 Nov 1996 * @author Merlin Hughes */ public class Queue { /** * A <tt>Vector</tt> of the queue elements. */ protected Vector queue; /** * Creates a new, empty <tt>Queue</tt>. */ public Queue () { queue = new Vector (); } /** * Attempts to remove an object from the queue; blocks if there are no objects * in the queue. This call will therefore always return an object. * @returns The least-recently-added object from the queue */ public Object remove () { synchronized (queue) { while (queue.isEmpty ()) { try { queue.wait (); } catch (InterruptedException ex) {} } Object item = queue.firstElement (); queue.removeElement (item); return item; } } /** * Adds an item to the front of the queue, wakes a caller who is waiting for * the queue to become non-empty. * @param item The object to be added */ public void add (Object item) { synchronized (queue) { queue.addElement (item); queue.notify (); } } /** * Returns whether the queue is empty. * @returns Whether the queue is empty */ public boolean isEmpty () { return queue.isEmpty (); } } |
Note
Vector
,
classe che offre due metodi, add()
e remove()
,
per inserire e recuperare un elemento. Va notato che un thread
che cerca di estrarre un elemento da una coda vuota viene bloccato,
e si risveglierà quando ci sarà qualcosa nella coda.
Vector
vuoto, nel quale
verrà mantenuta la coda.
remove()
elimina un elemento dalla
coda e lo restituisce al chiamante:
Vector
,
al fine di poter gestire la coda per mezzo di thread multipli;
wait()
, rilasciando quindi il lucchetto,
se la coda è vuota.
add()
aggiunge un elemento alla coda:
Vector
;
notify()
uno degli
eventuali thread in attesa di un elemento.
4.3.2) Classe QueueOutputStream |
Questa classe è un MessageOutput
che, invece di scrivere messaggi su un OutputStream
,
li inserisce in una coda.
QueueOutputStream
send()
aggiunge alla coda un messaggio, costituito
dal corrente contenuto del buffer.
Come si noterà, in questo caso non si incapsula il messaggio
in un pacchetto, in quanto la coda è una struttura costituita
di elementi separati, ciascuno dei quali è un array che
internamente ha l'informazione sulle proprie dimensioni. Il contenuto
di ogni array coincide con quello del corrispondente messaggio.
Internamente, il buffer si implementa con un ByteArrayOutputStream.
QueueOutputStream
La definizione della classe è la seguente (dal libro "Java
Network Programming" di Merlin Hughes et al., Mannig Publications
Co, Greenwich CT, 1997).
/* Copyright (c) 1996, 1997 Prominence Dot Com, Inc. * * See the file legal.txt in the txt directory for details. */ package prominence.msg; import java.io.*; import prominence.util.Queue; /** * A <tt>MessageOutput</tt> that inserts messages into a <tt>Queue</tt> * of byte arrays. * * @version 1.0 1 Nov 1996 * @author Merlin Hughes * @see prominence.msg.QueueInputStream */ public class QueueOutputStream extends MessageOutput { /** * A <tt>ByteArrayOutputStream</tt> used to buffer the current message * contents. */ protected ByteArrayOutputStream byteO; /** * The <tt>Queue</tt> of messages. */ protected Queue q; /** * Creates a new <tt>QueueOutputStream</tt>. * @param q A <tt>Queue</tt> into which messages will be written */ public QueueOutputStream (Queue q) { super (new ByteArrayOutputStream ()); byteO = (ByteArrayOutputStream) out; this.q = q; } /** * Inserts the current message buffer into the <tt>Queue</tt>. */ public void send () { byte[] buffer = byteO.toByteArray (); byteO.reset (); q.add (buffer); } } |
Note
MessageOutputStream
).
Inoltre, mantiene una reference alla coda cui è attaccato.
send()
estrae il contenuto attuale
del buffer (ossia il messaggio costruito con le scritture) e lo
inserisce come nuovo elemento nella coda. Quindi resetta il buffer
in modo che le prossime scritture possano costruire il prossimo
messaggio.
QueueOutputStream
possono essere
attaccati alla stessa coda. Grazie al fatto che l'aggiunta di
elementi avviene in un blocco sincronizzato, non si verificano
interferenze fra eventuali thread concorrenti, che tipicamente
gestiscono i vari QueueOutputStream
.
4.3.3) Classe QueueInputStream |
Questa classe è un MessageInput
che, invece di leggere messaggi da un InputStream
,
li estrae da una coda.
QueueInputStream
Receive()
estrae un messaggio dalla coda e rende
disponibile il suo contenuto per le letture. Se non ci sono messaggi,
blocca il chiamante finché non ce n'è uno disponibile.
Internamente il buffer si implementa con un ByteArrayInputStream
.
QueueInputStream
La definizione della classe è la seguente (dal libro "Java
Network Programming" di Merlin Hughes et al., Mannig Publications
Co, Greenwich CT, 1997).
/* Copyright (c) 1996, 1997 Prominence Dot Com, Inc. * * See the file legal.txt in the txt directory for details. */ package prominence.msg; import java.io.*; import prominence.util.Queue; /** * A <tt>MessageInput</tt> that reads messages from a <tt>Queue</tt> of * byte arrays. * * @version 1.0 1 Nov 1996 * @author Merlin Hughes * @see prominence.msg.QueueOutputStream */ public class QueueInputStream extends MessageInput { /** * The <tt>Queue</tt> of messages. */ protected Queue q; /** * Creates a new <tt>QueueInputStream</tt>. * @param q A <tt>Queue</tt> out of which messages will be read */ public QueueInputStream (Queue q) { super (null); this.q = q; } /** * A buffer containing the most recently received message. * <p>This variable is exposed to permit potential optimizations. */ byte[] buffer; /** * Extracts a message from the attached <tt>Queue</tt> and makes * it available to read through the usual superclass <tt>read()</tt> * methods. */ public void receive () { buffer = (byte[]) q.remove (); in = new ByteArrayInputStream (buffer); } } |
Note
null
(non c'è alcun messaggio da leggere) e mantiene una reference
alla coda.
receive()
estrae un messaggio dalla
coda (bloccandosi se non ce ne sono) e lo copia in un array di
byte interno. Quindi, analogamente a MessageInputStream
,
crea un ByteArrayInputStream
attaccato al buffer
e si aggancia a tale ByteArrayInputStream
per le
successive letture.
QueueInputStream
possono essere attaccati alla stessa coda senza problemi di interferenze.
4.3.4) Utilizzo tipico degli stream per l'accodamento di messaggi |
Come abbiamo detto, l'accodamento dei messaggi permette di:
Entrambi questi obiettivi possono essere raggiunti con sistemi
analoghi a quelli sotto esposti.
Uso di code in input
Si dedica un thread separato, che chiameremo thread
copiatore, alla lettura dei pacchetti dalla connessione
di rete. Tale thread non fa altro che ricevere pacchetti dalla
rete (attraverso un MessageInput
) e ricopiarli in
una coda (attraverso un QueueOutputStream
).
Una applicazione che legga i dati da tale coda (attaccandovi un
QueueInputStream
) è quindi isolata dalla rete.
Il codice di tale thread copiatore sarà semplicemente un
ciclo infinito di copiatura:
... try{ while (true) { mi.receive(); byte[] buffer=new byte[mi.available()]; mi.readFully(buffer); mo.write(buffer); mo.send(); } } catch (IOException e) { e.printStackTrace(); } ...
dove:
mi
è il MessageInput
;
mo
è il QueueOutputStream
.
Uso di code in output
Si dedica un thread copiatore alla scrittura dei pacchetti sulla
connessione di rete. Tale thread preleva i pacchetti da una coda
e li ricopia su una connessione di rete.
Una applicazione che scriva i dati nella coda (attaccandovi un
QueueOutputStream
) è quindi isolata dalla
rete, sia per quanto riguarda errori che possibili ritardi.
Il codice del thread copiatore è uguale a prima, solo che ora si avrà che:
mi
è il QueueInputStream
;
mo
è il MessageOutput
.
Vedremo più avanti una classe (Demultiplexer
)
che fra le sue funzioni ha anche quella di thread copiatore per
isolare una applicazione dalla rete.