Java & Android Multithreaded Programming: Runnable, Callable, Future, Executor

Lesson Overview

  • Learn Runnable, Callable, Future and ThreadPoolExecutor
  • Setup Java gradle project using IntelliJ IDEA CE IDE
  • Design and build async model for user activities feed.
  • Learn through code implementation.
  • Prerequisite: Java Basics and Java Lambda Basics

Project Setup

  • Install gradle: https://gradle.org/install
  • Download IntelliJ IDEA CE: https://www.jetbrains.com/idea
  • Open terminal run following:
  • mkdir ~/Desktop/mindorks-multithreading-2
  • cd ~/Desktop/mindorks-multithreading-2
  • gradle init --type java-library
  • IntelliJ IDEA > Open > Desktop > mindorks-multithreading
  • OK
  • Create classes: App with main method
  • Select App and right click and select run App.main()

Problem Overview

  • Building a user feed with list of activities in the order they happened.
  • Likes, posts, comments, new friends are fetched through separate web APIs concurrently.
  • When all the APIs result then the results are merged in a single sorted by date list.
  • The result is returned to the caller through callback

Model Building Blocks

  • Java utils concurrent package provide the tools to build up this model.
  • ThreadPoolExecutor, Callable, Future

ThreadPoolExecutor

  • It implement ExecutorService.
  • It helps in using a fixed number of thread is running tasks and reusing the threads that has been done with the assigned task. This reduces the thread creation overhead.
  • Create an ThreadPoolExecutor with a fixed number of threads in the pool using Executors.newFixedThreadPool method.
  • Call shutdown method to stop all threads and release the resources.

Callable

  • This is an interface used to return the result from the executing thread back to the invoking thread.
  • call method is used to perform the task and then return the result or throw an exception.
  • Callable task is run by the ExecutorService by calling its submit method.
  • To run a Runnable task in ExecutorService we call its execute method.

Future

  • A Callable task’s result is returned by the Future.
  • We get the result from the Future using its get method.
  • The thread that calls the get method of the Future waits till the result is returned by the Callable after execution of its call method.

Program Code

public interface Activity {

    Date getCreatedAt();
}
public class Comment implements Activity{

    private Date createAt;

    public Comment(Date createAt) {
        this.createAt = createAt;
    }

    @Override
    public Date getCreatedAt() {
        return createAt;
    }

    @Override
    public String toString() {
        return "Comment{" +
                "createAt=" + createAt +
                '}';
    }
}

public class Friend implements Activity{

    private Date createAt;

    public Friend(Date createAt) {
        this.createAt = createAt;
    }

    @Override
    public Date getCreatedAt() {
        return createAt;
    }

    @Override
    public String toString() {
        return "Friend{" +
                "createAt=" + createAt +
                '}';
    }
}

public class Like implements Activity{

    private Date createAt;

    public Like(Date createAt) {
        this.createAt = createAt;
    }

    @Override
    public Date getCreatedAt() {
        return createAt;
    }

    @Override
    public String toString() {
        return "Like{" +
                "createAt=" + createAt +
                '}';
    }
}

public class Post implements Activity{

    private Date createAt;

    public Post(Date createAt) {
        this.createAt = createAt;
    }

    @Override
    public Date getCreatedAt() {
        return createAt;
    }

    @Override
    public String toString() {
        return "Post{" +
                "createAt=" + createAt +
                '}';
    }
}

public interface ResultCallback {

    void onResult(List<Activity> activities);
}

public class RemoteService {

    private static int cores = Runtime.getRuntime().availableProcessors();
    private static ExecutorService executor = Executors.newFixedThreadPool(cores + 1);

    public void stop() {
        executor.shutdown();
    }

    public void getUserRecentActivities(ResultCallback callback) {
        executor.execute(() -> {
            List<Like> likes = new ArrayList<>();
            List<Post> posts = new ArrayList<>();
            List<Comment> comments = new ArrayList<>();
            List<Friend> friends = new ArrayList<>();

            Future<List<Like>> futureLikes = executor.submit(getLikes("https://dummy.com/likes"));
            Future<List<Comment>> futureComments = executor.submit(getComments("https://dummy.com/comments"));
            Future<List<Post>> futurePosts = executor.submit(getPosts("https://dummy.com/posts"));
            Future<List<Friend>> futureFriends = executor.submit(getFriends("https://dummy.com/friends"));

            try {
                likes = futureLikes.get();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }

            try {
                posts = futurePosts.get();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }

            try {
                comments = futureComments.get();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }

            try {
                friends = futureFriends.get();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }

            List<Activity> activities = new ArrayList<>();
            activities.addAll(likes);
            activities.addAll(posts);
            activities.addAll(comments);
            activities.addAll(friends);

            Collections.sort(activities,
                    (activity1, activity2) -> activity1.getCreatedAt().compareTo(activity2.getCreatedAt()));

            callback.onResult(activities);
        });

    }

    private Callable<List<Like>> getLikes(String url) {
        return () -> {
            System.out.println("getLikes");
            Thread.sleep(2000);
            return Arrays.asList(new Like(new Date(1535977248560L)), new Like(new Date(1535977246960L)));
        };
    }

    private Callable<List<Post>> getPosts(String url) {
        return () -> {
            System.out.println("getPosts");
            Thread.sleep(1000);
            return Arrays.asList(new Post(new Date(1535978248560L)), new Post(new Date(1535977246460L)));
        };
    }

    private Callable<List<Comment>> getComments(String url) {
        return () -> {
            System.out.println("getComments");
            Thread.sleep(2500);
            return Arrays.asList(new Comment(new Date(1532978248560L)), new Comment(new Date(1515977246460L)));
        };
    }

    private Callable<List<Friend>> getFriends(String url) {
        return () -> {
            System.out.println("getFriends");
            Thread.sleep(800);
            return Arrays.asList(new Friend(new Date(1531378248560L)), new Friend(new Date(1535998246460L)));
        };
    }
}

public class App {

    public static void main(String[] args) {
        System.out.println("Program Started");
        AtomicBoolean processing = new AtomicBoolean(true);

        RemoteService service = new RemoteService();
        service.getUserRecentActivities(activities -> {
            for (Activity activity : activities) {
                System.out.println(activity);
            }
            processing.set(false);
        });

        while (processing.get()){
            // keep running
        }
        service.stop();
        System.out.println("Program Terminated");
    }
}