Witam
Chciałem przetestować interfejs BlockingQueue. W tym celu napisałem sobie taki prosty programik. Niestety zadania consumerów się nie kończą mimo że producer je przerywa.
Moje pytania są takie:
- Dlaczego zaproponowane przeze mnie rozwiązanie nie działa i jak je poprawić?
- Czytałem, że funkcja interrupt()** nie może** przerwać wątku, który oczekuje na operację wejścia wyjścia oraz oczekuje na zwolnienie blokady monitora obiektu, do którego chce się dobrać (ten wątek).Skąd mam wiedzieć czy dany wątek podjął próbę przejęcia monitora i że wywołanie interrupt() dla niego nie zadziała?
- Próba umieszczenia danych w zapchanej kolejce blokującej lub pobrania danych z pustej kolejki skutkuje zawieszeniem wątku. Czyli oddaje on blokadę monitora do obiektu kolejki? No bo inny wątek chyba musi wstawić wartość by ten zablokowany mógł ją pobrać, tak? Co znaczy, że wątek jest zablokowany?
Myślę, że odpowiedzi na te pytania pozwoliłby mi rozwiązać ten problem:
Dziękuję za wszelką pomoc.
class Consumer extends Thread {
private LinkedBlockingQueue<Integer> queue;
private ArrayList<Integer> consumedValues = new ArrayList<Integer>();
public boolean endFlag = false;
public Consumer(LinkedBlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override public synchronized void run() {
try {
while(!Thread.interrupted()) {
System.out.println("Przed konsumpcją... " + Thread.currentThread().getName());
int value = queue.take();
System.out.println("...po konsumpcji! " + Thread.currentThread().getName());
consumedValues.add(value);
System.out.println(Thread.currentThread().getName() + " konsumuje: " + value);
}
}
catch (InterruptedException ex) {
System.out.println("Przerwano Consumer: " + this);
}
finally {
endFlag = true;
System.out.println("Koniec zadania consumer: " + Thread.currentThread().getName());
System.out.println("Skonsumowane wartosci: " + consumedValues);
}
}
}
class Producer implements Runnable {
LinkedBlockingQueue<Integer> queue;
Consumer[] consumers;
public Producer(LinkedBlockingQueue<Integer> queue, Consumer... consumers) {
this.queue = queue;
this.consumers = consumers;
}
@Override public void run() {
try {
for(int i=0; i<10; ++i) {
queue.put(i);
Thread.sleep(100);
}
while(!consumers[0].endFlag || !consumers[1].endFlag) {
//tutaj blokuje sie program, nie ma informacji zwrotnej od consumerow w postaic ustawienia flagi endFlag
for(Consumer c : consumers) {
c.interrupt();
}
}
} catch (InterruptedException ex) {
System.out.println("Przerwano prace producenta!");
}
System.out.println("Koniec zadania producer!");
}
}
public class ArrayQueueMachine {
public static void main(String[] args) throws Exception {
LinkedBlockingQueue queue = new LinkedBlockingQueue(10);
Consumer consumer1 = new Consumer(queue);
Consumer consumer2 = new Consumer(queue);
Producer producer = new Producer(queue,consumer1,consumer2);
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(producer);
exec.execute(consumer1);
exec.execute(consumer2);
exec.shutdown();
}
}