PDA

View Full Version : [Java] Thred e similuazione di acquisizione/computazione dati da sensore


guylmaster
14-05-2014, 11:34
Salve a tutti,
ho il seguente problema da cui non riesco a venirne fuori, provo a descriverlo mediante un toy example:

Mettiamo il caso che io ho N file csv, ogni file simula i dati acquisiti da un sensore. Ad esempio se il sensore fosse una stazione meteo potremmo dire che il file csv contiene al suo interno le seguenti colonne:

timestamp - temperatura - umidità

Ora la cadenza con cui i sensori inviano le informazioni sono randomiche per ogni sensore, questo significa che i file csv possono essere di diversa lunghezza e che i timestamp dei vari file csv non sono allineati.

Io con questi file dovrei simulare un acquisizione online delle informazioni dal sensore, ovvero far finta che ci sia uno streaming di dati aperto con ogni sensore, ed il sensore quando vuole lui mi manda dei dati.

Fin qui sarebbe facile, ovvero potrei creare N thread, uno per sensore, ognuno che legge una riga alla volta il csv di un sensore. Il problema però è più complicato, io con i dati arrivati dai sensori devo fare delle operaizoni quindi ogni qualvolta mi arriva un dato da uno qualsiasi dei sensori io devo lanciare una funzione che mi esegue dei conteggi.

Quindi l'esempio potrebbe essere del tipo:
Sono in ascolto su N sensori, ed ho una struttura dati con dei conteggi che è comune per tutti i sensori. Appena mi arriva un osservazione da uno qualsiasi dei sensori devo lanciare una funzione che mi aggiorna i conteggi (che ripeto sono in comune su tutti i sensori) e che se si verifica una certa condizione mettiamo che chiama un ulteriore funzione.

Come potrei fare tutto questo?

Vi ringrazio in anticipo per l'aiuto,
guylmaster.

Daniels118
14-05-2014, 11:44
La parte più complessa nel tuo problema è la sincornizzazione. Java mette a disposizione un meccanismo molto semplice per far si che due o più thread accedano uno per volta ad una risorsa, basta utilizzare un blocco synchronized.
Se l'elaborazione è veramente breve puoi richiamarla dai thread stessi, altrimenti conviene avere un thread dedicato da tenere in pausa ed attivare quando arriva un dato. A questo punto dovresti decidere cosa fare se arriva un dato mentre è già in corso un'elaborazione per decidere come implementare il tutto.

guylmaster
14-05-2014, 12:01
La parte più complessa nel tuo problema è la sincornizzazione. Java mette a disposizione un meccanismo molto semplice per far si che due o più thread accedano uno per volta ad una risorsa, basta utilizzare un blocco synchronized.
Se l'elaborazione è veramente breve puoi richiamarla dai thread stessi, altrimenti conviene avere un thread dedicato da tenere in pausa ed attivare quando arriva un dato. A questo punto dovresti decidere cosa fare se arriva un dato mentre è già in corso un'elaborazione per decidere come implementare il tutto.


Il problema è un pò più complesso, non devo solo sincronizzare l'accesso alla struttura dati dei conteggi. Ti faccio un esempio pratico:

Se ho due sensori, ognuno con il proprio file csv:

Csv1:
18:00 - low - low
19:00 - low - high

Csv2
18:03 - high - low

Ho bisogno di leggere prima l'osservazione alle 18:00 di csv1 e aggiornare i conti, poi quella alle 18:03 e aggiornare i conti e poi di nuovo alle 19:00 del file csv1 ed aggiornare i conti. Se aggiornado i conti si verifica una determianta condizione devo lanciare una determinata routine.

Se facessi un thread per ogni sensore che legge il file CSV non saprei come dirgli di leggere in ordine temporale sopra tutti i sensori.

In pratica ogni thread legge le osservazioni una per volta dal proprio file CSV in maniera sequenziale, ma in realtà il thread che ha a disposizione l'osservazione con il time stamp più recente ha diritto di eseguire i conteggi e non solo, gli altri devono rimanere in pausa in attesa che finisca per poi poter lanciare loro l'aggiornamento dei conteggi sempre in ordine di osservazione più recente. Questo perchè i conteggi devono essere ricalcolati per ogni osservazione.

Daniels118
14-05-2014, 12:45
Ho capito, comunque questo è un problema che può verificarsi solo in fase di test, perché i veri sensori inviano (spero) i dati appena sono disponibili, quindi questi arriveranno naturalmente ordinati per timestamp.
Certo potrebbe verificarsi che due sensori inviino i propri dati a distanza di pochi microsecondi e che i thread se ne accorgano in ordine inverso, in questo caso - se per te conta anche il microsecondo - ha senso preoccuparsi.

Quindi, se non ti interessa un precisione da orologio atomico ma vuoi solo risolvere il problema in fase di test, basta che le righe dei CSV vengano lette da un thread separato che le mette in ordine e poi informa di volta in volta il thread opportuno.

Se invece vuoi gestire anche il caso di cui ti ho parlato prima, devi inserire i dati ricevuti in una coda a priorità e definire una "vecchiaia" minima prima di elaborare i dati. Non esiste infatti alcuna sfera di cristallo che consenta di sapere se in futuro arriverà un dato che riguarda un avvenimento precedente, bisogna per forza stabilire un margine di tolleranza e sperare che non venga superato.

guylmaster
14-05-2014, 13:58
Ho capito, comunque questo è un problema che può verificarsi solo in fase di test, perché i veri sensori inviano (spero) i dati appena sono disponibili, quindi questi arriveranno naturalmente ordinati per timestamp.
Certo potrebbe verificarsi che due sensori inviino i propri dati a distanza di pochi microsecondi e che i thread se ne accorgano in ordine inverso, in questo caso - se per te conta anche il microsecondo - ha senso preoccuparsi.

Quindi, se non ti interessa un precisione da orologio atomico ma vuoi solo risolvere il problema in fase di test, basta che le righe dei CSV vengano lette da un thread separato che le mette in ordine e poi informa di volta in volta il thread opportuno.

Se invece vuoi gestire anche il caso di cui ti ho parlato prima, devi inserire i dati ricevuti in una coda a priorità e definire una "vecchiaia" minima prima di elaborare i dati. Non esiste infatti alcuna sfera di cristallo che consenta di sapere se in futuro arriverà un dato che riguarda un avvenimento precedente, bisogna per forza stabilire un margine di tolleranza e sperare che non venga superato.

Si sono in fase di test, ma visto che il mio scopo è creare un modello matematico (che poi chi vorrà potrà utilizzarsi lui per quello che vuole) in realtà non ci sarà mai una fase successiva di funzionamento vero e proprio (almeno da parte mia). Quindi poi dipenderà dal dominio di applicazione che uno dovrà andarsi a studiare bene i ritardi eccetera.

Ora dato che parliamo di calcoli abbastanza pesanti mi è stato chiesto di sviluppare il tutto su più thread, perchè l'idea è che in futuro questa cosa potrebbe evolvere come "un server per ogni sensore" o qualcosa di simile.

Ritornando ai multi-thread però mi è sorto in dubbio. Anche se gestisco l'ordinamento delle osservazioni con una coda con priorità (spero esista già una sua implementazione in java) in realtà visto che i conteggi vanno fatti un osservazione per volta, e visto che per fare i conteggi io devo bloccare l'accesso alla struttura dati, non ritorniamo di nuovo al caso del sequenziale?

Cioè Thread1 riceve un osservazione, inizia i calcoli e blocca la risorsa conteggi, thread2 nel mentre riceve un osservazione ma deve aspettare che i conteggi vengano sbloccati. A questo punto ritorna tutto ad essere sequenziale, no?
Potrei semplicemente leggere una riga per volta da ogni CSV e mettere le osservazioni in una coda con priorità. Poi elaboro tutte le osservazioni nella coda con priorità e solo DOPO averle elaborate tutte passo a leggere un ulteriore riga dai csv.

ingframin
14-05-2014, 14:08
Ti conviene raccogliere i dati in arrivo in una coda, quando hai una coda completa fai la tua elaborazione (svuoti la coda) e riparti da capo.
Un altra alternativa e' calcolare ogni volta che ti arriva un dato e correggere il conteggio quando arrivano gli altri.
Non puoi predire il futuro, quindi a un certo punto dovrai fare il tuo calcolo senza aspettare troppo a lungo.

EDIT: non avevo letto l'ultimo post, e' esattamente quello che intendevo io... e' la vecchiaia che mi frega :asd:

guylmaster
14-05-2014, 14:21
Ti conviene raccogliere i dati in arrivo in una coda, quando hai una coda completa fai la tua elaborazione (svuoti la coda) e riparti da capo.
Un altra alternativa e' calcolare ogni volta che ti arriva un dato e correggere il conteggio quando arrivano gli altri.
Non puoi predire il futuro, quindi a un certo punto dovrai fare il tuo calcolo senza aspettare troppo a lungo.

EDIT: non avevo letto l'ultimo post, e' esattamente quello che intendevo io... e' la vecchiaia che mi frega :asd:


Ma quindi, considerado che sto leggendo da N file csv, mi conviene leggere una riga per ogni csv, le butto dentro alla coda con priorità, e poi le estraggo da li ordinate per tempo.

La coda con priorità sarà quindi una coda di priorità di Mia_classe_personalizzata, che conterrà all'interno le variabili "timestamp" ed altre. E la priorità sarà data a quella con il timestamp più basso (il timestmap è un double).

Daniels118
14-05-2014, 14:39
Dipende, se vuoi solo dimostrare l'elaborazione non c'è bisogno di thread e code, basta ordinare i dati e farli elaborare in sequenza. Se invece ti interessa implementare un'applicazione che sia già predisposta al collegamento con i sensori, allora dovrai prendere qualche accorgimento in più.

guylmaster
14-05-2014, 15:10
Dipende, se vuoi solo dimostrare l'elaborazione non c'è bisogno di thread e code, basta ordinare i dati e farli elaborare in sequenza. Se invece ti interessa implementare un'applicazione che sia già predisposta al collegamento con i sensori, allora dovrai prendere qualche accorgimento in più.

Mettiamo il caso che voglio farlo il più realistico possibile ed il più "parallelizzato" possibile (quindi se servono i thread mettiamo i thread).

Teniamo conto che:

Al posto dei sensori abbiamo un csv per ogni sensore, e all'interno di un file csv le osservazioni del singolo sensore sono ordinate dalla più vecchia alla più recente;

Ho una struttura dati Conteggi, che è comune per tutti i sensori;

Ho una funzione "effettuaConteggi", che deve essere richiamata per ogni osservazione che arriva e che ha bisogno per effettuare i conteggi sia dell'osservazione attuale che dell'osservazione precedente del medesimo sensore (perchè vogliamo contare i cambi di stato e il tempo di permanenza nei vari stati). Oltre che ovviamente della struttura dati Conteggi;

Tu come lo svilupperesti? ti verrebbe da buttarmi giù un minimo di pseudocodice per capirci meglio?

Daniels118
14-05-2014, 16:47
La risposta sta venendo un po' lunga, invio la prima parte.

Nell'ottica di realizzare un'applicazione quanto più possibile vicina all'uso pratico, dovresti prima di tutto definire bene come vengono letti i dati dai sensori. Mi rendo conto che questo potrebbe essere un aspetto del tutto sconosciuto al momento, quindi ti faccio questa proposta:
definisci una classe astratta Valore che contiene il campo timestamp e derivane una classe per ogni tipologia di sensore contenente altri N campi associati ai valori restituiti dal sensore, ad esempio:
abstract class Valore {
public long timestamp;
}

class ValoreStazioneMeteo extends Valore {
public double temperatura;
public double umidita;
}
definisci una classe astratta "Sensore" che presenta un metodo "Valore read()". Tale metodo è sincrono, ovvero, una volta invocato si blocca fino a quando il dato diventa disponibile. Quando il dato è disponibile il metodo restituisce il valore letto e il timestamp corrispondente.
abstract class Sensore {
public abstract Valore read();
}
A questo punto potrai implementare la classe SensoreCsv che estende la classe Sensore (presenta quindi il metodo read).
L'implementazione di questa classe potrebbe essere questa:
abstract class SensoreCsv extends Sensore {
private SynchronousQueue<Valore> coda = new SynchronousQueue<Valore>();
protected BufferedReader br = null;

@Override
public Valore read() {
return coda.take();
}

public void setFile(String filename) {
br = new BufferedReader(new FileReader(filename));
}

abstract Valore readCsv();

void write(Valore val) {
coda.put(val);
}
}
E finalmente:
class SensoreStazioneMeteoCsv extends SensoreCsv {
@Override
ValoreStazioneMeteo readCsv() {
String line = br.readLine();
if (line == null) return null;
ValoreStazioneMeteo val = new ValoreStazioneMeteo();
//facciamo il parsing della riga e popoliamo i campi di val
return val;
}
}
Un thread dedicato si occuperà della lettura del csv:
class CsvReader implements Runnable {
private HashMap<SensoreCsv,Valore> valoriSensori = new HashMap<Sensore,Valore>();

CsvReader() {
//Popoliamo l'attributo "valoriSensori"
SensoreCsv sensore = new SensoreStazioneMeteoCsv();
sensore.setFile("file.csv");
Valore val = sensore.readCsv();
valoriSensori.put(sensore, valore);
}

public void run() {
Set<Map.Entry<SensoreCsv,Valore>> setVS = valoriSensori.entrySet();
boolean done;
do {
//EDIT se non mettiamo questa riga il ciclo non termina mai:
done = true;
long minTimestamp = Long.MAX_VALUE;
//Cerchiamo il sensore con il timestamp minore
SensoreCsv prossimoSensore = null;
for (Map.Entry<SensoreCsv,Valore> valoreSensore : setVS) {
Valore val = valoreSensore.getValue();
if (val != null && val.timestamp < minTimestamp) {
minTimestamp = val.timestamp;
prossimoSensore = valoreSensore.getKey();
}
}
//Se c'era almeno un dato disponibile
if (prossimoSensore != null) {
Valore val = valoriSensori.get(prossimoSensore); //Otteniamo il Valore
prossimoSensore.write(val); //Avvisiamo il sensore che deve pubblicare il dato
Valore newVal = prossimoSensore.readCsv();
valoriSensori.put(prossimoSensore, newVal);
done = false;
}
} while (!done);
}
}

A questo punto, salvo mie immancabili imprecisioni, dovresti avere un'interfaccia che simula egregiamente dei sensori ad interrogazione sincrona, non resta che sviluppare l'applicazione che deve interrogarli (continuerò domani).

guylmaster
15-05-2014, 00:27
La risposta sta venendo un po' lunga, invio la prima parte.

Nell'ottica di realizzare un'applicazione quanto più possibile vicina all'uso pratico, dovresti prima di tutto definire bene come vengono letti i dati dai sensori. Mi rendo conto che questo potrebbe essere un aspetto del tutto sconosciuto al momento, quindi ti faccio questa proposta:
definisci una classe astratta Valore che contiene il campo timestamp e derivane una classe per ogni tipologia di sensore contenente altri N campi associati ai valori restituiti dal sensore, ad esempio:
abstract class Valore {
public long timestamp;
}

class ValoreStazioneMeteo extends Valore {
public double temperatura;
public double umidita;
}
definisci una classe astratta "Sensore" che presenta un metodo "Valore read()". Tale metodo è sincrono, ovvero, una volta invocato si blocca fino a quando il dato diventa disponibile. Quando il dato è disponibile il metodo restituisce il valore letto e il timestamp corrispondente.
abstract class Sensore {
public abstract Valore read();
}
A questo punto potrai implementare la classe SensoreCsv che estende la classe Sensore (presenta quindi il metodo read).
L'implementazione di questa classe potrebbe essere questa:
abstract class SensoreCsv extends Sensore {
private SynchronousQueue<Valore> coda = new SynchronousQueue<Valore>();
protected BufferedReader br = null;

@Override
public Valore read() {
return coda.take();
}

public void setFile(String filename) {
br = new BufferedReader(new FileReader(filename));
}

abstract Valore readCsv();

void write(Valore val) {
coda.put(val);
}
}
E finalmente:
class SensoreStazioneMeteoCsv extends SensoreCsv {
@Override
ValoreStazioneMeteo readCsv() {
String line = br.readLine();
if (line == null) return null;
ValoreStazioneMeteo val = new ValoreStazioneMeteo();
//facciamo il parsing della riga e popoliamo i campi di val
return val;
}
}
Un thread dedicato si occuperà della lettura del csv:
class CsvReader implements Runnable {
private HashMap<SensoreCsv,Valore> valoriSensori = new HashMap<Sensore,Valore>();

CsvReader() {
//Popoliamo l'attributo "valoriSensori"
SensoreCsv sensore = new SensoreStazioneMeteoCsv();
sensore.setFile("file.csv");
Valore val = sensore.readCsv();
valoriSensori.put(sensore, valore);
}

public void run() {
Set<Map.Entry<SensoreCsv,Valore>> setVS = valoriSensori.entrySet();
boolean done;
do {
//EDIT se non mettiamo questa riga il ciclo non termina mai:
done = true;
long minTimestamp = Long.MAX_VALUE;
//Cerchiamo il sensore con il timestamp minore
SensoreCsv prossimoSensore = null;
for (Map.Entry<SensoreCsv,Valore> valoreSensore : setVS) {
Valore val = valoreSensore.getValue();
if (val != null && val.timestamp < minTimestamp) {
minTimestamp = val.timestamp;
prossimoSensore = valoreSensore.getKey();
}
}
//Se c'era almeno un dato disponibile
if (prossimoSensore != null) {
Valore val = valoriSensori.get(prossimoSensore); //Otteniamo il Valore
prossimoSensore.write(val); //Avvisiamo il sensore che deve pubblicare il dato
Valore newVal = prossimoSensore.readCsv();
valoriSensori.put(prossimoSensore, newVal);
done = false;
}
} while (!done);
}
}

A questo punto, salvo mie immancabili imprecisioni, dovresti avere un'interfaccia che simula egregiamente dei sensori ad interrogazione sincrona, non resta che sviluppare l'applicazione che deve interrogarli (continuerò domani).

Mi sono accorto di un problema, che effettiamente era sfuggito nei miei test anche a me:

Se io leggo un osservazione per sensore e prendo la più recente è un problema, vediamolo con un esempio:

CSV1:
8:00 - temperature - umidità
8:10 - temparatura - umidità

CSV2:
8:20 - temperature - umidità
9:00....

Cosi facendo farò 8:00 dal primo sensore e 8:20 dal secondo sensore. Poi però nel primo sensore ho 8:10 come seconda lettura. Quindi come si fa?
Metti caso che un sensore fa tantissime osservazioni ravvicinate e l'altro no come faccio a risolvere questo problema? dovrei leggere tutte le osservazioni, di tutti i csv, e prendere da li la più vecchia? Ma qui parliamo di avere anche 400.000 e più osservazioni rischiamo che manco ci entrano tutte e finiamo la memoria nei casi peggiori.

Comunque grazie mille a tutti per l'aiuto che mi state dando :)

Daniels118
15-05-2014, 08:39
Rivedi il metodo run che ho realizzato nel mio precedente post, è un'implementazione di un algoritmo di merge che serve a risolvere il problema al quale accennavi, se vuoi ulteriori dettagli sul funzionamento vedi qui: Merge algorithm (http://en.wikipedia.org/wiki/Merge_algorithm).

banryu79
15-05-2014, 10:23
Forse può fare al caso tuo una coda di priorità boccante: java.util.concurrent.PriorityBlockingQueue (le librerie Java hanno già un sacco di pappa pronta all'uso, testata e ben funzionante, non serve ogni volta reinventare la ruota).

I thread-sensori tentano di schiaffare dati nella coda.
C'è un thread lettore (ma possono essere anche più di uno) che tenta di estrarre i dati dalla coda.

La coda è bloccante: finchè è piena (stabilisci tu quanto grande farla) i thread-sensori restano in attesa che si liberino dei posti, e finchè è vuota il thread lettore resta in attesa del primo dato disponibile.

La coda è una coda di priorità: ogni elemento infilato nella coda viene riordinato nella stessa in base ad una politica di priorità (stabilisci tu il criterio, mediante un Comparator, per intenderci).

---

Questo dopo aver letto al volo la discussione e al netto della mia memoria; sono almeno sue anni che non programmo praticamente più...
Comunque prima di progettare l'implementazione di una soluzione usando un certo strumento, sarebbe il caso di andare a studiarsi quello strumento (prendila come un consiglio, non come una critica, non vuole esserlo! E' che è proprio questa la causa dei tuoi dubbi, secondo me).

wingman87
15-05-2014, 10:40
Questo dopo aver letto al volo la discussione e al netto della mia memoria; sono almeno sue anni che non programmo praticamente più...


Posso chiederti di cosa ti occupi ora? Ti ho sempre letto con piacere e mi dispiacerebbe non leggerti più...

guylmaster
15-05-2014, 12:27
Rivedi il metodo run che ho realizzato nel mio precedente post, è un'implementazione di un algoritmo di merge che serve a risolvere il problema al quale accennavi, se vuoi ulteriori dettagli sul funzionamento vedi qui: Merge algorithm (http://en.wikipedia.org/wiki/Merge_algorithm).

Ammetto che volevo riadattare il tutto all'utilizzo delle code di priorità ma ho fatto casino.

Io prima, per andare in errore, prendevo un valore per ogni sensore e poi me li computavo in ordine timestamp crescente. Solo dopo aver computato tutti i valori rileggevo dai sensori. Questo faceva si che avevo valori non allineati.

Se invece prendo una lettura per sensore, prendo il valore del sensore più piccolo e di quel sensore di cui ho preso il valore più piccolo leggo subito il successivo valore, dovrei evitare il problema.

guylmaster
15-05-2014, 12:29
Forse può fare al caso tuo una coda di priorità boccante: java.util.concurrent.PriorityBlockingQueue (le librerie Java hanno già un sacco di pappa pronta all'uso, testata e ben funzionante, non serve ogni volta reinventare la ruota).

I thread-sensori tentano di schiaffare dati nella coda.
C'è un thread lettore (ma possono essere anche più di uno) che tenta di estrarre i dati dalla coda.

La coda è bloccante: finchè è piena (stabilisci tu quanto grande farla) i thread-sensori restano in attesa che si liberino dei posti, e finchè è vuota il thread lettore resta in attesa del primo dato disponibile.

La coda è una coda di priorità: ogni elemento infilato nella coda viene riordinato nella stessa in base ad una politica di priorità (stabilisci tu il criterio, mediante un Comparator, per intenderci).

---

Questo dopo aver letto al volo la discussione e al netto della mia memoria; sono almeno sue anni che non programmo praticamente più...
Comunque prima di progettare l'implementazione di una soluzione usando un certo strumento, sarebbe il caso di andare a studiarsi quello strumento (prendila come un consiglio, non come una critica, non vuole esserlo! E' che è proprio questa la causa dei tuoi dubbi, secondo me).


Io stavo provando questo esempio di SynchronousQueue:


package threadTest;


import java.util.concurrent.SynchronousQueue;

public class SynchronousQueueTest
{
private SynchronousQueue sq = new SynchronousQueue(true);

class PutThread implements Runnable
{
public void run()
{
for(int i=0; i <1000; i++)
{
try {
System.out.println("PUT");
//sq.put("A");
sq.put("A");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

}

class TakeThread implements Runnable
{
public void run()
{
for(int i=0; i <1000; i++)
{
try {
System.out.println("TAKE");
System.out.println(sq.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

public static void main(String[] args)
{
new Thread((new SynchronousQueueTest()).new PutThread()).start();
new Thread((new SynchronousQueueTest()).new TakeThread()).start();
}
}


Il problema è che mio aspettato 1000 PUT e 1000 TAKE alternati, invece stampa un PUT ed un TAKE e rimane in pausa all'infinito. Dove sbaglio? :fagiano:

Daniels118
15-05-2014, 12:41
L'errore sta nel fatto che i due thread utilizzano due istanze diverse di SynchronousQueueTest, pertanto fanno riferimento a due code diverse.

Daniels118
15-05-2014, 13:20
Continuiamo da dove ci eravamo fermati ieri.

Definiamo un comparatore per la coda a priorità:
class CompTimestampValore implements Comparator<Valore> {
int compare(Valore o1, Valore o2) {
if (o1.timestamp < o2.timestamp) return -1;
if (o1.timestamp > o2.timestamp) return 1;
return 0;
}
}
Definiamo quindi una classe capace di leggere i dati dei sensori e comunicarli ad un elaboratore:
class SensorReader implements Runnable {
protected Sensore sensore;
protected Elaboratore elaboratore;

public setSensore(Sensore sensore) {
this.sensore = sensore;
}

public setElaboratore(Elaboratore elaboratore) {
this.elaboratore = elaboratore;
}

public void run() {
while (true) {
Valore val = sensore.read();
elaboratore.send(val);
}
}
}
Ancora una classe per gestire il delay:
class ValoreHolder {
private Valore valore;
private long expires;

ValoreHolder(Valore valore, long delay) {
this.valore = valore;
Date d = new Date();
this.expires = d.getTime() + delay;
}

boolean isOld() {
Date d = new Date();
return d.getTime() >= expires;
}

Valore getValore() {
return this.valore;
}
}
Infine definiamo una classe per elaborare i dati:
class Elaboratore implements Runnable {
protected PriorityQueue<ValoreHolder> coda;

public Elaboratore() {
CompTimestampValore comparatore = new CompTimestampValore();
this.coda = new PriorityQueue<ValoreHolder>(100, comparatore);
}

public void bindSensore(sensore) {
SensorReader reader = new SensorReader();
reader.setSensore(sensore);
reader.setElaboratore(this);
Thread t = new Thread();
t.start(reader);
}

public void run() {
while (true) {
ValoreHolder holder;
//Attendiamo che la coda contenga un valore abbastanza vecchio
while (true) {
synchronized (coda) {
holder = coda.peek(); //Leggiamo il valore senza rimuoverlo
}
if (holder == null) {
Thread.sleep(100); //La coda è vuota, aspettiamo un po'
} else if (holder.isOld()) { //Se il valore è abbastanza vecchio
synchronized (coda) {
holder = coda.poll(); //Rimuoviamolo dalla coda
}
break;
}
}
Valore val = holder.getValore();
//Elaboriamo il valore

}
}

public void send(Valore val) {
ValoreHolder holder = new ValoreHolder(val, 500);
synchronized (coda) {
coda.add(holder);
}
}
}
Salvo i soliti errori dovrebbe fare quello che ti serve.

guylmaster
15-05-2014, 13:58
Continuiamo da dove ci eravamo fermati ieri.

Definiamo un comparatore per la coda a priorità:
class CompTimestampValore implements Comparator<Valore> {
int compare(Valore o1, Valore o2) {
if (o1.timestamp < o2.timestamp) return -1;
if (o1.timestamp > o2.timestamp) return 1;
return 0;
}
}
Definiamo quindi una classe capace di leggere i dati dei sensori e comunicarli ad un elaboratore:
class SensorReader implements Runnable {
protected Sensore sensore;
protected Elaboratore elaboratore;

public setSensore(Sensore sensore) {
this.sensore = sensore;
}

public setElaboratore(Elaboratore elaboratore) {
this.elaboratore = elaboratore;
}

public void run() {
while (true) {
Valore val = sensore.read();
elaboratore.send(val);
}
}
}
Ancora una classe per gestire il delay:
class ValoreHolder {
private Valore valore;
private long expires;

ValoreHolder(Valore valore, long delay) {
this.valore = valore;
Date d = new Date();
this.expires = d.getTime() + delay;
}

boolean isOld() {
Date d = new Date();
return d.getTime() >= expires;
}

Valore getValore() {
return this.valore;
}
}
Infine definiamo una classe per elaborare i dati:
class Elaboratore implements Runnable {
protected PriorityQueue<ValoreHolder> coda;

public Elaboratore() {
CompTimestampValore comparatore = new CompTimestampValore();
this.coda = new PriorityQueue<ValoreHolder>(100, comparatore);
}

public void bindSensore(sensore) {
SensorReader reader = new SensorReader();
reader.setSensore(sensore);
reader.setElaboratore(this);
Thread t = new Thread();
t.start(reader);
}

public void run() {
while (true) {
ValoreHolder holder;
//Attendiamo che la coda contenga un valore abbastanza vecchio
while (true) {
synchronized (coda) {
holder = coda.peek(); //Leggiamo il valore senza rimuoverlo
}
if (holder == null) {
Thread.sleep(100); //La coda è vuota, aspettiamo un po'
} else if (holder.isOld()) { //Se il valore è abbastanza vecchio
synchronized (coda) {
holder = coda.poll(); //Rimuoviamolo dalla coda
}
break;
}
}
Valore val = holder.getValore();
//Elaboriamo il valore

}
}

public void send(Valore val) {
ValoreHolder holder = new ValoreHolder(val, 500);
synchronized (coda) {
coda.add(holder);
}
}
}
Salvo i soliti errori dovrebbe fare quello che ti serve.

In pratica con Syncronyzed di una variabile cosa succede? il thread successivo si mette automaticamente in coda e parte solo quando la variabile non è più usata? E se si mette in attesa da solo allora a cosa serve lo sleep? o c'è qualcosa che mi sfugge nel funzionamento di Syncronyzed?

Ho provato a fixare l'esempio SynchronousQueue così:


package threadTest;


import java.util.concurrent.SynchronousQueue;

public class SynchronousQueueTest
{
private static SynchronousQueue sq;

class PutThread implements Runnable
{
SynchronousQueue sq;
public void run()
{
for(int i=0; i <1000; i++)
{
try {
System.out.println("PUT");
//sq.put("A");
sq.put("A");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public PutThread(SynchronousQueue sq)
{
this.sq = sq;
}

}

class TakeThread implements Runnable
{
SynchronousQueue sq;

public void run()
{
for(int i=0; i <1000; i++)
{
try {
System.out.println("TAKE");
//System.out.println(sq.take());
sq.take();
} catch (InterruptedException e) {
System.out.println("wait");
e.printStackTrace();
}
}
}
public TakeThread(SynchronousQueue sq)
{
this.sq = sq;
}
}

public static void main(String[] args)
{
SynchronousQueue sq = new SynchronousQueue();
new Thread((new SynchronousQueueTest()).new PutThread(sq)).start();
new Thread((new SynchronousQueueTest()).new TakeThread(sq)).start();
}
}



Però l'output non è del tutto alternato, è di questo genere:


PUT
TAKE
TAKE
PUT
PUT
TAKE
TAKE
PUT


Cosa altro sbaglio?

Daniels118
15-05-2014, 15:21
I blocchi synchronized non agiscono su una variabile, ma su un oggetto: è importante fare questa differenza, perché variabili diverse possono puntare allo stesso oggetto.
Se un thread accede ad un blocco synchronized per un dato oggetto, tutti gli altri thread che vogliono accedere ad un blocco synchronized per quello stesso oggetto si mettono in pausa finché il primo thread non esce dal blocco.
Questo costrutto si usa specialmente quando si vuole impedire che un thread legga un oggetto mentre questo viene modificato da un altro thread.
Esistono anche delle classi "thread-safe" i cui metodi sono sincronizzati internamente, però vista la quantità minima di istruzioni da sincronizzare in questa applicazione secondo me conviene fare da se.

Nota che sleep viene richiamato all'esteno del blocco synchronized. Il blocco synchronized serve per evitare di leggere dalla coda mentre un altro thread vi sta scrivendo; se non prendessimo questa precauzione, la lettura potrebbe restituire dati errati. Se la coda è vuota peek() ritorna null, e solo il questo caso viene eseguito sleep(); ciò serve ad evitare un inutile sovraccarico nel caso in cui la coda sia vuota.

Quanto al codice che hai postato, il problema sta nel fatto che la stampa del messaggio TAKE avviene prima della chiamata al rispettivo metodo. Facciamo un esempio:
1) Le code sono vuote
2) PutThread stampa "PUT"
3) PutThread chiama put e si blocca perché la coda è piena
4) TakeThread stampa "TAKE"
5) TakeThread chiama take, trova un valore in coda e ritorna
6) TakeThread stampa "TAKE"
7) TakeThread chiama take e si blocca perchè la coda è vuota
8) PutThread si sblocca perché la coda è vuota
9) PutThread stampa "PUT"
10) PutThread chiama put e si blocca perché la coda è piena
e così via...

Invertendo la chiamata al metodo get e la stampa del relativo messaggio hai la garanzia che questo venga stampato ad operazione avvenuta. Esempio:
1) Le code sono vuote
2) PutThread stampa "PUT"
3) PutThread chiama put e si blocca perché la coda è piena
4) TakeThread chiama take, trova un valore in coda e ritorna
5) TakeThread stampa "TAKE"
6) TakeThread chiama take e si blocca perchè la coda è vuota
7) PutThread si sblocca perché la coda è vuota
8) PutThread stampa "PUT"
9) PutThread chiama put e si blocca perché la coda è piena
10) TakeThread si sblocca perchè la coda è piena
10) TakeThread stampa "TAKE"
e così via...

In realtà anche in questo modo non è garantito che i messaggi vengano stampati in questo ordine perché, sebbene sia molto improbabile, lo switch di contesto potrebbe avvenire proprio tra il ritorno dal metodo e la stampa del messaggio, così:
1) Le code sono vuote
2) PutThread stampa "PUT"
3) PutThread chiama put e si blocca perché la coda è piena
4) TakeThread chiama take, trova un valore in coda e ritorna
5) PutThread si sblocca perché la coda è vuota
6) PutThread stampa "PUT"
7) TakeThread stampa "TAKE"
8) TakeThread chiama take e si blocca perchè la coda è vuota

La soluzione ideale sarebbe quella di mettere la stampa del messaggio direttamente dentro ai metodi sincroni put e take, ma non potendo mettere le mani all'interno di tali metodi l'unica alternativa resta quella di sincronizzare manualmente il codice; in questo caso si potrebbe anche fare a meno della coda ed utilizzare i metodi wait e notify.

guylmaster
15-05-2014, 16:10
I blocchi synchronized non agiscono su una variabile, ma su un oggetto: è importante fare questa differenza, perché variabili diverse possono puntare allo stesso oggetto.
Se un thread accede ad un blocco synchronized per un dato oggetto, tutti gli altri thread che vogliono accedere ad un blocco synchronized per quello stesso oggetto si mettono in pausa finché il primo thread non esce dal blocco.
Questo costrutto si usa specialmente quando si vuole impedire che un thread legga un oggetto mentre questo viene modificato da un altro thread.
Esistono anche delle classi "thread-safe" i cui metodi sono sincronizzati internamente, però vista la quantità minima di istruzioni da sincronizzare in questa applicazione secondo me conviene fare da se.

Nota che sleep viene richiamato all'esteno del blocco synchronized. Il blocco synchronized serve per evitare di leggere dalla coda mentre un altro thread vi sta scrivendo; se non prendessimo questa precauzione, la lettura potrebbe restituire dati errati. Se la coda è vuota peek() ritorna null, e solo il questo caso viene eseguito sleep(); ciò serve ad evitare un inutile sovraccarico nel caso in cui la coda sia vuota.

Quanto al codice che hai postato, il problema sta nel fatto che la stampa del messaggio TAKE avviene prima della chiamata al rispettivo metodo. Facciamo un esempio:
1) Le code sono vuote
2) PutThread stampa "PUT"
3) PutThread chiama put e si blocca perché la coda è piena
4) TakeThread stampa "TAKE"
5) TakeThread chiama take, trova un valore in coda e ritorna
6) TakeThread stampa "TAKE"
7) TakeThread chiama take e si blocca perchè la coda è vuota
8) PutThread si sblocca perché la coda è vuota
9) PutThread stampa "PUT"
10) PutThread chiama put e si blocca perché la coda è piena
e così via...

Invertendo la chiamata al metodo get e la stampa del relativo messaggio hai la garanzia che questo venga stampato ad operazione avvenuta. Esempio:
1) Le code sono vuote
2) PutThread stampa "PUT"
3) PutThread chiama put e si blocca perché la coda è piena
4) TakeThread chiama take, trova un valore in coda e ritorna
5) TakeThread stampa "TAKE"
6) TakeThread chiama take e si blocca perchè la coda è vuota
7) PutThread si sblocca perché la coda è vuota
8) PutThread stampa "PUT"
9) PutThread chiama put e si blocca perché la coda è piena
10) TakeThread si sblocca perchè la coda è piena
10) TakeThread stampa "TAKE"
e così via...

In realtà anche in questo modo non è garantito che i messaggi vengano stampati in questo ordine perché, sebbene sia molto improbabile, lo switch di contesto potrebbe avvenire proprio tra il ritorno dal metodo e la stampa del messaggio, così:
1) Le code sono vuote
2) PutThread stampa "PUT"
3) PutThread chiama put e si blocca perché la coda è piena
4) TakeThread chiama take, trova un valore in coda e ritorna
5) PutThread si sblocca perché la coda è vuota
6) PutThread stampa "PUT"
7) TakeThread stampa "TAKE"
8) TakeThread chiama take e si blocca perchè la coda è vuota

La soluzione ideale sarebbe quella di mettere la stampa del messaggio direttamente dentro ai metodi sincroni put e take, ma non potendo mettere le mani all'interno di tali metodi l'unica alternativa resta quella di sincronizzare manualmente il codice; in questo caso si potrebbe anche fare a meno della coda ed utilizzare i metodi wait e notify.

Si infatti il problema in cui riconcorrevo e che anche mettendo prima i metodi sulla coda sincronizzata e poi le stampe, comunque mi uscivano in disordine.

Purtroppo però sto provando a tirar giù un esempio con i wait e i notify e ci sto impazzendo.

Daniels118
15-05-2014, 16:29
Devi invertire solo quelli nel TakeThread, l'idea è quella di stampare il messaggio quanto più possibile vicino all'esecuzione dell'operazione. Il put prima inserisce e poi si blocca, il take invece prima si blocca e poi inserisce, per cui bisogna stampare prima di inserire e leggere prima di stampare. Fermo restando che questa non è una soluzione a prova di proiettile.

Se vuoi puoi vedere come implementare una semplice blocking queue qui:
http://tutorials.jenkov.com/java-concurrency/blocking-queues.html

guylmaster
15-05-2014, 16:42
Devi invertire solo quelli nel TakeThread, l'idea è quella di stampare il messaggio quanto più possibile vicino all'esecuzione dell'operazione. Il put prima inserisce e poi si blocca, il take invece prima si blocca e poi inserisce, per cui bisogna stampare prima di inserire e leggere prima di stampare. Fermo restando che questa non è una soluzione a prova di proiettile.

Se vuoi puoi vedere come implementare una semplice blocking queue qui:
http://tutorials.jenkov.com/java-concurrency/blocking-queues.html

Ho provato ad usare la blocking queque di quell'esempio ma nulla, come metto e metto le stampe mi escono sempre mischiate, questo è il mio codice:


package threadTest;


import java.util.concurrent.SynchronousQueue;

public class SynchronousQueueTest
{
private static SynchronousQueue sq;

class PutThread implements Runnable
{
BlockingQueue bq;
public void run()
{
for(int i=0; i <1000; i++)
{

try {
bq.enqueue("A");
System.out.println("PUT");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}
}
public PutThread(BlockingQueue bq)
{
this.bq = bq;
}

}

class TakeThread implements Runnable
{
BlockingQueue bq;

public void run()
{
for(int i=0; i <1000; i++)
{
try {
System.out.println("GET");
bq.dequeue();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public TakeThread(BlockingQueue bq)
{
this.bq = bq;
}
}

public static void main(String[] args)
{
SynchronousQueue sq = new SynchronousQueue();
BlockingQueue bq = new BlockingQueue(1);
new Thread((new SynchronousQueueTest()).new PutThread(bq)).start();
new Thread((new SynchronousQueueTest()).new TakeThread(bq)).start();
}
}


Davvero ho provato a mettere le stampe in tutte le possibili combinaizoni, che mi sia sfuggita prorpio quella giuta? :)

Far funzionare questa cosa sta iniziando a diventare una questione di principio :D

Daniels118
16-05-2014, 09:36
Prova così:
public class CodaSincrona<E> {
private E obj = null;

public synchronized void put(E e) throws InterruptedException {
while (obj != null) {
wait();
}
System.out.println("put");
obj = e;
notifyAll();
}

public synchronized E take() throws InterruptedException {
while (obj == null) {
wait();
}
System.out.println("take");
E t = obj;
obj = null;
notifyAll();
return t;
}
}

marco.r
16-05-2014, 18:33
Opinions Mia: un thread per sensore che legge una riga, calcola di quanto fare la sleep e invia il valore letto in una coda di messaggi comune a tutti i thread. Un thread aggiuntivo legge i messaggi in arrivo aggiorna la tabella dei valori correnti ed effettua il calcolo. Semplice ed elegante. Comunque prima di andare via di codice direi che bisognerebbe dettagliare un po meglio le specifiche: se un sensore mi manda dati pii velocemente di un altro che faccio ? E se il calcolo non sta dietro alla frequenza di arrivo sei dati (può capitare)? Posso permettermi di perdere valori per strada ?

guylmaster
17-05-2014, 23:50
Prova così:
public class CodaSincrona<E> {
private E obj = null;

public synchronized void put(E e) throws InterruptedException {
while (obj != null) {
wait();
}
System.out.println("put");
obj = e;
notifyAll();
}

public synchronized E take() throws InterruptedException {
while (obj == null) {
wait();
}
System.out.println("take");
E t = obj;
obj = null;
notifyAll();
return t;
}
}

Ok perfetto ora va. Quindi in pratica il punto della situazione è che qualsiasi codice voglio sincronizzare mi conviene incastonarlo direttmanete nel metodi put e take. La possibilità invece di dire al thread A se è vuota scrivi, e al thread B se è piena esegui, non sembra a quanto pare fattibile.

Grazie mille anche per l'implementazione dei sensori. Adesso sto vedendo come riadattare tutto al mio caso perchè mi sto accorgendo che nello speigarlo qui ho semplificato troppo e l'esecuzione multithread è più complicata di quel che ipotizzavo :P

guylmaster
18-05-2014, 00:12
Opinions Mia: un thread per sensore che legge una riga, calcola di quanto fare la sleep e invia il valore letto in una coda di messaggi comune a tutti i thread. Un thread aggiuntivo legge i messaggi in arrivo aggiorna la tabella dei valori correnti ed effettua il calcolo. Semplice ed elegante. Comunque prima di andare via di codice direi che bisognerebbe dettagliare un po meglio le specifiche: se un sensore mi manda dati pii velocemente di un altro che faccio ? E se il calcolo non sta dietro alla frequenza di arrivo sei dati (può capitare)? Posso permettermi di perdere valori per strada ?

Diciamo che cosi realistico non mi serve. L'algoritmo di merge delle liste consigliato da Daniels va benissimo per simulare l'arrivo delle osservazioni dai CSV, per adesso lo uso in una prima versione dell'algoritmo a singolo thread e funziona.

marco.r
18-05-2014, 17:03
Diciamo che cosi realistico non mi serve. L'algoritmo di merge delle liste consigliato da Daniels va benissimo per simulare l'arrivo delle osservazioni dai CSV, per adesso lo uso in una prima versione dell'algoritmo a singolo thread e funziona.
Se ti basta l'ordine relativo di arrivo dei messaggi allora il merge e' sufficiente e non ha senso parallelizzarlo, visto che l'estrazione dei dati da CSV una una quantita' di cpu infima. Faresti un sacco di fatica per nulla.