|
|||||||
|
|
|
![]() |
|
|
Strumenti |
|
|
#1 |
|
Senior Member
Iscritto dal: Nov 2004
Messaggi: 691
|
[JAVA] Observer-Observable (più Generics)
Ragazzi ho due domandine per voi
1) C'è un metodo della classe Method che si chiama invoke. Invoke ritorna un Object. Loro per farlo generico l'hanno fatto che ritorna Object, ma io chiamo uno specifico metodo da me scritto, che ritorna una List<String>. Poi passo questa List<String> come input ad un metodo scritto da me. Quindi: List<String> lista = (List<String>)m.invoke(...); miaClasse.doWork(lista); se faccio così, ottendo un warning sulla prima delle due istruzioni. Se faccio: List lista = (List)m.invoke(...); miaClasse.doWork(lista); non ottengo un warning sulla prima riga, ma lo prendo sulla seconda xke doWork ha nella signature una List<String> e non una List e basta. Naturalmente è solo un warning, però vorrei capire cosa mi sfugge, o se è effettivamente impossibile eliminarlo. 2) Domanda prettamente concettuale. Mettiamo che ho un file di testo, e un server che accetta richieste via socket. Quando un client richiede il file di stesto, il server apre un thread che apre uno stream sul file e manda via socket al client che l'ha richiesto. Siccome lo stream, prima di leggere la prima riga, lo posiziono alla fine del file di testo, alla fine tutti i client vedono arrivare le stesse info contemporaneamente (quindi non succede che un client sta alla 10 riga e uno alla 100sima, è un po come se piu terminali stessero in tail -f su un file di log movimentato da un'applicazione). Di conseguenza sarebbe buono se riuscissi a creare 1 solo thread e quindi 1 solo stream e a mandare il risultato della readline a tutti i client attualmente connessi. Qualora si connetterà un nuovo client, inizierò ad inviare anche a lui i risultati della readline, dal punto dove ero arrivato. Come posso implementarlo? Il pattern observer/observable mi può venire in aiuto? Grazie 1000 come sempre, Rob |
|
|
|
|
|
#2 |
|
Senior Member
Iscritto dal: Mar 2007
Messaggi: 7863
|
Se ho ben inteso vorresti avere un unico thread lettore e tanti thread che si occupano della comunicazione. In tal caso observer può venire sicuramente in aiuto, in modo da distribuire il multicast(dal lettore verso gli scrittori registrati) il testo letto.
|
|
|
|
|
|
#3 | |
|
Senior Member
Iscritto dal: Nov 2004
Messaggi: 691
|
Quote:
PS: se quello che ho scritto sopra è corretto, c'è il pericolo che, ad esempio, ho 2 thread ascoltatori, leggo un nuovo dato, lo comunico ad entrambi i thread, il primo lo prende ma non lo manda subito, il secondo ancora non lo prende, nel frattempo l'observable sovrascrive il dato letto prima con uno nuovo, il primo thread observer finisce di elaborare il primo dato, al secondo observer arrivano tutti e due i dati letti, oppure solo il secondo? Ultima modifica di tylerdurden83 : 05-05-2010 alle 14:15. |
|
|
|
|
|
|
#4 |
|
Senior Member
Iscritto dal: Mar 2007
Messaggi: 7863
|
no il dato è inviato,concettualmente, a tutti i client contemporaneamente e non prevede riscontro, ma c'è l' overhead della chiamata al metodo update. Eventualmente puoi farne una tua versione customizzata.
Ultima modifica di nuovoUtente86 : 05-05-2010 alle 14:20. |
|
|
|
|
|
#5 |
|
Senior Member
Iscritto dal: Nov 2004
Messaggi: 691
|
Dal link che ho postato mi sembrava di capire che lui registra due observer, uno Slow con una sleep di 10 secondi dentro (registrato per primo), e uno Normal, senza sleep, e che Normal consumava il dato solo dopo 10 secondi (cioè quando il primo terminava) e non immediatamente come avrebbe potuto fare. Sbaglio?
|
|
|
|
|
|
#6 |
|
Senior Member
Iscritto dal: Mar 2007
Messaggi: 7863
|
come ti ho scritto su concettualmente puoi considerare la notifica immediata, ma nelle pratica devi considerare il tempo consumato dal metodo update.
Devi essere tu a decidere, e quindi ottimizzare tale metodo. Alla fin fine a te serve ricevere il dato, ci sarà poi il thread che gestisce il socket di riferimento a consegnarlo sulla rete. |
|
|
|
|
|
#7 | |
|
Senior Member
Iscritto dal: Oct 2007
Città: Padova
Messaggi: 4131
|
Quote:
Il fatto è che il metodo update() di un Observer viene invocato dal metodo notifyObservers() di Observable; se dentro quest'ultimo metodo i vari Observer.update() vengono chiamati dallo stesso thread, è chiaro che si verifica, tra la chiamata dell' update() di un Observer e l'altro, un lag temporale pari al tempo neccessario che il thread deve impiegare nell'eseguire il codice della specifica implementazione di update(). Chiaro che quindi non è una "furbata" chiamare una Thread.sleep() dentro l'update() di un Observer Al limite, sapendo di avere questa neccessità, uno (ad esempio) quando estende Observable fa l'ovverride del metodo notifyObservers() in modo da usare un thread per ogni chiamata ad Observer.update(), cioè un thread per ogni Observer da notificare. Per curiosità posto lo spezzone di codice che illustra il metodo Observable.notifyObservers() nel JDK: Codice:
...
public void notifyObservers(Object arg) {
/*
* a temporary array buffer, used as a snapshot of the state of
* current Observers.
*/
Object[] arrLocal;
synchronized (this) {
/* We don't want the Observer doing callbacks into
* arbitrary code while holding its own Monitor.
* The code where we extract each Observable from
* the Vector and store the state of the Observer
* needs synchronization, but notifying observers
* does not (should not). The worst result of any
* potential race-condition here is that:
* 1) a newly-added Observer will miss a
* notification in progress
* 2) a recently unregistered Observer will be
* wrongly notified when it doesn't care
*/
if (!changed)
return;
arrLocal = obs.toArray();
clearChanged();
}
for (int i = arrLocal.length-1; i>=0; i--)
((Observer)arrLocal[i]).update(this, arg);
}
...
@EDIT: (Nota 2: in particolare, pare che basarsi sull'odine di inserimento degli Observer, come suggeriva qualcuno nel thread che hai linkato, sembrerebbe una pessima idea: la documentazione di Observable dice chiaramente che l'ordine con cui vengono invocati gli Observers è "unspecified". E infatti questi vengono conservati in un Vector, dove vengono inseriti con addElement(), il che significa che ogni nuovo Observer inserito viene inserito in coda al Vector, ma poi, come puoi vedere nel codice qui sopra, il vector stesso viene, molto evangelicamente, ciclato al contrario, della serie: gli ultimi saranno i primi).
__________________
As long as you are basically literate in programming, you should be able to express any logical relationship you understand. If you don’t understand a logical relationship, you can use the attempt to program it as a means to learn about it. (Chris Crawford) Ultima modifica di banryu79 : 05-05-2010 alle 15:13. |
|
|
|
|
|
|
#8 |
|
Senior Member
Iscritto dal: Mar 2007
Messaggi: 7863
|
Poichè l' override di notifyObservers comporta la definizione di una nuova struttura di storage per gli observer (obs è private), si può pensare (modificando di conseguenza gli add e delete) anche a strutture eventualmente più performanti e/o a ordinare in base a delle discriminanti in fase si inserimento.
Ma di norma tutto può essere risolto lato observer, senza modifiche al subject. |
|
|
|
|
|
#9 |
|
Senior Member
Iscritto dal: Nov 2004
Messaggi: 691
|
Ottimo grazie a tutti e due, mi metto a studiare un po', poi magari posto qualcosa se ancora non mi sarà tutto chiaro.
Per curiosità, che mi dite della domanda sui generics? So che è solo un warning, ma sono curioso di sapere se non sono riuscito io a toglierlo, oppure è effettivamente irrisolvibile... |
|
|
|
|
|
#10 | |
|
Senior Member
Iscritto dal: Mar 2007
Messaggi: 7863
|
Quote:
|
|
|
|
|
|
|
#11 |
|
Senior Member
Iscritto dal: Nov 2004
Messaggi: 691
|
Ho fatto una prova al volo:
Codice:
public class Main {
public static void main(String[] args) {
ReaderThread readerThread = new ReaderThread();
ConsumerThread output = new ConsumerThread();
ConsumerThread output2 = new ConsumerThread();
ConsumerThread output3 = new ConsumerThread();
readerThread.addObserver(output);
readerThread.addObserver(output2);
readerThread.addObserver(output3);
new Thread(readerThread).start();
}
}
public class ReaderThread extends Observable implements Runnable{
@Override public void run(){
try {
FileInputStream fis = new FileInputStream("C:/Users/Roberto/Documents/NetBeansProjects/Streamer/log.txt");
InputStreamReader isr = new InputStreamReader(fis);
BufferedReader br = new BufferedReader(isr);
String str = "";
while(true){
if((str = br.readLine()) != null ){
System.out.println("ReaderThread: "+str);
setChanged();
notifyObservers(str);
}
Thread.sleep(1000L);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class ConsumerThread implements Observer {
public void update(Observable ob, Object o) {
if (o instanceof String) {
final String text = (String) o;
new Thread(new Runnable(){
public void run() {
try {
if(Thread.currentThread().getName().equalsIgnoreCase("Thread-1")){
Thread.sleep(4000L);
}
} catch (InterruptedException ex) {
ex.printStackTrace();
}
System.out.println("ConsumerThread received "+text);
}
}).start();
}
}
}
ReaderThread: a ConsumerThread received a ConsumerThread received a ReaderThread: b ConsumerThread received b ConsumerThread received b ConsumerThread received b ReaderThread: c ConsumerThread received c ConsumerThread received c ConsumerThread received c ReaderThread: d ConsumerThread received d ConsumerThread received d ConsumerThread received d ConsumerThread received a ReaderThread: e ConsumerThread received e ConsumerThread received e ConsumerThread received e ReaderThread: f ConsumerThread received f ConsumerThread received f ConsumerThread received f ReaderThread: g ConsumerThread received g ConsumerThread received g ConsumerThread received g La sequenzialità non è garantita, ad esempio il thread-1 ha stampato (nel caso delle socket, inviato) prima b che a, ma mi va bene anche così. Grazie a tutti! (ps la cosa dei generics non l'ho capita benissimo, li parla di runtime, io ho un warning in compilazione che non capisco se impossibile da levare oppure no...) |
|
|
|
|
|
#12 | ||
|
Senior Member
Iscritto dal: Oct 2007
Città: Padova
Messaggi: 4131
|
Quote:
Se togli quel controllo sul Thread-1 per la sleep, l'output verrà stampato nell'ordine che ti aspetti! Ho paura di essermi perso qualcosa, vero? @EDIT: per capirci meglio: se tu modifichi CosumerThread in modo da fargli comunque stampare un messaggio, anche nel caso dia il Thread-1, prima di metterlo in sleep, vedrai che la "sequenzialità", è rispettata e garantita: Modifica: Codice:
class ConsumerThread implements Observer
{
@Override public void update(Observable ob, Object o) {
if (o instanceof String) {
System.out.println(Thread.currentThread().getName());
final String text = (String) o;
final Thread updateExecutor = new Thread(new Runnable() {
@Override public void run() {
try {
if (Thread.currentThread().getName().equalsIgnoreCase("Thread-1")) {
System.out.println("sleep required on Consumer Thread received "+text);
Thread.sleep(4000L);
}
} catch (InterruptedException ex) { ex.printStackTrace(); }
System.out.println("ConsumerThread received "+text);
}
});
updateExecutor.start();
}
}
}
Quote:
__________________
As long as you are basically literate in programming, you should be able to express any logical relationship you understand. If you don’t understand a logical relationship, you can use the attempt to program it as a means to learn about it. (Chris Crawford) Ultima modifica di banryu79 : 05-05-2010 alle 22:38. |
||
|
|
|
|
|
#13 | |
|
Senior Member
Iscritto dal: Mar 2007
Messaggi: 7863
|
Quote:
|
|
|
|
|
|
|
#14 |
|
Senior Member
Iscritto dal: Nov 2004
Messaggi: 691
|
La sleep era puramente a scopo di test, non ci sarà nessuna cosa del genere nella versione finale. Quello che mi interessava controllare con quel semplice esempio era quanto segue, partendo dal fatto che l'invio di a da sulla socket (in questo caso una print) da parte di uno dei tre observer ha avuto un delay:
ReaderThread: a Thread-2 ConsumerThread received a Thread-3 ConsumerThread received a ReaderThread: b Thread-4 ConsumerThread received b Thread-5 ConsumerThread received b Thread-6 ConsumerThread received b ReaderThread: c Thread-7 ConsumerThread received c Thread-8 ConsumerThread received c Thread-9 ConsumerThread received c ReaderThread: d Thread-10 ConsumerThread received d Thread-11 ConsumerThread received d Thread-12 ConsumerThread received d ReaderThread: e Thread-1 ConsumerThread received a Thread-14 ConsumerThread received e Thread-15 ConsumerThread received e Thread-13 ConsumerThread received e 1- L'eventuale delay sull'invio di a da parte di uno degli observer non comporta rallentamenti in lettura/invio di b etc a nessun observer(tantomeno l'observer stesso che ha avuto problemi ad inviare a) 2- L'observer che deve ancora inviare a non si perde nessun update prima dell'invio di a Nuovo esempio(sempre puramente di test): Codice:
public class ConsumerThread implements Observer {
private int id;
ConsumerThread(int id){
this.id=id;
}
public void update(Observable ob, Object o) {
if (o instanceof String) {
final String text = (String) o;
//new Thread(new Runnable(){
// public void run() {
try {
int sonno=0;
while(sonno<5){
if(this.id==1){
Thread.sleep(1000L);
System.out.println(id+" ha dormito 1 sec...");
sonno++;
} else {
break;
}
}
} catch (InterruptedException ex) {
ex.printStackTrace();
}
System.out.println(this.id+" ConsumerThread received "+text);
//}
//}).start();
}
}
}
ReaderThread: a 3 ConsumerThread received a 2 ConsumerThread received a 1 ha dormito 1 sec... 1 ha dormito 1 sec... 1 ha dormito 1 sec... 1 ha dormito 1 sec... 1 ha dormito 1 sec... 1 ConsumerThread received a ReaderThread: b 3 ConsumerThread received b 2 ConsumerThread received b 1 ha dormito 1 sec... 1 ha dormito 1 sec... 1 ha dormito 1 sec... 1 ha dormito 1 sec... 1 ha dormito 1 sec... 1 ConsumerThread received b ReaderThread: c 3 ConsumerThread received c 2 ConsumerThread received c 1 ha dormito 1 sec... In questo caso, immaginiamo che l'invio sulla socket non è fatto su un thread separato ma direttamente dentro l'update dell'observer. La sequenzialità è garantita, ma se uno degli observer ci mette un pochino ad inviare il dato(e potrebbe benissimo essere così nel mio caso), si ferma tutto, il thread Reader non va avanti e tantomeno gli altri observer). Quindi mi si sono delineati due scenari: Caso 1 -> invio di ogni variabile letta dentro un nuovo thread + Se un dato ci mette tempo ad essere inviato il reader va avanti e tutti gli observer continuano a ricevere i nuovi dati - I dati non arrivano necessariamente in ordine sequenziale Caso 2 -> invio direttamente da update senza aprire un nuovo thread per farlo + I dati arrivano in ordine - Se update di un observer si inchioda per vari secondi, si ferma tutto Forse c'è qualcosa che mi avete detto e m'è scappato, spero di no però... Ultima modifica di tylerdurden83 : 06-05-2010 alle 09:59. |
|
|
|
|
|
#15 | |
|
Senior Member
Iscritto dal: Oct 2007
Città: Padova
Messaggi: 4131
|
Quote:
- una struttura concorrente opportuna (vedi package java.util.concurrent) dove accumulare ogni singola lettura del thread-lettore (penso a una coda di priorità) - un thread che la svuota, sparando le print sulla socket Vado a naso però, non ho alcuna esperienza in materia. Penso dipenda anche da quello che stai cercando di implementare (stavo immaginando un server che produce continuamente dei dati di log, da una parte, e dall'altra dei client che si connettono e da quel momento ricevono le letture del server in modo sequenziale, finchè non si disconnettono. In tale caso ci sono altri dettagli a cui pensare: ad esempio l'uso di una thread pool per i client [a meno che non hai la garanzia che siano sempre quattro gatti] e le sue politiche [java.util.concurrent provvede anche a questo]).
__________________
As long as you are basically literate in programming, you should be able to express any logical relationship you understand. If you don’t understand a logical relationship, you can use the attempt to program it as a means to learn about it. (Chris Crawford) Ultima modifica di banryu79 : 06-05-2010 alle 11:18. |
|
|
|
|
|
|
#16 |
|
Senior Member
Iscritto dal: Nov 2004
Messaggi: 691
|
Abuserò un pochino della tua conoscenza...
Ho usato static ExecutorService service = Executors.newCachedThreadPool(); condiviso tra tutti i thread consumatori. Quindi invece di Codice:
new Thread(new Runnable(){
public void run() {
try {
if(Thread.currentThread().getName().equalsIgnoreCase("Thread-1")){
Thread.sleep(4000L);
}
} catch (InterruptedException ex) {
Logger.getLogger(ConsumerThread.class.getName()).log(Level.SEVERE, null, ex);
}
System.out.println("ConsumerThread received "+text);
}
}).start();
Codice:
SenderThread senderThread = new SenderThread(text); service.execute(senderThread); Codice:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
}
}
|
|
|
|
|
|
#17 | |
|
Senior Member
Iscritto dal: Oct 2007
Città: Padova
Messaggi: 4131
|
Ahahah... allora caschi male
E pensare che ti avevo anche avvertito Quote:
Poi fa uso di varibili volatile, ove oppourtono, e dei meccanismi di locking implementati a livello di API nel package stesso (il corpo del metodo 'ensureQueuedTaskHandled' è controllato da un java.util.concurrent.ReentrantLock). Comunque qua chi ti può dare consigli pratici con cognizione di causa, secondo me, è PGI. Io ancora non ho intrapreso uno studio serio e approfondito di tutti i meccanismi di sincronizzazione della tecnologia Java (primitive del linguaggio + API per la concorrenza), per non parlare dello studio completo del Java Memory Model. Se vuoi farti un'idea di massima, tanto per gradire, leggi questo tutorial.
__________________
As long as you are basically literate in programming, you should be able to express any logical relationship you understand. If you don’t understand a logical relationship, you can use the attempt to program it as a means to learn about it. (Chris Crawford) |
|
|
|
|
|
|
#18 |
|
Senior Member
Iscritto dal: Mar 2007
Messaggi: 7863
|
ExecutorService è tread safe perchè fa uso di BlockingQueue.
Quanto al tuo progetto la situazione è abbastanza lineare. - Un thread lettoro Observable - N (quanti sono dinamicamente i client connessi) thread, observer, che wrappano i socket e si occupano della comunicazione. - Cosa devono avere di peculiari tali Thread: si devono sincronizzare, con il lettore, su una condizione (ovvero la disponibilità di un nuovo dato letto). Questo lo puoi fare sfruttando le condition messe a disposizione dal framework oppure te la implementi da te. - Il metodo Update, infine, cosa deve fare? Deve semplicemente ricevere il dato e metterlo a disposizione del thread scrittore: operazione, di fatto, a costo costante. |
|
|
|
|
|
#19 |
|
Senior Member
Iscritto dal: Nov 2004
Messaggi: 691
|
E' vero che la collezione BlockingQueue è thread safe, tuttavia se un metodo fa delle operazioni tipo:
if (poolSize >= corePoolSize .... e queste non sono sincronizzate, avrei detto che non è thread safe. Ad es se un metodo fa: if i = 0 then print "OK" potrei avere la print di OK anche se nel frattempo i non vale più 0, perchè le istruzioni non sono nè atomiche nè sincronizzate. Probabilmente però è come dite voi, e basta che tutte le variabili presenti nelle istruzioni condizionali siano volatile (unito ad oggetti controllati da lock) per renderlo thread safe, anche se avrei detto che nel caso di if (a==0 || b==0), anche se a e b sono volatile, quando passo a==0 e vado a valutare b (che valuto correttamente perchè è volatile) a potrebbe essere cambiata... Ultima modifica di tylerdurden83 : 06-05-2010 alle 15:22. |
|
|
|
|
|
#20 |
|
Senior Member
Iscritto dal: Mar 2007
Messaggi: 7863
|
Se vai a spulciare i sorgenti dell' intera classe, ti farai una panoramica più ampia sui meccanismi di sincronizzazione utilizzati.
|
|
|
|
|
| Strumenti | |
|
|
Tutti gli orari sono GMT +1. Ora sono le: 10:35.




















