Run parallel tasks in Java or Android, and get a callback when all complete

What are we going to discuss today?

We will write a Java program that executes several tasks in parallel and then fires a callback when all of those tasks complete.

Some Use Cases

  • Android: Synchronize the app’s data with the server through several API calls and hide the loader when all of them complete.
  • Backend: Save data to the database, log files, and cache, then send the response once all of these tasks complete.

Process

  • A process is an instance of a computer program that is being executed.
  • When an Android application starts, the Android system starts a new Linux process for the application with a single thread of execution called the main thread.

Threads

  • A program can contain two or more parts that can run in parallel. Each such part is called a thread.
  • Threads help in utilizing multicore processors and also reduce the idle CPU time of a single processor.
  • Creating too many threads slows down execution, because every thread costs memory and context-switching time.
  • Only as many threads as there are CPU cores run truly in parallel; the others wait for a core to become free.
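
To make the last two points concrete, here is a minimal sketch that asks the JVM how many logical processors it can use. The class name CoreCount and the helper availableCores are made up for this illustration; Runtime.availableProcessors() is the standard call.

```java
class CoreCount {

    // Number of logical processors currently available to the JVM.
    static int availableCores() {
        return Runtime.getRuntime().availableProcessors();
    }

    public static void main(String[] args) {
        int cores = availableCores();
        System.out.println("Logical processors available: " + cores);
        // At most this many threads can execute at the same instant;
        // any additional runnable threads are time-sliced by the scheduler.
    }
}
```

The value depends on the machine, so treat it as a rough upper bound on useful parallelism for CPU-bound work rather than a hard rule.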

Creating Java Threads

Thread subclass

public class MyThread extends Thread {

    public void run() {
        System.out.println("MyThread running");
    }
}
MyThread myThread = new MyThread();
myThread.start();

Implement Runnable

public class MyRunnable implements Runnable {

    public void run(){
        System.out.println("MyRunnable running");
    }
}
Thread thread = new Thread(new MyRunnable());
thread.start();
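
Since Runnable has a single method, it can also be written as a lambda (Java 8+), which is how the complete program further down passes its tasks. The sketch below also shows Thread.join(), which makes the calling thread wait for one thread to finish. LambdaThreadDemo and runAndCollect are hypothetical names used only here.

```java
class LambdaThreadDemo {

    // Starts one thread from a lambda, waits for it, and returns what it wrote.
    static String runAndCollect() {
        StringBuilder log = new StringBuilder();

        // The lambda body plays the role of Runnable.run().
        Thread thread = new Thread(() -> log.append("lambda running"));
        thread.start();

        try {
            // join() blocks until the thread has finished; its writes are visible afterwards.
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag for the caller
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(runAndCollect()); // prints "lambda running"
    }
}
```

join() works well for a single thread. Waiting for several threads at once is what the rest of this post builds.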

Build the programming model

Program Components

  • Task: It is a Runnable that contains the code to be executed in a separate thread.
  • Worker: It will be responsible for creating a thread and running the supplied task.
  • Executor: It will create workers to handle the tasks. It will also be responsible for broadcasting the completion of all the tasks.
You can find the complete code here: https://ideone.com/RuWabo
To try it online, paste the code into https://ideone.com, choose Java as the language, and hit run.
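
Before reading the full program, it may help to see the core mechanism on its own. The following is a stripped-down sketch, not the program from this post: it skips the Worker and Builder classes and assumes a helper called runAll (a name invented here) that starts one thread per task and uses a CountDownLatch to detect when all of them have finished.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class LatchSketch {

    // Runs each task on its own thread, blocks until all have finished,
    // then invokes onAllDone on the calling thread.
    static void runAll(List<Runnable> tasks, Runnable onAllDone) {
        CountDownLatch latch = new CountDownLatch(tasks.size());
        for (Runnable task : tasks) {
            new Thread(() -> {
                try {
                    task.run();
                } finally {
                    latch.countDown(); // counted even if the task throws
                }
            }).start();
        }
        try {
            latch.await(); // returns once the count reaches zero
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return; // interrupted: not all tasks are known to be done
        }
        onAllDone.run();
    }

    public static void main(String[] args) {
        AtomicInteger finished = new AtomicInteger();
        List<Runnable> tasks = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            tasks.add(() -> finished.incrementAndGet());
        }
        // prints "All done, finished = 3"
        runAll(tasks, () -> System.out.println("All done, finished = " + finished.get()));
    }
}
```

Unlike this sketch, the complete program waits on the latch in a separate Executor thread, so the thread that calls execute() is not blocked.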

Build Locally

  • Open a terminal and run the following commands:
  • mkdir ~/Desktop/mindorks-multithreading
  • cd ~/Desktop/mindorks-multithreading
  • gradle init --type java-library
  • In IntelliJ IDEA, choose Open > Desktop > mindorks-multithreading
  • Press OK
  • Create the class: mindorks-multithreading/src/main/java/Main
  • Right-click Main and select Run Main.main()

Complete Program

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

public class Main {

    public static void main(String[] args) throws Exception {
        AtomicBoolean processing = new AtomicBoolean(true);

        new Executor.Builder()
                .add(() -> {
                    System.out.println("TASK 1 Start");
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("TASK 1 Complete");
                })
                .add(() -> {
                    System.out.println("TASK 2 Start");
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("TASK 2 Complete");
                })
                .add(() -> {
                    System.out.println("TASK 3 Start");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("TASK 3 Complete");
                })
                .callback(() -> {
                    System.out.println("All TASK COMPLETED");
                    processing.set(false);
                })
                .build()
                .execute();

        while (processing.get()) {
            // busy-wait: keep the main thread alive until the callback clears the flag
        }
        System.out.println("Program Terminates");
    }

    public static class Executor extends Thread {
        private ConcurrentLinkedQueue<Worker> workers;
        private Callback callback;
        private CountDownLatch latch;

        private Executor(List<Runnable> tasks, Callback callback) {
            super();
            this.callback = callback;
            workers = new ConcurrentLinkedQueue<>();
            latch = new CountDownLatch(tasks.size());

            for (Runnable task : tasks) {
                workers.add(new Worker(task, latch));
            }
        }

        public void execute() {
            start();
        }

        @Override
        public void run() {
            while (true) {
                Worker worker = workers.poll();
                if (worker == null) {
                    break;
                }
                worker.start();
            }

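            try {
                latch.await();
            } catch (InterruptedException e) {
                // Interrupted before all tasks finished: restore the flag and skip the callback
                Thread.currentThread().interrupt();
                return;
            }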

            if (callback != null) {
                callback.onComplete();
            }
        }

        public static class Builder {
            private List<Runnable> tasks = new ArrayList<>();
            private Callback callback;

            public Builder add(Runnable task) {
                tasks.add(task);
                return this;
            }

            public Builder callback(Callback callback) {
                this.callback = callback;
                return this;
            }

            public Executor build() {
                return new Executor(tasks, callback);
            }
        }

        public interface Callback {
            void onComplete();
        }
    }

    public static class Worker implements Runnable {

        private AtomicBoolean started;
        private Runnable task;
        private Thread thread;
        private CountDownLatch latch;

        public Worker(Runnable task, CountDownLatch latch) {
            this.latch = latch;
            this.task = task;
            started = new AtomicBoolean(false);
            thread = new Thread(this);
        }

        public void start() {
            if (!started.getAndSet(true)) {
                thread.start();
            }
        }

        @Override
        public void run() {
            try {
                task.run();
            } finally {
                // Count down even if the task throws, so the Executor never waits forever
                latch.countDown();
            }
        }
    }
}
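
The while loop in main keeps one CPU core busy while it waits. One possible alternative, sketched below under stated assumptions, is to let the callback count down a CountDownLatch that the main thread awaits. To keep the example self-contained, a plain background thread stands in for the Executor above; AwaitInsteadOfSpin and waitForCallback are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;

class AwaitInsteadOfSpin {

    // Returns true once the callback has fired, false if the wait was interrupted.
    static boolean waitForCallback() {
        CountDownLatch done = new CountDownLatch(1);

        // Stands in for the lambda passed to .callback(...) in the program above.
        Runnable callback = done::countDown;

        // Stand-in for the Executor: does some work, then fires the callback.
        Thread background = new Thread(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            callback.run();
        });
        background.start();

        try {
            done.await(); // the main thread sleeps here instead of spinning
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        if (waitForCallback()) {
            System.out.println("Program Terminates");
        }
    }
}
```

On Android you would normally not block the main thread at all; there the callback would typically post the UI update back to the main thread instead.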

Next Devlog

  • Callables, Future and FutureTask: They help in passing the result to the caller thread when the worker thread finishes.
  • ThreadPoolExecutor: Execute a large number of tasks using a pool of threads.
  • Monitor and Locks
See you next time