Slide show

Threads and Java

Ogg-Vorbis format, 52Mbytes WAV format, 290Mbytes

Distributed systems and concurrent systems

Concurrency

Unix processes

Scheduling policies

Threads

Shared memory problems

Threads

Thread scheduling

Thread creation

Two methods

Example


import java.io.*;

/**
 * Minimal example of creating threads by subclassing Thread.
 * Each instance prints five progress lines, sleeping for
 * (base + 1) seconds after each one, and then prints a
 * final "finished" line.
 */
public class SimpleThread extends Thread {

    /** Number identifying this thread; also sets its sleep time. */
    protected int base;

    /**
     * Create a thread with the given identifying number.
     * The thread is not started here.
     */
    public SimpleThread(int base) {
        this.base = base;
    }

    /** Create and start three threads, numbered 0, 1 and 2. */
    static public void main(String[] argv) {
        int n = 0;
        while (n < 3) {
            SimpleThread worker = new SimpleThread(n);
            worker.start();
            n++;
        }
    }

    /** Body of the thread: print, sleep, repeat five times. */
    public void run() {
        for (int count = 0; count < 5; count++) {
            System.out.println("Thread " + base + " loop " + count);
            try {
                // a larger base means a longer pause between lines
                sleep((base + 1) * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("Thread " + base + " finished");
    }
}

Sequential server

while (msg = getMessage())
    handle msg

Concurrent server

while (msg = getMessage())
    create new thread to handle msg

Thread life-cycle

Thread alive states

Example broken shared memory


/**
 * @version 1.20 1999-04-26
 * @author Cay Horstmann
 */

/**
 * Driver for the unsynchronized bank example: creates one Bank
 * and starts one TransferThread per account.
 */
public class UnsynchBankTest {

    /** Number of accounts, and therefore of transfer threads. */
    public static final int NACCOUNTS = 10;

    /** Opening balance of each account; also the largest transfer. */
    public static final int INITIAL_BALANCE = 10000;

    public static void main(String[] args) {
        Bank bank = new Bank(NACCOUNTS, INITIAL_BALANCE);

        for (int account = 0; account < NACCOUNTS; account++) {
            TransferThread worker =
                new TransferThread(bank, account, INITIAL_BALANCE);
            // odd-numbered accounts get a priority one above normal
            worker.setPriority(Thread.NORM_PRIORITY + account % 2);
            worker.start();
        }
    }
}

/**
 * A set of integer account balances with a transfer operation.
 *
 * None of the methods are synchronized, and transfer() yields
 * between the debit and the credit, so concurrent callers can
 * observe and produce inconsistent totals.
 */
class Bank {

    /** test() is run after every NTEST completed transfers. */
    public static final int NTEST = 10;

    private int[] accounts;
    private long ntransacts = 0;

    /**
     * Create n accounts, each holding initialBalance.
     */
    public Bank(int n, int initialBalance) {
        accounts = new int[n];
        int index = 0;
        while (index < accounts.length) {
            accounts[index] = initialBalance;
            index++;
        }
        ntransacts = 0;
    }

    /**
     * Move amount from one account to another. The request is
     * silently dropped if the source account holds less than amount.
     */
    public void transfer(int from, int to, int amount) {
        if (accounts[from] >= amount) {
            accounts[from] -= amount;

            // let the O/S run another thread between debit and credit
            Thread.yield();

            accounts[to] += amount;
            ntransacts++;
            if (ntransacts % NTEST == 0) {
                test();
            }
        }
    }

    /** Print the transaction count and the sum of all balances. */
    public void test() {
        int total = 0;
        int index = 0;
        while (index < accounts.length) {
            total += accounts[index];
            index++;
        }

        System.out.println("Transactions:" + ntransacts
            + " Sum: " + total);
    }

    /** Number of accounts in this bank. */
    public int size() {
        return accounts.length;
    }
}

/**
 * Thread that repeatedly transfers a random amount from one fixed
 * account to a randomly chosen account of a Bank, sleeping for one
 * millisecond between transfers, until it is interrupted.
 */
class TransferThread extends Thread {

    private Bank bank;
    private int fromAccount;
    private int maxAmount;

    /**
     * @param b    the bank to operate on
     * @param from index of the account money is taken from
     * @param max  upper bound (exclusive) on each transfer amount
     */
    public TransferThread(Bank b, int from, int max) {
        bank = b;
        fromAccount = from;
        maxAmount = max;
    }

    public void run() {
        try {
            for (;;) {
                if (interrupted()) {
                    break;
                }
                int toAccount = (int) (bank.size() * Math.random());
                int amount = (int) (maxAmount * Math.random());
                bank.transfer(fromAccount, toAccount, amount);
                sleep(1);
            }
        } catch (InterruptedException e) {
            // interrupted while sleeping: fall through and end the thread
        }
    }
}

Mutual exclusion

Use of synchronized

Fixing bank account

More synchronization

Synchronization on a shared object


/**
 * This is a hard way of doing arithmetic:
 * fire off a separate thread for each sum
 * and then synchronize them on a shared object
 * to avoid contention.
 *
 * main() waits for the worker threads with join() rather than
 * sleeping for a fixed time, so the printed sum does not depend
 * on how long the threads take to run.
 */
public class Adder extends Thread {

    // static fields are shared by all instances
    // of the Adder class i.e. one field for
    // all instances

    // lock guarding sum; any single shared object would do
    private static final Object synchObject = new Object();
    private static int sum = 0;

    // non-static fields belong to individual
    // instances of each Adder object i.e.
    // separate fields for each instance
    private int increment;

    /**
     * Start two adders (adding 1 and 3), wait for both to
     * finish and print the resulting sum.
     */
    public static void main(String[] args) {
        Adder a1 = new Adder(1);
        Adder a3 = new Adder(3);

        // the following should do
        //   sum += 1
        //   sum += 3
        // in an indeterminate order
        a1.start();
        a3.start();
        try {
            // block until each thread has really completed
            a1.join();
            a3.join();
        } catch (InterruptedException e) {
            // keep the interrupt visible to our caller; the sum
            // printed below may then be incomplete
            Thread.currentThread().interrupt();
        }

        // read under the same lock the writers use, in case we
        // were interrupted before both threads finished
        synchronized (synchObject) {
            System.out.println("Sum is " + sum);
        }
    }

    /**
     * @param increment the amount this thread adds to the shared sum
     */
    public Adder(int increment) {
        this.increment = increment;
    }

    /** Add this thread's increment to the shared sum atomically. */
    public void run() {
        synchronized (synchObject) {
            sum += increment;
        }
    }
}

Fine control of threads

There are a number of ways in which threads can control execution

Wait/notify

Reader/Writer example


import java.io.*;

/**
 * This class copies strings from one thread to another.
 * The class ReaderWriter has a field sharedStr that is
 * set by Writer and printed by Reader. The Writer sets a
 * value, wakes up the Reader. The Writer then waits till
 * the Reader is finished and then writes another value.
 * The Reader waits till a value is available and then prints
 * it. It then wakes up the Writer and waits again.
 * The loops terminate when the Writer writes an empty string ""
 *
 * The hand-over is tracked by the flag stringReady (guarded by
 * synchObject). Each thread waits in a loop on that flag, so a
 * notifyAll() issued before the other thread has started waiting
 * is not lost, and a spurious wakeup is harmless.
 */
public class ReaderWriter {
    public static String sharedStr;
    public static Object synchObject;

    // true while sharedStr holds a value the Reader has not yet
    // consumed; only read or written while holding synchObject
    static boolean stringReady;

    public static void main(String[] args) {
        synchObject = new Object();
        sharedStr = null;
        stringReady = false;
        new Reader().start();
        new Writer().start();
    }
}

/**
 * Publishes each of its lines in turn through ReaderWriter.sharedStr,
 * waiting after each one until the Reader has consumed it.
 */
class Writer extends Thread {
    private String[] lines = {"line 1",
                              "line 2",
                              "line 3",
                              "line 4",
                              ""        // terminating string
    };

    public void run() {
        for (int n = 0; n < lines.length; n++) {
            // synch on the shared object
            synchronized(ReaderWriter.synchObject) {
                System.out.println("Writer assigning to string");
                ReaderWriter.sharedStr = lines[n];
                ReaderWriter.stringReady = true;

                // another thread synch'ed on
                // the synchObject can wake up
                System.out.println("Waking up reader");
                ReaderWriter.synchObject.notifyAll();

                // and we wait until the value has been
                // consumed before continuing
                System.out.println("Writer waiting");
                while (ReaderWriter.stringReady) {
                    try {
                        ReaderWriter.synchObject.wait();
                    } catch(InterruptedException e) {
                        // asked to stop: keep the flag set and give up
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }
    }
}

/**
 * Prints every value published through ReaderWriter.sharedStr
 * until the empty string is received.
 */
class Reader extends Thread {

    public void run() {
        while (true) {
            synchronized(ReaderWriter.synchObject) {
                // wait until a value is available before
                // trying to print anything
                if (!ReaderWriter.stringReady) {
                    System.out.println("Reader waiting");
                }
                while (!ReaderWriter.stringReady) {
                    try {
                        ReaderWriter.synchObject.wait();
                    } catch(InterruptedException e) {
                        // asked to stop: keep the flag set and give up
                        Thread.currentThread().interrupt();
                        return;
                    }
                }

                // take the value and mark it as consumed
                String value = ReaderWriter.sharedStr;
                ReaderWriter.stringReady = false;

                if (value.equals("")) {
                    // end of loop.
                    // wakeup anyone else before breaking
                    // out of loop
                    ReaderWriter.synchObject.notifyAll();
                    break;
                }

                System.out.println(value);

                // and then tell another thread to wakeup
                ReaderWriter.synchObject.notifyAll();
            }
        }
    }
}

Deadlock

Example:

Two processes wish to communicate with each other, in a 2-way manner. Two pipes can be used for this:

The two processes issue read/write requests:

    P1:              P2:
    read(pipe2)      read(pipe1)
    write(pipe1)     write(pipe2)

Deadlock occurs: each process blocks in its read, waiting for data that the other process has not yet written.

Example:

When the "give way to the right" rule was in force, this was a common situation:

Deadlock detection

A resource allocation graph is a graph showing processes, resources and the connections between them. Processes are modelled by circles, resources by squares. If a process controls a resource then there is an arrow from resource to process. If a process requests a resource an arrow is shown the other way.

For example, P1 is using R1 and has requested R2, while P2 is using R2.

Deadlock can now occur if P2 requests R1 - setting up a cycle.

The total set of conditions for deadlock to occur are:

  1. Mutual exclusion. Each resource is either currently assigned to one process or is available.
  2. Hold and wait. Processes holding resources can request new ones.
  3. No preemption. Resources granted cannot be taken away, but must be released by the process holding them.
  4. Circular wait. There must be a cycle in the resource allocation graph.

Deadlock and the bank account

Deadlock prevention

To prevent deadlocks from occurring, one of the four conditions must be disallowed.

  1. Mutual exclusion. Make some resources unsharable, such as printers, tape drives.
  2. Hold and wait. Process must request all needed resources at one time. This results in wasted resources.

    Process must release all current resources before requesting more. This is not feasible.

  3. No Preemption. Make it possible for the O/S to make a process give up a resource. This may result in lost data.
  4. Circular wait. Make it impossible for cycles to occur in the graph by restricting the types of graphs that can be made. For example, give each resource a priority number. If a resource is held with a priority, then a new resource can only be requested of lower priority.

Bank account avoiding circular wait

Deadlock avoidance

Deadlock prevention is to ensure that deadlocks never occur. Deadlock avoidance is attempting to ensure that resources are never allocated in a way that might cause deadlock.

There are situations that may give rise to cycles, but in practice will not lead to deadlock. For example, P1 and P2 both want R1 and R2, but in this manner:


P1 can be given R1 and P2 can be given R2. P1 will require R2 before it can give up R1, but that is ok - P2 will give up R2 before it needs R1.

The Banker's algorithm can be used to ensure this, but it is too complicated for general use.

Listing a directory (and subdirectories)

Listing all the files in a directory is a recursive operation:

This can be done synchronously using one thread or concurrently using one thread per directory

Recursive single-thread solution


import java.io.*;

public class ListFiles {

    public static void main(String[] args) {
	ListFiles lister = new ListFiles(new File(args[0]));
    }

    public ListFiles(File dir) {
	listDirectory(dir);
    }

    public void listDirectory(File dir) {
	System.out.println(dir.getAbsolutePath());

	File[] files = dir.listFiles();
	if (files == null) {
	    return;
	}

	for (int n = 0; n < files.length; n++) {
	    File file = files[n];
	    if (! file.isDirectory()) {
		System.out.println(file.getAbsolutePath());
	    } else {
		listDirectory(file);
	    }
	}
    }
}

Multi-threaded solution

Each directory could be handled by a separate thread. When the list of entries in a directory has been obtained, each entry's name is printed if it is an ordinary file, but if it is a directory then a new thread is created to handle it.


import java.io.*;

public class ListFilesThreads extends Thread {

    private static Object synch;
    private static ThreadGroup group;

    private File dir;

    public static void main(String[] args) {
	ListFilesThreads lister = new ListFilesThreads(new File(args[0]));

	lister.start();
    }

    public ListFilesThreads(File dir) {
	this.dir = dir;
    }

    public void run() {
	listDirectory(dir);
    }

    public void listDirectory(File dir) {
	System.out.println(dir.getAbsolutePath());
	File[] files = dir.listFiles();
	if (files == null) {
	    return;
	}
	for (int n = 0; n < files.length; n++) {
	    File file = files[n];
	    if (! file.isDirectory()) {
		System.out.println(file.getAbsolutePath());
	    } else {
		new ListFilesThreads(file).start();
	    }
	}
    }
}

Summing the size of files in directories

Listing files can be made more complex by finding their size and summing them.

Recursive sum


import java.io.*;

public class SumFiles {

    private long sum = 0;

    public static void main(String[] args) {
	SumFiles lister = new SumFiles(new File(args[0]));
    }

    public SumFiles(File dir) {
	sumDirectory(dir);
	System.out.println("Sum is " + sum);
    }

    public void sumDirectory(File dir) {
	File[] files = dir.listFiles();

	if (files == null) {
	    return;
	}

	for (int n = 0; n < files.length; n++) {
	    File file = files[n];

	    if (! file.isDirectory()) {
		//System.out.println(file.length() + " " + file.toString());
		System.out.println(file.length() + " " + sum);
		sum += file.length();
	    } else {
		sumDirectory(file);
	    }
	}
    }
}

Multi-threaded sum

This one is a lot more complicated!


import java.io.*;

public class SumFilesThreads extends Thread {

    private static long sum = 0;
    private static long threadCount = 1;
    private static Object synch;

    private File dir;

    public static void main(String[] args) {
	SumFilesThreads lister = new SumFilesThreads(new File(args[0]));

	// synchronize on the first object created
	synch = lister;
	lister.start();
	synchronized(synch) {
	    while (true) {
		try {
		    // every time a thread dies, it should
		    // notify all others, so we will be woken
		    // up periodically
		    synch.wait();
		} catch(InterruptedException e) {
		    System.out.println("Interrupted " + e);
		}
		if (threadCount == 0) {
		    System.out.println("Sum " + sum);
		    System.exit(0);
		}
	    }
	}
    }

    public SumFilesThreads(File dir) {
	this.dir = dir;
    }

    public void run() {
	listDirectory(dir);
    }

    public void listDirectory(File dir) {
	if (! dir.isDirectory()) return;
	
	// System.out.println(dir.getAbsolutePath());
	File[] files = dir.listFiles();
	for (int n = 0; n < files.length; n++) {
	    File file = files[n];
	    if (! file.isDirectory()) {
		// make access to sum atomic
		synchronized(synch) {
		    sum += file.length();
		}
	    } else {
		synchronized(synch) {
		    // make access to threadCount atomic
		    threadCount++;
		}
		new SumFilesThreads(file).start();
	    }
	}

	synchronized(synch) {
	    // this thread is finished, wake up a 
	    // thread waiting on the synch object
	    threadCount--;
	    synch.notifyAll();
	}
    }
}

Conclusion


Jan Newmarch (http://jan.newmarch.name)
jan@newmarch.name
Last modified: Mon Aug 15 12:12:36 EST 2005
Copyright ©Jan Newmarch
Copyright © Jan Newmarch, Monash University, 2007
Creative Commons License This work is licensed under a Creative Commons License
The moral right of Jan Newmarch to be identified as the author of this page has been asserted.