Wątki - A i B współdzielące listę

0

Hej, mam do napisania program, gdzie w wątku A sczytuje z pliku towary, tworzy obiekty i drukuję informację co 200 obiektów. Drugi wątek B zaś ma sumować co 100 wagę tych towarów i wyświetlać na konsoli. Utworzyłem klasę CreateAndPrint, w niej licznik oraz listę towarów.

import java.io.File;
import java.util.LinkedList;
import java.util.List;
import java.util.Scanner;

public class CreateAndPrint implements Runnable {
    private int counter;
    private List<Towar> towary = new LinkedList<>();
    String path = "C:\\Users\\filip\\Towary.txt";

    @Override
    public void run() {
        try (Scanner sc = new Scanner(new File(path))) {
            synchronized (this) {
                while (sc.hasNext()) {
                    int id_towaru = sc.nextInt();
                    int waga = sc.nextInt();
                    towary.add(new Towar(id_towaru, waga));
                    counter++;
                    if (counter % 200 == 0)
                        System.out.println("utworzono " + counter + " obiektów");
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public int getCounter() {
        return counter;
    }

    public List<Towar> getTowary() {
        return towary;
    }
} 

W klasie Main tworzę wątek, uruchamiam, wszystko gra. Proszę o nakierowanie jak współdzielić listę towarów z drugą klasą?

1
3l3ctric_philip napisał(a):

Proszę o nakierowanie jak współdzielić listę towarów z drugą klasą?

np przez parametr Przekazując jako parametr do jednego wątku i do drugiego wątku przed wystartowaniem wątków. Tylko że LinkedLista nie jest najleszym kandydatem na wymianę danych. Lepiej użyć ConcurrentLinkedQueue

2

Jako że to Java to możemy sobie pokomplikować życie i ładnie oddzielić odpowiedzialności między aktorów gdzie Printer i Calculator mają swoje własne bufory:

    public static void main(String[] args) throws Exception {
        var printer = new Printer();
        var calculator = new Calculator();
        var reader = new Reader()
            .withConsumer(printer)
            .withConsumer(calculator);

        var executor = Executors.newFixedThreadPool(3);
        try {
            executor.execute(printer);
            executor.execute(calculator);
            executor.execute(reader);
            Thread.sleep(10000);
        } finally {
            executor.shutdownNow();
        }
    }

    static class Item {
        final int value = new Random().nextInt(1000);
    }

    static class Reader implements Runnable {

        private final Collection<Consumer<Item>> consumers = new CopyOnWriteArrayList<>();

        @Override
        public void run() {
            Stream.generate(Item::new)
                .peek(item -> {
                    try {
                        Thread.sleep(new Random().nextInt(10));
                    } catch (InterruptedException ignore) {}
                })
                .limit(1000)
                .forEach(item -> consumers.forEach(consumer -> consumer.accept(item)));
        }

        Reader withConsumer(Consumer<Item> consumer) {
            consumers.add(consumer);
            return this;
        }
    }

    static class Printer implements Runnable, Consumer<Item> {

        private final BlockingDeque<Item> buffer = new LinkedBlockingDeque<>();

        @Override
        public void run() {
            int counter = 0;
            try {
                while (true) {
                    var item = buffer.pollFirst(10, TimeUnit.SECONDS);
                    if (Objects.nonNull(item) && (++counter % 100 == 0)) {
                        System.out.println("100 items created");
                    }
                }
            } catch (InterruptedException interruptedException) {
                Thread.currentThread().interrupt();
            }
        }

        @Override
        public void accept(Item item) {
            buffer.addLast(item);
        }
    }

    static class Calculator implements Runnable, Consumer<Item> {

        private final BlockingDeque<Item> buffer = new LinkedBlockingDeque<>();

        @Override
        public void run() {
            int counter = 0;
            long sum = 0;
            try {
                while (true) {
                    var item = buffer.pollFirst(10, TimeUnit.SECONDS);
                    if (Objects.nonNull(item)) {
                        sum += item.value;
                        if (++counter % 100 == 0) {
                            System.out.println("Sum of last 100 items is: " + sum);
                            sum = 0;
                        }
                    }
                }
            } catch (InterruptedException interruptedException) {
                Thread.currentThread().interrupt();
            }
        }

        @Override
        public void accept(Item item) {
            buffer.addLast(item);
        }
    }
0

@KamilAdam:

public class SumAndPrint implements Runnable {
    private Queue<Towar> towary;

    private int sum;

    public SumAndPrint(Queue<Towar> towary) {
        this.towary = towary;
    }

    @Override
    public void run() {
        synchronized (towary) {
            for (Towar t : towary) {
                sum += t.getWaga();
                if (t.getId_towaru() % 100 == 0) {
                    System.out.println("policzono wagę " + t.getId_towaru() + " towarów");
                }
            }
        }
        System.out.println("waga wszystkich towrów " + sum);
    }
}

Tak jak zasugerowałeś, stworzyłem pola i konstruktory w tych dwóch klasach.
W klasie Main tworzę kolejkę i wrzucam przy tworzeniu wątków.

public class Main {
    public static void main(String[] args) {

        Queue<Towar> towary = new ConcurrentLinkedQueue<Towar>();

        Thread threadA = new Thread(new CreateAndPrint(towary));
        Thread threadB = new Thread(new SumAndPrint(towary));

        threadA.start();
        try {
            threadA.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        threadB.start();
       try {
            threadB.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Teraz głowię się nad synchronizacją. Bo jak rozumiem wydruk ma być naprzemienny.
utworzono 200 obiektów
policzono wage 100 towarów
utworzono 400 obiektów
itd.
i na końcu suma 10_000 towarów.

U mnie najpierw drukuje się info z wątku A, później z B i na końcu suma, także wytyczna w zadaniu "Zapewnić synchronizację i koordynację pracy obu wątków" leży.

1 użytkowników online, w tym zalogowanych: 0, gości: 1