Domanda Produttore e consumatore con buffer illimitato

C3n21

Utente Bronze
19 Gennaio 2017
16
2
2
27
Ultima modifica:
Buongiorno a tutti, sto riscontrando un problema con il seguente codice che sarebbe il Produttore e Consumatore con un buffer illimitato rappresentato dall'ArrayList gestito con i semafori e che dovrebbe essere Thread-Safe.

Ora, non so perché ma quando invoco leggi() in th2 mi fa partire l'eccezione, il che mi fa pensare che non abbia l'esclusività della risorsa nonostante abbia fatto l'acquire.
Il Produttore può scrivere all'infinito, ovviamente aggiungendo un dato alla volta, mentre il Consumatore ovviamente non può leggere se l'ArrayList è vuoto.
Qualcuno sa come risolvere questo grattacapo?

Java:
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.Random;
import java.util.concurrent.Semaphore;

public class Risorsa {
    private ArrayList<Integer> list = new ArrayList<>();
    private Semaphore s0 = new Semaphore(1); // per scrivere nella list

    Random rand = new Random(100);

    public static void main(String [] args){
        Risorsa risorsa = new Risorsa();

        Thread th1 = new Thread(){
            /***********************************************
             Scrive un intero (risorsa condivisa) al
             ritmo di uno ogni secondo
             **********************************************/

            @Override
            public void run() {

                System.out.println("Th1 : avvio...");
                while(true){
                    System.out.println("Th1 : sta scrivendo...");
                    risorsa.scrivi();
                    System.out.println("Th1 : ha terminato la scrittura...");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };

        Thread th2 = new Thread(){
            /***********************************************
             Visualizza tutti i valori prodotti
             (ognuno visualizzato una sola volta)
             **********************************************/

            @Override
            public void run() {
                System.out.println("Th2 : avvio...");

                while(true){
                    System.out.println("Th 2: sta per leggere i valori...");
                    risorsa.leggi();
                    System.out.println("Th 2: ha terminato la lettura");
                }
            }
        };

        th1.start();
        th2.start();
    }
/*******************
Metodi
*******************/
    void scrivi(){
        try {
            s0.acquire();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        list.add(rand.nextInt(100));

        System.out.println("Th1 : aggiunto ");
        s0.release();
    }

    void leggi(){
        //Cerca di acquisire il permesso di lettura
        try {
            s0.acquire();
        } catch (InterruptedException e) {
            System.out.println("?");
        }

        if (list.isEmpty()){//pre-rilascio
            s0.release();
            System.out.println("Nulla da leggere!");
        }
        else {
            try{
                list.forEach(it->{
                    System.out.println("\tHo letto : "+it);
                    list.remove(it);
                });
            }
            catch (ConcurrentModificationException e){//Se non catturo l'eccezione va in errore
            }
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        s0.release();
    }
}

Grazie in anticipo per le risposte e buona giornata ( o buona serata se lo state leggendo di sera)
 
La remove dentro il forEach non è safe, prova con questo:
Java:
try{                                                              
    list.forEach(it->{ System.out.println("\tHo letto : "+it); });
    list.clear();                                                 
}
 
Buongiorno a tutti, sto riscontrando un problema con il seguente codice che sarebbe il Produttore e Consumatore con un buffer illimitato rappresentato dall'ArrayList gestito con i semafori e che dovrebbe essere Thread-Safe.
Innanzitutto è bene dire alcune cose in generale.
Quando un metodo è "bloccante" e può causare un InterruptedException, la cosa migliore è far uscire InterruptedException dal metodo, in modo che sia il chiamante a decidere meglio cosa fare. Questo semplifica anche le cose nella implementazione del metodo bloccante.

Il tuo scrivi() quindi:

Java:
void scrivi() throws InterruptedException {
    s0.acquire();
    try {
        // codice .....
    } finally {
        s0.release();
    }
}

Come altra cosa: ConcurrentModificationException non andrebbe MAI catturata.


Ora, non so perché ma quando invoco leggi() in th2 mi fa partire l'eccezione, il che mi fa pensare che non abbia l'esclusività della risorsa nonostante abbia fatto l'acquire.

Il Produttore può scrivere all'infinito, ovviamente aggiungendo un dato alla volta, mentre il Consumatore ovviamente non può leggere se l'ArrayList è vuoto.
Il leggi() è chiaramente solo un po' più particolare. Infatti c'è una precondizione: "la lista non deve essere vuota". Se vuota, allora bisogna stare in attesa che diventi non-vuota.

Ma il test per la lista vuota deve essere fatto tenendo il permit del semaforo, e se vuota allora bisogna rilasciare il permit, altrimenti un altro thread non potrebbe completare un scrivi(). Ma una volta che hai rilasciato il permit, bisogna ripartire dalla acquisizione del permit.
Insomma ... serve un ciclo! (che non vedo nel tuo leggi() )
 
L'iteratore restituito sulla struttura dati ha un fail-fast behavior, ovvero, se la struttura dati da cui l'hai tirato fuori e' modificata dopo la sua creazione lancia ConcurrentModificationException per prevenire un undefined behavior futuro a run-time. Per risolvere dovresti prima consumare gli item ed una volta consumati effettuare la clean() della lista. Ti consiglio anche di utilizzare meccanismi di wait/signal per evitare l'attesa attiva e di incapsulare l'accesso alla struttura dati piu' che effettuare P() e V() a destra ed a sinistra.
 
Grazie a tutti per le risposte.
Il leggi() è chiaramente solo un po' più particolare. Infatti c'è una precondizione: "la lista non deve essere vuota". Se vuota, allora bisogna stare in attesa che diventi non-vuota.

Ma il test per la lista vuota deve essere fatto tenendo il permit del semaforo, e se vuota allora bisogna rilasciare il permit, altrimenti un altro thread non potrebbe completare un scrivi(). Ma una volta che hai rilasciato il permit, bisogna ripartire dalla acquisizione del permit.
Insomma ... serve un ciclo! (che non vedo nel tuo leggi() )

Invece del ciclo ho pensato di fare direttamente una return in questo modo:

Java:
void leggi() throws InterruptedException {
        //Cerca di acquisire il permesso di lettura
        s0.acquire();


        if (list.isEmpty()){//pre-rilascio
            s0.release();
            System.out.println("Nulla da leggere!");
            return; //<------------------
        }
        else {

            list.forEach(it->{
                System.out.println("\tHo letto : "+it);
            });
            list.clear();

            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        s0.release();
}


La remove dentro il forEach non è safe, prova con questo:
Java:
try{                                                             
    list.forEach(it->{ System.out.println("\tHo letto : "+it); });
    list.clear();                                                
}

Ora funziona :D

L'iteratore restituito sulla struttura dati ha un fail-fast behavior, ovvero, se la struttura dati da cui l'hai tirato fuori e' modificata dopo la sua creazione lancia ConcurrentModificationException per prevenire un undefined behavior futuro a run-time. Per risolvere dovresti prima consumare gli item ed una volta consumati effettuare la clean() della lista. Ti consiglio anche di utilizzare meccanismi di wait/signal per evitare l'attesa attiva e di incapsulare l'accesso alla struttura dati piu' che effettuare P() e V() a destra ed a sinistra.

Ma io sto usando i semafori, non è attesa attiva giusto?
Per P() e V() intendi acquire() e release() ?
 
Si, per P() e V() intendo acquire() e release(). Nel momento in cui invochi il metodo leggi() e la struttura dati e' vuota non ti sospendi, ti stai limitando a non fare nulla, ma il non fare nulla sono cicli macchina sprecati. Se per esempio il produttore dopo un certo tempo t non producesse piu' item noteresti che il consumatore continua a ciclare in attesa di Item da consumare(ed e' attesa attiva), quando potrebbe benissimo sospendersi ed essere risvegliato soltanto quando c'e' effettivamente almeno un Item nella struttura dati. Inoltre nel caso in cui la struttura dati e' vuota effettui due V() cio' vuol dire che aggiungi +2 permit sul semaforo e rompi l'inviariante che volevi ottenere con il semaforo(la mutua esclusione).
 
Si, per P() e V() intendo acquire() e release(). Nel momento in cui invochi il metodo leggi() e la struttura dati e' vuota non ti sospendi, ti stai limitando a non fare nulla, ma il non fare nulla sono cicli macchina sprecati. Se per esempio il produttore dopo un certo tempo t non producesse piu' item noteresti che il consumatore continua a ciclare in attesa di Item da consumare(ed e' attesa attiva), quando potrebbe benissimo sospendersi ed essere risvegliato soltanto quando c'e' effettivamente almeno un Item nella struttura dati. Inoltre nel caso in cui la struttura dati e' vuota effettui due V() cio' vuol dire che aggiungi +2 permit sul semaforo e rompi l'inviariante che volevi ottenere con il semaforo(la mutua esclusione).

Capito, quindi quando faccio l'acquire() in realtà il Thread va in busy waiting, non va in wait.
Quindi dovrei fare wait() per mettere in pausa un Thread?
 
No, l'attesa attiva non e' causata dall'acquire(). Il metodo leggi viene invocato all'interno di un while (true) per cui, sotto ipotesi di struttura dati condivisa definitivamente vuota, l'attesa attiva e' causata dal ciclo seguente di statement: s0.acquire() -> s0.release() -> s0.acquire() -> s0.release() -> ...
Per risolvere dovresti utilizzare i metodi ereditati da Object(wait e notify) o utilizzare delle condition variable, in particolare il metodo leggi nel momento in cui la lista e' vuota effettua una wait(), mentre il metodo scrivi nel momento in cui aggiunge un nuovo item nella struttura condivisa effettua una notify().
 
  • Mi piace
Reazioni: C3n21
Capito, quindi quando faccio l'acquire() in realtà il Thread va in busy waiting, non va in wait.
Quindi dovrei fare wait() per mettere in pausa un Thread?
Nel tuo leggi() postato prima, c'è come ho detto in precedenza una "precondizione": la lista non deve essere vuota
Se è vuota, nel tuo codice hai fatto il release e poi un return ma c'è una questione: dovrebbe quindi essere il chiamante a ripetere il leggi() ma il metodo ha tipo di ritorno void, pertanto il chiamante non ha alcun appiglio per capire se quel leggi() ha "letto" qualcosa o se invece no. Quindi fatto così ha poco senso. Avrebbe invece più senso se il leggi() fosse "bloccante", ovvero sta lì in attesa che la lista diventi non-vuota.

Problema: se si usa un classico "semaforo" si deve per forza usare un meccanismo di attesa "attiva". Il test per empty deve essere fatto tenendo il permit del semaforo (così come durante il resto della lettura se non-vuota). Ma se è vuota, devi rilasciare il permit affinché un scrivi() possa inserire qualcosa. E quindi dovresti poi riacquisire il permit di nuovo, ecc..
Quindi come ho già detto, servirebbe un ciclo nel leggi(). Ciclo però che consuma CPU perché se nessuno sta scrivendo (quindi nessuno cerca di fregare il permit), il leggi è continuamente in loop tra acquire e release, che sono praticamente immediati. Insomma, il thread NON va mai in sospensione (non per questi acquire/release di per sé ... il sistema però potrebbe togliergli cpu per altri motivi).

Morale: per fare un leggi "bloccante" come si deve, un semaforo non è proprio la cosa migliore. Si deve usare quella che si chiama una "condition queue".
E si può quindi usare la condition queue intrinseca degli oggetti (i metodi wait/notify/notifyAll di un oggetto) oppure quella dei lock "espliciti" presenti nel package java.util.concurrent.locks .

Il wait() DEVE essere invocato su un oggetto di cui il thread ha acquisito il "monitor" (il lock, per dirlo semplicemente). Quindi deve stare in un contesto synchronized. Il wait() nota bene, fa andare il thread in sospensione ma RILASCIA il lock! Questo è il punto essenziale.
E il wait va sempre fatto in un piccolo ciclo che deve testare la condizione che deve "reggere" affinché il thread resti in sospensione. Ci sono motivazioni che ora non sto a precisare.

Se si pensa ad una classica "coda" ma bloccante (in cui il put è illimitato mentre il get è bloccante se vuota), la condition queue è appropriata in questo caso. Il get() dovrebbe essere qualcosa del genere (abbozzato):

Codice:
public class CodaBloccante<E> {
    // .....altro.....

    public synchronized E get() throws InterruptedException {
        while (vuota) {
            wait();
        }

        // estrai elemento
        // return elemento
    }
}

È abbozzato/incompleto ma serve a dare l'idea. Quel "vuota" l'ho scritto come variabile ma potrebbe essere la invocazione di un metodo apposito. Non ha importanza. Il concetto è: "finché vuota STAI in wait".
Un put() (idem synchronized) inserirà un elemento nella struttura dati della coda (non mostrata sopra) e poi farà un notify() o meglio un notifyAll() in modo che uno/più thread in attesa su quella condition queue (di quell'oggetto) si "svegli" e ritenti il get.

Piu chiaro?
 
  • Mi piace
Reazioni: C3n21