In this post I go over the basics of concurrent execution in Java. I advise you to run the code snippets yourself to get the most out of it. This is an overview, not a tutorial. If you are able to explain how the example at the bottom works, you have a good basic understanding of how concurrency works.
Warmup
Threads, definitely not threats.
Let’s start simple. You can build a Thread in two ways: by implementing the Runnable interface or by extending the Thread class. In both cases you start the thread with the start() method of Thread.
class Main {
    public static void main(String[] args) {
        System.out.println("Begin");
        Thread th1 = new Thread(new T1());
        Thread th2 = new T2();
        th1.start();
        th2.start();
        System.out.println("End");
    }
}

class T1 implements Runnable {
    public void run() {
        for (int i = 0; i < 100; i++) {
            System.out.println("Hello from T1 " + i);
        }
    }
}

class T2 extends Thread {
    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            System.out.println("Hello from T2: " + i);
        }
    }
}
When executing the above code snippet you will most likely see that it prints “End” before all hello messages have appeared on the screen. This is because the main thread reaches its end while the others are still running.
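If you want the main thread to wait for the others, Thread.join() is the basic tool. Here is a minimal sketch of that idea. The class name JoinDemo, the helper startAndJoin() and the counter are made up for this illustration and are not part of the example above:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, only meant to illustrate Thread.join()
class JoinDemo {

    // Starts two threads, waits for both, and returns the number of greetings printed
    static int startAndJoin() throws InterruptedException {
        AtomicInteger greetings = new AtomicInteger();
        Runnable task = () -> {
            for (int i = 0; i < 3; i++) {
                System.out.println("Hello from " + Thread.currentThread().getName() + " " + i);
                greetings.incrementAndGet();
            }
        };
        Thread th1 = new Thread(task, "T1");
        Thread th2 = new Thread(task, "T2");
        th1.start();
        th2.start();
        // join() blocks the calling thread until the target thread has finished
        th1.join();
        th2.join();
        return greetings.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Begin");
        int greetings = startAndJoin();
        // Both threads are done at this point, so this line is always printed last
        System.out.println("End after " + greetings + " greetings");
    }
}
```

The order of the hello messages can still differ between runs, but the final line now always comes last.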
ExecutorService
An Executor that provides methods to manage termination and methods that can produce a Future for tracking progress of one or more asynchronous tasks. An ExecutorService can be shut down, which will cause it to reject new tasks. (from: Oracle Java docs)
Here is a simple ExecutorService with just one Thread in its pool:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

public class Executor1 {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            System.out.println("Start");
            executorService.execute(() -> System.out.println("Executing first task"));
            executorService.execute(() -> {
                System.out.println("Executing next task");
                int sum = IntStream.rangeClosed(0, 10).sum();
                System.out.println("Result: " + sum);
            });
        } finally {
            if (executorService != null) {
                executorService.shutdown();
            }
        }
    }
}
Prints:
Start
Executing first task
Executing next task
Result: 55
Of course, we gain nothing from concurrent execution here, because the pool has only one thread. I’ll show you a pool with more threads later on.
First something more about Runnable and Callable<T>.
Runnable and Callable<T>
Both are functional interfaces with just one method: run() and call() respectively. Neither method takes arguments. However, run() returns void, while call() returns an object of the generic type T. Also important to remember is that call() can throw a checked exception while run() cannot.
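To make the difference concrete, here is a small sketch. The class RunnableVsCallable and the helper parsePositive() are hypothetical names that I made up just to have a method that declares a checked exception:

```java
import java.util.concurrent.Callable;

// Hypothetical demo class comparing the two interfaces
class RunnableVsCallable {

    // Made-up helper that declares a checked exception
    static int parsePositive(String text) throws Exception {
        int value = Integer.parseInt(text);
        if (value <= 0) {
            throw new Exception("Not positive: " + value);
        }
        return value;
    }

    public static void main(String[] args) throws Exception {
        // run() returns void and declares no checked exception
        Runnable runnable = () -> System.out.println("Running");

        // call() returns a value and is allowed to throw a checked exception
        Callable<Integer> callable = () -> parsePositive("42");

        // This would not compile, because run() cannot throw a checked exception:
        // Runnable broken = () -> parsePositive("42");

        runnable.run();
        System.out.println("Called: " + callable.call());
    }
}
```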
execute() and submit()
execute() takes a Runnable and returns void. Look at the method signature:
void execute(Runnable command)
So you can use execute if you are not interested in the result.
More common however is to use the submit() method which is overloaded multiple times in the ExecutorService interface:
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
In the second one you specify the result that the future will return when execution is ready.
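A short sketch of that second overload, with a made-up class name SubmitWithResult and a made-up result value "done":

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class for submit(Runnable task, T result)
class SubmitWithResult {

    static String runAndGet() throws Exception {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            // The Runnable itself returns nothing; "done" is the value
            // that get() hands back once the task has completed
            Future<String> future = executorService.submit(
                    () -> System.out.println("Working.."), "done");
            return future.get();
        } finally {
            executorService.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Result: " + runAndGet());
    }
}
```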
I will focus on the first and last overload of the submit method.
First things first, what is a future?
A Future represents the future result of an async execution. A result that you can ‘get’ when the execution is ready.
That is basically the whole concept of Future to keep in mind. Something that you declare now, send off, and of which you can get the result when it is ready.
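As a minimal sketch of that idea (the class name FutureBasics, the 200 ms delay and the calculation are just assumptions for the illustration):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class showing the life cycle of a Future
class FutureBasics {

    static int computeLater() throws Exception {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            // Declare the work now and send it off
            Future<Integer> future = executorService.submit(() -> {
                TimeUnit.MILLISECONDS.sleep(200); // pretend this takes a while
                return 6 * 7;
            });
            // isDone() does not block, so the main thread could do other work here
            System.out.println("Done yet? " + future.isDone());
            // get() blocks until the result is available
            int answer = future.get();
            System.out.println("Done now? " + future.isDone());
            return answer;
        } finally {
            executorService.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Answer: " + computeLater());
    }
}
```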
Back to our submit() method. Let’s see two examples of an ExecutorService.submit(). To avoid extra complexity I keep the ExecutorService single threaded here. The first one is taking a Runnable as an input parameter, the second one takes a Callable:
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.IntStream;

public class FutuurR {
    static int count = 0;

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            Future<?> result = executorService.submit(() -> {
                count = IntStream.rangeClosed(0, 1000000).sum();
            });
            // get will return the result of the Runnable method
            // run() returns void, so the get() method returns null
            result.get(100, TimeUnit.MILLISECONDS);
            System.out.println("Reached end of try block");
            System.out.println("Count is " + count);
        } catch (TimeoutException e) {
            System.out.println("Execution timed out");
        } finally {
            if (executorService != null) {
                System.out.println("Shutting down service");
                executorService.shutdown();
            }
        }
    }
}
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.IntStream;

public class FutuurC {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        int count = 0;
        try {
            Future<Integer> result = executorService.submit(() -> IntStream.rangeClosed(0, 1000000).sum());
            // get will return the result of the Callable method
            // call() returns Integer here, so the get() method returns Integer
            count = result.get(1, TimeUnit.SECONDS);
            System.out.println("Reached end of try block");
            System.out.println("Count is " + count);
        } catch (TimeoutException e) {
            System.out.println("Execution timed out");
        } finally {
            if (executorService != null) {
                System.out.println("Shutting down service");
                executorService.shutdown();
            }
        }
    }
}
As you can see, in the first example the lambda we pass to submit() does not return anything. Hence, the compiler knows that it is a Runnable, since the run() method returns void. That’s why the submit method itself returns a Future<?> here: no outcome is wrapped inside of it, and get() simply returns null once the task has completed.
In the second example the lambda actually returns the sum of an IntStream. Since we are returning something, the compiler knows we are using a Callable<T>. Since the call() method of the Callable returns an int here (boxed to an Integer), the submit() method returns a Future<Integer>. This means that after completion of this future, the result will be an Integer.
I’d also recommend using the second way if you implement something similar. In the first example we were changing the value of the static int count variable from inside the task, which is a side effect and best avoided.
In both examples you see that we call the get() method on the Future object to get the result of the task.
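Both examples pass a timeout to get(). Here is a sketch of what happens when the task is slower than that timeout. The class GetTimeout, the helper getWithTimeout() and the chosen durations are assumptions for this illustration. Cancelling the task after the timeout is my own addition, not something the examples above do:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class for get() with a timeout
class GetTimeout {

    // Runs a task that sleeps for taskMillis and waits at most timeoutMillis for it
    static String getWithTimeout(long taskMillis, long timeoutMillis) throws Exception {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = executorService.submit(() -> {
                TimeUnit.MILLISECONDS.sleep(taskMillis);
                return "result";
            });
            try {
                return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                // The task is still running at this point; cancel(true) interrupts it
                future.cancel(true);
                return "timed out";
            }
        } finally {
            executorService.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(getWithTimeout(10, 2000));  // fast task, generous timeout
        System.out.println(getWithTimeout(2000, 100)); // slow task, short timeout
    }
}
```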
FixedThreadPool
Now let’s see one last example of an ExecutorService. I will use an executor with a fixed number of threads in its pool.
In the example below we are executing two totally unrelated tasks on the ExecutorService. On the one hand I am calculating the sum of all numbers between zero and a million. On the other hand I am just sleeping for a while and telling you that I am sleeping. I added delays to simulate possible network latency in a real world application where you’ll be executing more difficult tasks or accessing other resources.
The example shows you how the three threads (two worker threads and the main thread) are working concurrently (finally I’ve said it!).
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

public class UseFixedThreadPool {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        int count = 0;
        ExecutorService executorService = null;
        try {
            executorService = Executors.newFixedThreadPool(2);
            Future<Integer> result = executorService.submit(() -> {
                System.out.println(Thread.currentThread().getName() + ": Counting..");
                TimeUnit.SECONDS.sleep(1);
                int value = IntStream.rangeClosed(0, 1000000).sum();
                System.out.println(Thread.currentThread().getName() + ": Counted: " + value);
                return value;
            });
            executorService.submit(() -> {
                for (int i = 0; i < 5; i++) {
                    TimeUnit.SECONDS.sleep(1);
                    System.out.println(Thread.currentThread().getName() + ": Sleeping..");
                }
                return null;
            });
            count = result.get();
            // We are not waiting for the second worker thread to complete, thus the shutting down
            // will start while this task is still executing
            System.out.println("Reached end of try block");
            System.out.println(Thread.currentThread().getName() + ": Count is " + count);
        } finally {
            if (executorService != null) {
                System.out.println("Shutting down service");
                // Executor will wait to terminate till all running threads are finished
                executorService.shutdown();
            }
        }
        if (executorService != null) {
            while (!executorService.isTerminated()) {
                System.out.println("Still tasks running");
                executorService.awaitTermination(1, TimeUnit.SECONDS);
            }
            System.out.println("All tasks done");
        }
    }
}
This gives me the following output (note: on your computer the order might be different):
pool-1-thread-1: Counting..
pool-1-thread-2: Sleeping..
pool-1-thread-1: Counted: 1784293664
Reached end of try block
main: Count is 1784293664
Shutting down service
Still tasks running
pool-1-thread-2: Sleeping..
Still tasks running
pool-1-thread-2: Sleeping..
Still tasks running
pool-1-thread-2: Sleeping..
Still tasks running
pool-1-thread-2: Sleeping..
All tasks done
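A side note on the number itself: 1784293664 is not the mathematical sum of 0 to 1000000. That sum is 500000500000, which does not fit in an int, so IntStream.sum() silently wraps around. This has nothing to do with concurrency. If you need the real value, a sketch like the following (with a made-up class name SumOverflow) uses a LongStream instead:

```java
import java.util.stream.IntStream;
import java.util.stream.LongStream;

// Hypothetical demo class comparing the int sum with the long sum
class SumOverflow {

    // Same calculation as in the example: the int result overflows
    static int intSum() {
        return IntStream.rangeClosed(0, 1_000_000).sum();
    }

    // The same range summed as long values, which is large enough
    static long longSum() {
        return LongStream.rangeClosed(0, 1_000_000).sum();
    }

    public static void main(String[] args) {
        System.out.println("int sum:  " + intSum());  // prints 1784293664
        System.out.println("long sum: " + longSum()); // prints 500000500000
    }
}
```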
Now, there are a couple of things to say about this output:
- The result of the first submit is stored in the local Future<Integer> variable result.
- The result of the second submit is not stored.
- In the main thread we call get() on the Future<Integer> result variable. This causes the main thread to wait at this point until the result is returned.
- Twice I am submitting a Callable<T> to the ExecutorService.
- First we see that the counting task was picked up by thread-1 of our pool. This thread now reaches the sleep statement and goes to bed for one second.
- Concurrently, the other task was picked up by thread-2 of the pool.
- The main thread is waiting at count = result.get(); for the first task to return a result.
- We see that thread-1 finishes its counting and prints Counted: 1784293664.
- The main thread gets the result and can continue past the get() statement.
- The main thread reaches the end of the try block and prints out the resulting count.
- The main thread enters the finally block and calls shutdown() on the ExecutorService.
- The sleeping task continues to execute.
- The main thread waits in the awaitTermination() loop until the tasks of the ExecutorService are complete.
- The sleeping task finishes.
- The executor terminates, the main thread prints All tasks done and the program ends.
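The shutdown behaviour in the last few steps can be isolated in a small sketch. The class name ShutdownDemo, the helper name and the 300 ms delay are assumptions for the illustration. It shows that shutdown() lets already submitted tasks finish and rejects new ones:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class for shutdown() and awaitTermination()
class ShutdownDemo {

    static boolean rejectsAfterShutdown() throws Exception {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<String> running = executorService.submit(() -> {
            TimeUnit.MILLISECONDS.sleep(300);
            return "finished";
        });
        // shutdown() does not cancel the task above, it only stops new submissions
        executorService.shutdown();
        boolean rejected = false;
        try {
            executorService.submit(() -> "too late");
        } catch (RejectedExecutionException e) {
            rejected = true;
        }
        // The task that was submitted before shutdown() still completes normally
        System.out.println("Running task: " + running.get());
        // awaitTermination() blocks until all tasks are done or the timeout expires
        boolean terminated = executorService.awaitTermination(1, TimeUnit.SECONDS);
        System.out.println("Rejected: " + rejected + ", terminated: " + terminated);
        return rejected && terminated;
    }

    public static void main(String[] args) throws Exception {
        rejectsAfterShutdown();
    }
}
```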
Note that:
- In the sleeping task that I submit to the ExecutorService I end with return null. This statement is here to tell the compiler that I am using the submit method with a Callable<T> as argument and not with a Runnable. This is necessary because the sleep() method declares that it throws a checked exception (InterruptedException). Throwing a checked exception is only allowed for a Callable and not for a Runnable (see the explanation of Runnable vs Callable<T> above). I could also catch the exception in the lambda, but I’d like to stay away from catching exceptions in a lambda block.
I’ll stop here for today and tell you more about concurrency, volatile, synchronized and the fork/join framework next time!