5.4 Multithreading en concurrency

Wat en waarom?

We maken eerst onderscheid tussen de begrippen parallellisme en concurrency.

Bij parallellisme (soms ook multiprocessing genoemd) worden meerdere instructies gelijktijdig uitgevoerd. Dit vereist meerdere verwerkingseenheden (machines, CPU’s, CPU cores, …) waarop die instructies uitgevoerd kunnen worden. Parallellisme wordt interessant wanneer er veel instructies uitgevoerd moeten worden die onafhankelijk van elkaar kunnen gebeuren. Hoe meer afhankelijkheden er zijn, hoe meer er gecoördineerd moet worden, wat leidt tot meer overhead.

drawing

Bij concurrency (soms ook multitasking genoemd) zijn meerdere taken actief op een bepaald moment. Dat betekent dat een tweede taak al kan starten voordat de eerste afgelopen is. Het hoeft echter niet zo te zijn dat er ook gelijktijdig instructies voor elk van die taken uitgevoerd worden. Concurrency impliceert dus geen parallelisme: je kan concurrency hebben met slechts 1 single-core CPU. Er zijn meerdere mogelijkheden om concurrency te bereiken met slechts één CPU:

  • pre-emption: een externe scheduler (bijvoorbeeld in het besturingssysteem) beslist wanneer en voor hoelang een bepaalde taak de processor ter beschikking krijgt. Een voorbeeld hiervan is time slicing: het besturingssysteem onderbreekt elke taak na een bepaalde vaste tijd (of aantal instructies), om de controle vervolgens aan een andere taak te geven.
  • cooperative multitasking: een taak beslist zelf wanneer die de controle teruggeeft, bijvoorbeeld wanneer er gewacht moet worden op een ’trage’ operatie zoals het lezen van een bestand, het ontvangen van een inkomend netwerkpakket, …. Veel programmeertalen (maar niet Java) ondersteunen coöperatieve multitasking via coroutines en async/await keywords.
drawing

Het spreekt voor zich dat, op moderne (multi-core) machines, concurrency en parallelisme vaak gecombineerd worden. Er zijn dus meerdere taken actief, waarvan sommige ook tegelijkertijd uitgevoerd worden.

Multithreading tenslotte is een specifiek concurrency-mechanisme: er worden, binnen eenzelfde proces van het besturingssysteem, meerdere ’threads’ gestart om taken uit te voeren. Deze threads delen hetzelfde geheugen (namelijk dat van het proces), en kunnen daardoor dus efficiënter data uitwisselen dan wanneer elke taak als apart proces gestart zou worden. Op elk moment kunnen er dus meerdere threads bestaan, die (afhankelijk van of er ook parallellisme is in het systeem) al dan niet gelijktijdig instructies uitvoeren. Binnen een Java-programma is multithreading de voornaamste manier om zowel parallelisme als concurrency te bereiken.

IO-bound vs CPU-bound tasks

De taken die al dan niet gelijktijdig uitgevoerd moeten worden, kunnen onderverdeeld worden in zogenaamde CPU-bound en IO-bound taken.

Bij een CPU-bound taak wordt de uitvoeringstijd voornamelijk gedomineerd door de uitvoering van instructies (berekeningen), bijvoorbeeld algoritmes, beeldverwerking, simulaties, …. Hoe sneller de CPU, hoe sneller de taak dus afgewerkt kan worden.

Een IO-bound task daarentegen is vaak aan het wachten op externe gebeurtenissen (interrupts), zoals netwerk- of schijf-toegang, timers, gebruikersinvoer, … Een snellere CPU zal deze taak niet sneller doen gaan.

In het algemeen is parallellisme vooral nuttig wanneer er veel CPU-bound taken zijn. De totale tijd die nodig is om alle taken uit te voeren kan op die manier geminimaliseerd worden. Voor IO-bound tasks is parallelisme niet noodzakelijk een goede oplossing: de verschillende CPU’s zouden gelijktijdig aan het wachten zijn op externe interrupts. Het gebruik van concurrency kan hier wel soelaas bieden: terwijl één taak wacht op een externe gebeurtenis, kan een andere verder uitgevoerd worden.

Threads in Java

De basisklasse in Java om te werken met multithreading is de Thread-klasse. Typisch geef je een Runnable mee die uitgevoerd moet worden. Dat kan een object zijn, een lambda, of een method reference. Een aangemaakte thread start niet automatisch; om de uitvoering van de thread te starten, moet je de start-methode oproepen. Als je wil wachten tot de thread afgelopen is, kan dat via de join-methode. Deze blokkeert de uitvoering tot de uitvoering van de thread afloopt. Je kan ook een timeout meegeven aan join, met de maximale tijdsduur dat je wilt wachten. Als deze tijd voorbij is vooraleer de thread afloopt, wordt een InterruptedException gegooid.

Bijvoorbeeld: onderstaande code start een nieuwe thread, die een regel uitprint. Er zijn in het programma dus twee threads actief: (1) de thread die de nieuwe thread aanmaakt, start, en er vervolgens (maximaal 2 seconden) op wacht; en (2) de thread die de boodschap uitprint.

Een exception in een thread komt niet terug terecht in de originele thread; deze kan dus ‘verloren’ gaan.

var t1 = new Thread(() -> {
    System.out.println("Hello from a new thread");
});
t1.start();
...
t1.join(Duration.ofSeconds(2));

Een standaard Java thread wordt uitgevoerd met een thread van het besturingssysteem (een platform thread, die typisch overeenkomt met een kernel thread). Zo krijg je dus, op machines die het ondersteunen, ook automatisch parallellisme in Java: meerdere threads kunnen effectief tegelijkertijd instructies uitvoeren. Een nadeel hiervan is dat het aantal threads beperkt is: elke platform-thread komt met een zekere overhead.

Sinds Java 21 beschikt Java ook over virtual threads. Dat zijn threads die beheerd worden door de Java Virtual Machine (JVM) zelf, en dus niet 1-1 overeenkomen met een platform thread. Je JVM zal een virtuele thread koppelen aan een platform thread, en deze terug ontkoppelen op het moment dat de virtuele thread moet wachten op een trage operatie. De platform thread is dan terug vrij, en kan gebruikt worden om een andere virtuele thread uit te voeren. Virtual threads zijn het Java-alternatief voor cooperative multitasking. Het teruggeven van de controle wordt echter afgehandeld door de onderliggende bibliotheken, dus als programmeur word je niet geconfronteerd met wachten op blokkerende operaties, of async/await keywords. Je schrijft je code dus gewoon zoals sequentiële code zonder rekening te houden met mogelijk blokkerende operaties. Virtuele threads hebben ook weinig overhead; je kan er zonder probleem duizenden van aanmaken zonder significante impact op de performantie.

Synchronisatie

Wanneer meerdere threads (of processen) met gedeelde data werken, ontstaat er een nood aan synchronisatie. Het kan immers zijn dat beide threads dezelfde data lezen en/of aanpassen, waardoor de data inconsistent wordt. Dat wordt een race-conditie genoemd.

Race-conditie

Neem het voorbeeld hieronder: één thread verhoogt de waarde van een teller 10.000 keer, de andere verlaagt die 10.000 keer. Welke waarde van de teller verwacht je op het einde van de code?

class Counter {
  private int count = 0;
  public void increment() { count++; }
  public void decrement() { count--; }
  public int getCount() { return count; }
}

var counter = new Counter();

var inc = new Thread(() -> {
  for (int i = 0; i < 10_000; i++) counter.increment();
});
var dec = new Thread(() -> {
  for (int i = 0; i < 10_000; i++) counter.decrement();
});

inc.start(); dec.start();
inc.join(); dec.join();

System.out.println(counter.getCount());

Op mijn machine gaven drie uitvoeringen van dit programma achtereenvolgens de waarden -803, -5134, en 3041. Hoe kan dat? Er worden toch evenveel increments uitgevoerd als decrements, waardoor de teller terug op 0 moet uitkomen? De reden is dat het lezen en terug wegschrijven van de (verhoogde of verlaagde) variabele (i.e., count++ en count--) twee afzonderlijke operaties zijn, die door het onderlinge verschil in timing tussen de threads op verschillende momenten kunnen plaatsvinden. Bijvoorbeeld, stel dat count op een bepaald moment gelijk is aan 40; de inc-thread verhoogt de teller 2 keer, en de dec-thread verlaagt de teller 1 keer, in onderstaande volgorde:

Thread incThread dec
lees count=40
lees count=40
zet count=41
lees count=41
zet count=42
zet count=39

De teller is nu niet 41 (wat de correcte waarde zou zijn na 2 verhogingen en 1 verlaging), maar 39. De twee verhogingen hebben dus geen enkel effect gehad. Door andere volgordes (interleavings) van beide threads kan je zo verschillende resultaten krijgen: count kan (in theorie) elke waarde tussen -10.000 en 10.000 hebben op het einde van het programma. We zeggen dat de Counter-klasse niet thread-safe is: ze kan niet correct gebruikt worden door meerdere threads zonder extra maatregelen.

Volatile variabelen

Bovenstaande code heeft ook nog een ander probleem: wanneer de code uitgevoerd wordt op meerdere CPU’s of cores, kan het kan zijn de count variabele enkel in het lokale geheugen van één CPU aangepast wordt (bv. een register, L1 cache, …), en nog niet meteen naar het (gedeelde) geheugen teruggeschreven wordt. Zelfs als de tweede thread de waarde van count pas uitleest nadat deze aangepast is door de eerste thread, kan het zijn dat die nog een oude waarde ziet:

Thread incThread dec
lees count=40
zet count=41
lees count=41
zet count=42
lees count=40
zet count=39
schrijf count naar geheugen
schrijf count naar geheugen

Om dit op te lossen, kan je in Java een variabele als volatile markeren. Dat garandeert dat alle aanpassingen aan die variabele meteen naar het centrale geheugen geschreven worden, en vermijdt bovenstaande situatie.

class Counter {
  private volatile int count = 0;
  public void increment() { count++; }
  public void decrement() { count--; }
  public int getCount() { return count; }
}

Een volatile variabele is dus een variabele waarvan alle threads steeds de laatste waarde zien. Technisch gezien komt volatile eigenlijk met een nog sterkere garantie: als thread B een volatile variabele leest die door thread A geschreven werd, zullen ook alle andere (niet-volatile) variabelen die B daarna leest, de waarde hebben die ze in thread A hadden voordat die thread de volatile variabele aanpaste. Met andere woorden, thread B ziet consistente waarden voor alle variabelen die een invloed gehad zouden kunnen hebben op de huidige waarde van de volatile variabele. Het schrijven en vervolgens lezen van een volatile variabele is dus een synchronisatie-punt tussen threads.

Bijvoorbeeld, stel dat t, u, v, en w vier variabelen zijn, waarvan enkel v als volatile gemarkeerd is. Alle variabelen hebben oorspronkelijk 0 als waarde. In de tabel hieronder is met een * aangegeven welke waarden in thread B gegarandeerd de meest recente waarden zijn. In het bijzonder zal variabele w niet noodzakelijk de laatste waarde hebben, omdat die door thread A na variabele v geschreven werd. Ook variabele u zal, vooraleer v gelezen wordt, niet noodzakelijk de meest recente waarde bevatten.

Thread AThread B
schrijf t=1
schrijf u=1
schrijf v=1 (volatile)
schrijf w=1
lees u=?
lees v=1 (volatile) *
lees t=1 *
lees u=1 *
lees w=?

Merk wel op dat het gebruik van volatile de race-conditie van hierboven nog steeds niet oplost! Een variabele als volatile markeren heeft immers enkel invloed op de zichtbaarheid van die variabele. Het biedt geen garanties bij gelijktijdige aanpassingen door meerdere threads. Daarvoor hebben we synchronisatie nodig.

Synchronizatie-primitieven

Om race condities te voorkomen, moet je toegang tot gedeeld geheugen op een of andere manier synchroniseren. Meer specifiek wil je een sequentie van operaties atomisch kunnen maken: ze moeten worden uitgevoerd alsof ze één primitieve operatie zijn, die niet onderbroken kan worden door een andere thread. In het voorbeeld van hierboven wil je dus dat het lezen, verhogen, en wegschrijven van de teller steeds als één geheel uitgevoerd wordt, in plaats van als twee aparte operaties.

Java biedt meerdere synchronizatie-primitieven aan. Sommigen zitten ingebouwd in de taal (bv. volatile en synchronized; zie later), andere zitten in de packages java.util.concurrent en java.util.concurrent.locks. We bespreken er hier enkele.

Semafoor

Een Semaphore deelt een gegeven maximum aantal toestemmingen uit aan threads (via acquire()). Alle volgende threads die toestemming willen, moeten wachten tot een van de vorige toestemmingen terug ingeleverd wordt (via release()). Een binaire semafoor (met maximaal 1 toestemming) kan dienen als mutual exclusion lock (mutex). Een semafoor houdt (conceptueel) enkel een teller bij van het resterend aantal toestemmingen, en niet welke thread al toestemming heeft. Er is dan ook geen enkele garantie of verificatie dat een thread die release() oproept effectief zo’n toestemming verkregen had; het aantal beschikbare toestemmingen wordt gewoon terug verhoogd.

Een voorbeeld van het gebruik van een semafoor voor de implementatie van Counter:

class Counter {
    private volatile int count = 0;
    private final Semaphore sem = new Semaphore(1);

    public void increment() throws InterruptedException {
        sem.acquire();
        try {
            count++;
        } finally {
            sem.release();
        }
    }

    public void decrement() throws InterruptedException {
        sem.acquire();
        try {
            count--;
        } finally {
            sem.release();
        }
    }

    public int getCount() {
        return count;
    }
}

Met het gebruik van een semafoor voor synchronisatie krijg je ook automatisch zichtbaarheids-garanties; je kan het vrijgeven van de semafoor beschouwen als het schrijven naar een volatile variabele, en het verkrijgen van een toestemming als het lezen van een volatile variabele. Alle wijzigingen die gedaan worden voor het vrijgeven zijn dus zichtbaar voor alle threads die nadien een toestemming verkrijgen.

Lock

De Lock interface, en de implementatie ReentrantLock, stellen een mechanisme voor waarbij een thread de lock kan verkrijgen (lock()) en terug vrijgeven (unlock()). Hierbij wordt wél nagegaan dat enkel de thread die de lock verkregen heeft, de lock terug kan vrijgeven. ‘Re-entrant’ betekent dat een thread die reeds een lock heeft, verder mag gaan wanneer die een tweede keer lock() oproept (op hetzelfde lock-object).

Een voorbeeld van het gebruik van een ReentrantLock voor de implementatie van Counter:

class Counter {
    private volatile int count = 0;
    private final Lock lock = new ReentrantLock();

    public void increment() {
        lock.lock();
        try {
            count++;
        } finally {
            lock.unlock();
        }
    }

    public void decrement() {
        lock.lock();
        try {
            count--;
        } finally {
            lock.unlock();
        }
    }

    public int getCount() {
        return count;
    }
}

Net zoals bij een semafoor krijg je bij het gebruik van een lock ook automatisch zichtbaarheids-garanties; je kan ‘unlock()’ beschouwen als het schrijven naar een volatile variabele, en ’lock()’ als het lezen van een volatile variabele. Alle wijzigingen die gedaan worden voor het unlocken zijn dus zichtbaar voor alle threads die nadien lock uitvoeren.

AtomicInteger

Specifiek voor primitieve types biedt Java ook een verzameling atomische objecten aan, bijvoorbeeld AtomicInteger. Daarop zijn operaties gedefinieerd zoals incrementAndGet, updateAndGet, getAndAdd, …. Bovenstaande counter kan dus ook eenvoudig als volgt geïmplementeerd worden:

class Counter {
    private final AtomicInteger count = new AtomicInteger(0);

    public void increment() { count.incrementAndGet(); }
    public void decrement() { count.decrementAndGet(); }
    public int getCount() { return count.get(); }
}

Er zijn ook andere klassen, bijvoorbeeld AtomicLong, AtomicBoolean, AtomicIntegerArray, AtomicReferenceArray, … Deze klassen zijn efficiënt geïmplementeerd, en hebben dus de voorkeur wanneer er enkel gesynchroniseerd moet worden om een primitieve variabele of array aan te passen.

Synchronized

Het gebruiken van een lock is zo alomtegenwoordig dat, in Java, elk object als een lock gebruikt kan worden door middel van het synchronized keyword. Bijvoorbeeld:

class Counter {
    private volatile int count = 0;
    private final Object lock = new Object();

    public void increment() {
        synchronized (lock) {
            count++;
        }
    }

    public void decrement() {
        synchronized (lock) {
            count--;
        }
    }

    public int getCount() {
        return count;
    }
}

Op deze manier beschikt elk Counter-object over een eigen object dat als lock gebruikt wordt. Het synchronized-block geeft aan dat dat lock nodig is om de code in het bijhorende blok uit te voeren, en de lock wordt automatisch terug vrijgegeven op het einde van dat block.

Omdat elk object in Java zo als lock gebruikt kan worden, hoeven we geen apart object aan te maken; we kunnen ook gewoon this gebruiken:

class Counter {
    private volatile int count = 0;

    public void increment() {
        synchronized (this) {
            count++;
        }
    }

    public void decrement() {
        synchronized (this) {
            count--;
        }
    }

    public int getCount() {
        return count;
    }
}

Tenslotte laat Java ook toe om een hele methode als synchronized te definiëren. Dat heeft hetzelfde effect als de code met synchronized(this) hierboven:

class Counter {
    private volatile int count = 0;

    public synchronized void increment() {
        count++;
    }

    public synchronized void decrement() {
        count--;
    }

    public int getCount() {
        return count;
    }
}

Code die gebruik maakt van synchronized heeft ook zichtbaarheids-garanties; je kan het einde van een synchronized-block beschouwen als het schrijven naar een volatile variabele (die hoort bij het object waarop gesynchroniseerd wordt), en het begin ervan als het lezen van die volatile variabele. Alle wijzigingen die gedaan worden voor of in een synchronized-blok of methode zijn dus zichtbaar voor alle threads die nadien een synchronized-blok of methode uitvoeren, tenminste als er gesynchroniseerd wordt op hetzelfde object. In onderstaande code is er dus geen garantie dat thread B de acties van thread A ziet:

Object lock1 = new Object();
Object lock2 = new Object();

// in thread A
synchronized(lock1) {
  // acties van A
}
// later...

// in thread B
synchronized(lock2) {
  // acties van B
}

Deadlocks

Threads die werken met locks kunnen in een deadlock geraken wanneer ze op elkaar wachten. Geen enkele thread kan dan vooruitgang maken. Bijvoorbeeld, in onderstaande code heeft de counter een apart read- en write-lock. Beide threads proberen om de twee locks te verkrijgen, maar in omgekeerde volgorde.

class Counter {
    public final Object readLock = new Object();
    public final Object writeLock = new Object();

    public volatile int value;
}

var counter = new Counter();
var inc = new Thread(() -> {
    for (int i = 0; i < 10_000; i++) {
        synchronized (counter.readLock) {
            synchronized (counter.writeLock) {
                counter.value++;
            }
        }
    }
});
var dec = new Thread(() -> {
    for (int i = 0; i < 10_000; i++) {
        synchronized (counter.writeLock) {
            synchronized (counter.readLock) {
                counter.value--;
            }
        }
    }
});

inc.start(); dec.start();
inc.join(); dec.join();

System.out.println(counter.value);

In sommige gevallen kan dat leiden tot een deadlock, namelijk wanneer een thread onderbroken wordt tussen het verkrijgen van beide locks:

Thread incThread dec
acquire readLock
acquire writeLock
wait (forever) for readLock …
wait (forever) for writeLock…

Er is geen eenvoudige manier om deadlocks te vermijden, behalve de applicatie erg zorgvuldig te ontwerpen. Enkele technieken die hierbij kunnen helpen zijn:

  • niet meer locken dan strikt noodzakelijk
  • locks altijd in een welbepaalde volgorde verkrijgen
  • timeouts gebruiken

Immutability

Wanneer we praten over concurrency, is het een goed idee om ook immutability te vermelden. Een immutable object kan nooit van waarde wijzigen: eens aangemaakt blijven alle waardes hetzelfde. Er kunnen dus per definitie geen race condities optreden wanneer meerdere threads een immutable object gebruiken; ook beschikt elke thread altijd over de laatste waarde (volatile is dus niet nodig). Wanneer mogelijk, gebruik je dus best immutable objecten in een applicatie met concurrency. Records zijn uiterst geschikt om eenvoudig dergelijke immutable objecten te maken, en gaan dus goed samen met concurrency!

Concurrent data structures

Zoals we hierboven gezien hebben, is niet elke klasse automatisch geschikt om (correct) door meerdere threads tegelijk gebruikt te worden. Ook de ingebouwde collectie-types (bv. ArrayList, LinkedList) zijn niet thread-safe. Je kan een thread-safe collectie verkrijgen door hulpfuncties in de Collections-klasse, bijvoorbeeld Collections.synchronizedList(unsafeList). Dat geeft een view op de gegeven collectie terug waarvan alle methodes synchronized zijn. Dat betekent dus dat de collectie slechts door 1 thread tegelijk gebruikt kan worden. Bovendien, wanneer je itereert over de elementen in de collectie, bijvoorbeeld via een iterator of stream (zie later), moet het gebruik van die iterator ook in een sycnhronized blok staan (met de collectie als object), zodat de lijst niet aangepast kan worden tijdens het itereren. Omdat ook een foreach-lus een iterator gebruikt, moet die lus ook in een synchronized-blok staan:

List myList = Collections.synchronizedList(new ArrayList<E>());
...
synchronized(myList) {
  var it = myList.iterator();
  while (it.hasNext()) {
    ...
  }
}
...
synchronized(myList) {
  for (var element : myList) {
    ...
  }
}

Bovenop die manier om ‘gewone’ collecties thread-safe te maken, zijn er ook implementaties die specifiek ontworpen zijn voor concurrency. Bijvoorbeeld, een CopyOnWriteArrayList is een thread-safe lijst, geschikt voor wanneer er veel vaker gelezen wordt uit de lijst dan dat er naar geschreven wordt. Deze implementatie dan is efficiënter dan een gewone ArrayList die gesynchroniseerd wordt via de synchronizedList-hulpfunctie. Zoals de naam zegt wordt de onderliggende array nooit aangepast, maar wel volledig gekopieerd elke keer ernaar geschreven wordt. Het voordeel is dat lezen kan zonder enige synchronizatie, omdat de inhoud van de array zelf nooit meer aangepast wordt.

Er bestaat ook een ConcurrentHashMap, die een thread-safe Map implementeert. Ook hier is deze efficiënter dan een gesynchroniseerde Map, omdat nooit de hele datastructuur gelockt wordt.

High-level concurrency abstractions

Totnogtoe hebben we steeds zelf threads aangemaakt. Zoals eerder vermeld komt elke thread echter met een redelijk grote overhead, en wordt het al snel complex om bij te houden welke threads we hebben en hoeveel er zijn. We willen daarom de gebruikte threads makkelijk kunnen beheren.

Executor

Java biedt enkele hoog-niveau abstracties aan om te werken met threads. De basisklasse daarvoor is een Executor. Deze ontkoppelt de taak die uitgevoerd moet worden van de thread waarin die uitgevoerd wordt. Aan een Executor kan je dus Runnable’s (of lambda’s of method references) geven die uitgevoerd moeten worden, zonder zelf de threads te beheren. Er is ook een subinterface ExecutorService die enkele methodes toevoegt, bijvoorbeeld het afsluiten van een executor. Tenslotte is er de ScheduledExecutorService die methodes toevoegt om taken te plannen (bijvoorbeeld uitvoeren na een bepaalde tijd, eenmalig of herhaald).

Vaak willen we het aantal threads dat gebruikt wordt beperken, bijvoorbeeld tot ongeveer het aantal aanwezige processoren of cores. Dat kan met een ThreadPool: een verzameling van threads die, eenmaal aangemaakt, herbruikt kunnen worden. Er is een ThreadPoolExecutor die exact dat doet. Je kan die zelf aanmaken, maar dat kan makkelijker via hulpmethodes in de Executors klasse. Bijvoorbeeld:

var pool = Executors.newFixedThreadPool(5);
pool.submit(() -> someTask());
pool.submit(() -> someOtherTask());
...
pool.close(); // wacht tot alle taken afgelopen zijn.

maakt een ExecutorService aan met ten hoogste vijf threads. Nieuwe taken die aangeboden worden wanneer alle vijf threads bezig zijn met andere taken, belanden in een wachtrij tot ze aan de beurt zijn. Met de close-methode zorg je ervoor dat er geen nieuwe taken meer kunnen bijkomen, en wacht je tot alle bestaande taken afgelopen zijn.

Er zijn ook andere varianten die je kan aanmaken via de Executors-klasse, bijvoorbeeld

  • newCachedThreadPool: herbruikt threads indien voorhanden, en maakt anders een nieuwe thread aan. Threads zonder werk worden na een bepaalde tijd beëindigd.
  • newSingleThreadExecutor: een executor met slechts één thread die al het aangeboden werk (na elkaar) uitvoert.
  • newSingleThreadScheduledExecutor: zelf de als hierboven, maar dan met een ScheduledExecutor als resultaat. Hiermee kan je dus bijvoorbeeld taken op regelmatige tijdstippen uitvoeren.

Het aanmaken en vervolgens wachten op alle taken kan ook via een zogenaamd try-with-resources statement. Dat is een eenvoudige manier om te wachten op alle taken en ervoor te zorgen dat de pool altijd afgesloten wordt, ook als er exceptions gegooid worden. (Een try-with-resource statement kan in Java ook voor andere zaken gebruikt worden, in het bijzonder met resources die na gebruik terug gesloten moeten worden, bijvoorbeeld een bestand.)

try (var pool = Executors.newFixedThreadPool(5)) {
  pool.submit(() -> someTask());
  pool.submit(() -> someOtherTask());
}
// de pool is hier gesloten, en alle taken zijn uitgevoerd.

Fork-Join pool

Een fork-join pool is een specifiek type Executor voor taken die (recursief) nieuwe subtaken genereren. Deze subtaken worden dan mee opgenomen in de lijst van uit te voeren taken. Een fork-join pool is vooral nuttig wanneer de taken onafhankelijk van elkaar zijn, en achteraf gecombineerd worden. Er wordt gebruik gemaakt van work stealing: threads in de pool die niets meer te doen hebben, kunnen subtaken beginnen uitvoeren die gegenereerd werden door een andere thread.

We gaan hier niet verder in op het gebruik van een fork-join pool.

Oefeningen

Volatile

Is het volatile keyword nog nodig in de implementaties van Counter met een semafoor en/of lock hierboven? Waarom wel/niet?

Synchronized

Waarom zou je niet gewoon elke methode van een klasse synchronized maken?

Ticketverkoop

Maak volgende klasse voor het boeken van tickets thread-safe, zodat meerdere klanten tegelijk tickets kunnen boeken. Je vindt hieronder ook een main-methode die deze klasse gebruikt. Doe eerst het simpelste wat je kan bedenken, en kijk wat het effect is op de uitvoeringstijd. Probeer daarna om de oplossing efficiënter te maken.

public class BookingSystem {

    private final String[] seats = {"A", "B", "C", "D", "E", "F"};
    private int availableTickets = seats.length;
    private final Map<String, String> seatAssignments = new HashMap<>();

    public boolean isAssigned(String seat) {
        return seatAssignments.containsKey(seat);
    }

    private void assignSeat(String seat, String customer) {
        // simulate that creating a ticket takes some time
        try {
            Thread.sleep(100);
        } catch (InterruptedException ignored) {
        }
        seatAssignments.put(seat, customer);
    }

    public List<String> bookSeats(String customer, int requestedSeats) {
        if (requestedSeats > availableTickets) throw new IllegalStateException("Not enough seats available");
        // simulate that preprocessing takes a lot of time
        try {
            Thread.sleep(2000);
        } catch (InterruptedException ignored) {
        }
        List<String> seatsForCustomer = new ArrayList<>();
        int seatIndex = 0;
        while (seatsForCustomer.size() < requestedSeats) {
            String seat = seats[seatIndex];
            if (!isAssigned(seat)) {
                assignSeat(seat, customer);
                seatsForCustomer.add(seat);
            }
            seatIndex++;
        }
        availableTickets -= requestedSeats;
        return seatsForCustomer;
    }
}

Hieronder vind je een main-methode die deze klasse gebruikt. We maken hier gebruik van een CyclicBarrier: een ander synchronisatie-primitief, waarmee je kan wachten tot een aantal threads hetzelfde punt bereikt hebben voor ze verder mogen gaan. We gebruiken dit om de grote toestroom te simuleren bij het openen van de ticketverkoop. Bekijk ook de andere concurrency-aspecten van deze code.

public static void main(String[] args) {
    var nbCustomers = 20;
    var bookingSystem = new BookingSystem();
    var assignedSeats = Collections.synchronizedList(new ArrayList<String>());
    try (var executor = Executors.newFixedThreadPool(nbCustomers)) {
        // wait until all customers are ready: simulates opening ticket sales
        var ticketSalesOpening = new CyclicBarrier(nbCustomers);
        for (int i = 0; i < nbCustomers; i++) {
            final var customer = "Customer %02d".formatted(i);
            executor.execute(() -> {
                try {
                    ticketSalesOpening.await();
                    var seats = bookingSystem.bookSeats(customer, 1);
                    System.out.println(customer + ": " + seats);
                    assignedSeats.addAll(seats);
                } catch (IllegalStateException e) {
                    // no tickets left for customer
                } catch (BrokenBarrierException | InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }
    var uniqueSeats = new HashSet<>(assignedSeats);
    System.out.println((assignedSeats.size() - uniqueSeats.size()) + " overbookings.");
}

Thread-safe cache

Maak een thread-safe Downloader-klasse die intern een (in-memory) cache gebruikt. Als de te downloaden URL nog niet in de cache zit, wordt die gedownload. Anders wordt de inhoud uit de cache teruggegeven. In plaats van een echte URL te downloaden kan je gewoon Thread.sleep gebruiken:

public static String download(String url) {
    try {
        System.out.println("Downloading " + url + " (" + Thread.currentThread() + ")");
        Thread.sleep(1000); // 1 seconde
        return "Contents of " + url;
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

Maak ook een client met 4 threads die 100 keer een random URL opvragen (gebruik een thread pool). In plaats van echte URL’s kan je gewoon strings gebruiken:

public static final String[] urls = { "abc", "def", "ghi", "jkl", "mno", "pqr", "stu", "vwx", "yz" };
public static String randomURL() {
    return urls[new Random().nextInt(urls.length)];
}

Zorg ervoor dat dezelfde URL nooit twee keer gedownload wordt door twee verschillende threads.

Hoelang verwacht je dat dit zal duren? Klopt dat met wat er gebeurt?

Uitbreiding: voeg een cleanup-thread toe aan je Downloader-klasse. Deze maakt elke 5 seconden de cache helemaal leeg. Gebruik hiervoor een ScheduledExecutor. Opgelet: deze thread kan er mogelijk voor zorgen dat je programma niet meer stopt - zorg dat je de cleanup-thread ook beëindigt.