java - How to implement thread pool without ExecutorService - Stack Overflow

I am trying to understand thread pools by implementing one myself without using ExecutorService. However, the program ends up in a waiting state and I am not sure what is causing it. Here is my implementation.

I have a Pool class that is responsible for creating the worker threads and keeping them ready for incoming jobs. The workers are initialized in the constructor and added to the queue of available threads, availableQueue. This is a BlockingQueue<Executor>, where Executor is a nested class inside the Pool class.

    private void create_workers(int size) {
            for (int i = 0; i< size; i++){
                executors[i] = new Executor("Executor :: " + i);
                availableQueue.offer(executors[i]);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }

The client code creates the pool and submits jobs like this:

    Pool threadPool = new Pool(5);
    for (int i = 0; i < 10; i++) {
        threadPool.execute(new Job("Job : " + i));
    }

Job is a simple Runnable class that mimics a job.

The execute method adds the job to jobQueue and waits until an executor is available. If one is available, it takes that executor and starts it to run the job. After completion it puts a new Executor into the available queue. The reason is that after completion the thread goes into the TERMINATED state and I cannot use the same thread again. I am not sure whether there is a way to repurpose the same thread; any help here would be welcome if it is possible.

public void execute(Job job){
        LOGGER.log(Level.INFO, "Job added to the queue :: ");
        jobQueue.offer(job);

        while(Pool.isExecutorsAvailable()){
            Executor t = getExecutor();
            if (t.getState().name().equals("NEW")){
                t.start();
            }

            wait_for_completion(t);
            availableQueue.offer(new Executor(t.name));
        }

}
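On the question of repurposing a terminated thread: a Java Thread object can be started only once, so a finished Executor cannot be restarted. The following is a minimal sketch of that behaviour; the class and method names (RestartDemo, secondStartFails) are made up for this illustration and are not part of the code above.

```java
class RestartDemo {
    // Returns true if a second start() on the same Thread object is rejected.
    static boolean secondStartFails() {
        Thread t = new Thread(() -> { });   // a task that finishes immediately
        t.start();
        try {
            t.join();                       // after this, t is TERMINATED
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        try {
            t.start();                      // a Thread may be started only once
            return false;
        } catch (IllegalThreadStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("second start() rejected: " + secondStartFails());
    }
}
```

This is why pools usually keep each worker alive in a loop that repeatedly takes tasks from a queue, instead of creating a replacement thread after every job.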

The actual problem is that after the first job the code goes into an infinite waiting state at while(Pool.isExecutorsAvailable()). I could not identify what is causing the issue.

The code for isExecutorsAvailable is here:

   public static boolean isExecutorsAvailable(){
        if(jobQueue.isEmpty()) {
            try {
                LOGGER.log(Level.INFO, Thread.currentThread().getName() + " is waiting..");
                executorLock.wait();
            }
            catch (IllegalMonitorStateException e) {            }
            catch (InterruptedException e) {}
        }
//        executorLock.notify();
        return true;
    }
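One thing worth checking in isExecutorsAvailable, assuming the code runs exactly as posted: Object.wait() may only be called by a thread that holds the object's monitor. Called outside a synchronized block it throws IllegalMonitorStateException immediately, the empty catch swallows it, and the method returns true, so the while loop in execute would spin instead of waiting. The sketch below shows the failing call next to a guarded wait that can actually be woken up. The names WaitDemo, ready, awaitReady and signalReady are hypothetical.

```java
class WaitDemo {
    private static final Object lock = new Object();
    private static boolean ready = false;   // condition guarded by lock

    // Mirrors the posted call: wait() without owning the monitor.
    static boolean waitWithoutMonitorThrows() {
        try {
            lock.wait();
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;    // thrown at once; the thread never actually waits
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Guarded wait: hold the monitor and re-check the condition in a loop.
    static boolean awaitReady() {
        synchronized (lock) {
            while (!ready) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            return true;
        }
    }

    // The matching notify; without a call like this, a waiter never returns.
    static void signalReady() {
        synchronized (lock) {
            ready = true;
            lock.notifyAll();
        }
    }

    public static void main(String[] args) {
        System.out.println("wait() without monitor threw: " + waitWithoutMonitorThrows());
        new Thread(WaitDemo::signalReady).start();
        System.out.println("guarded wait returned: " + awaitReady());
    }
}
```

In practice a BlockingQueue already does this waiting internally: take() blocks until an element is available, so explicit wait/notify is often unnecessary in a simple pool.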

Any help on this is much appreciated

Edit:

Added all the code snippets

//Client

package com.java.pool;

public class Client {

    public static void main(String[] args) {
        Client m = new Client();
        Pool threadPool = new Pool(5);
        for (int i=0;i<10;i++){
            threadPool.execute(new Job("Job : " + i));
        }
    }

}

//Job
package com.java.pool;


import java.text.SimpleDateFormat;
import java.util.*;


public class Job implements Runnable{

    String name;

    public Job(String name){
        this.name = name;
    }

    public void print(String message){
        System.out.println(message);
    }

    public void run(){
        String timeStamp = new SimpleDateFormat("yyyy.MM.dd.HH.mm.ss").format(new java.util.Date());
        print(Thread.currentThread().getName() + " Executor picked the job at : " + timeStamp);
        work_with_time();
        print("Job " + this.name + " Completed at : " + timeStamp);
    }

    private void work_with_time() {
        Random r = new Random();
        int executionTime = r.nextInt(5000);
        try {
            Thread.sleep(executionTime);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

//Pool

package com.java.pool;


import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

public final class Pool {

    private static final Logger LOGGER = Logger.getLogger(Pool.class.getName());

    int size;
    BlockingQueue<Executor> availableQueue;
    Executor[] executors;

    Object availableLock = new Object();
    static Object executorLock = new Object();
    static BlockingQueue<Job> jobQueue;


    public Pool(int size){
        this.size = size;
        availableQueue = new LinkedBlockingQueue<>();
        jobQueue = new LinkedBlockingQueue<>();

        LOGGER.log(Level.INFO, "All Internal Queue's Initialized.");
        LOGGER.log(Level.INFO, "Pool of size :: " + size + " Created.");

        executors = new Executor[size];

        create_workers(size);

        LOGGER.log(Level.INFO, "Threads Created and Ready for Job.");
    }

    private void create_workers(int size) {
        for (int i = 0; i< size; i++){
            executors[i] = new Executor("Executor :: " + i);
            executors[i].start();
            availableQueue.offer(executors[i]);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static boolean isExecutorsAvailable(){
        if(jobQueue.isEmpty()) {
            try {
                LOGGER.log(Level.INFO, Thread.currentThread().getName() + " is waiting..");
                executorLock.wait();
            }
            catch (IllegalMonitorStateException e) {            }
            catch (InterruptedException e) {}
        }
//        executorLock.notify();
        return true;
    }


    private boolean isTaskAvailable(){
        while(availableQueue.isEmpty()) {
            try {
                availableLock.wait();
            }
            catch (IllegalMonitorStateException e) { }
            catch (InterruptedException e) {  }
        }
//        availableLock.notify();
        return true;
    }

    private Executor getExecutor(){

        Executor curr = null;
        if(isTaskAvailable())
            curr = availableQueue.poll();
        return curr;
    }

    public void execute(Job job){
        LOGGER.log(Level.INFO, "Job added to the queue :: ");
        jobQueue.offer(job);

        while(Pool.isExecutorsAvailable()){
            Executor t = getExecutor();
            if (t.getState().name().equals("NEW")){
                t.start();
            }

            wait_for_completion(t);
            availableQueue.offer(new Executor(t.name));
        }

    }

    private static void wait_for_completion(Executor t) {
        while (t.isAlive()){
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    private static class Executor extends Thread{

        String name;


        public Executor(String name){
            this.name = name;

        }
        @Override
        public void run() {
            while(!jobQueue.isEmpty()){
                Job job = jobQueue.poll();
                job.run();
            }
        }
    }

}


Asked Feb 3 at 17:34 by Vinod Krishnan; edited Feb 4 at 4:57.

Comments:
  • 4 The idea of having a queue of "executors" seems exactly backward. Normally a thread pool has a queue of jobs, and some generic collection* of worker threads that all compete with each other trying to take jobs from the queue. [*Actually, in the very simplest thread pool, one that has no provision for changing the size of the pool or for shutting down, you don't even need to keep the threads in a collection. You just fire them off and forget them, and they will happily continue taking jobs from the queue and executing those jobs.] – Ohm's Lawman Commented Feb 3 at 19:13
  • 2 "The one you are suggesting is more of a lazy initializing of thread on demand." No, it isn't. You set up the pool, you start the threads when the pool is created, each of which repeatedly takes jobs from the queue. – Louis Wasserman Commented Feb 3 at 20:07
  • 1 See the answer that I just posted, below, for an illustration of what I was talking about. – Ohm's Lawman Commented Feb 3 at 20:12
  • 1 @VinodKrishnan, Read the example code in my answer. The worker threads do not terminate after performing a task. They perform the tasks inside a while (true) loop, and they never terminate unless the task throws some kind of an unchecked Throwable. – Ohm's Lawman Commented Feb 3 at 22:16
  • 1 Your example code only shows a few fragments. It's hard to understand what you're trying to do from those fragments, and your text descriptions don't really clear up the mystery. You appear to have a problem. Why not post a complete example, that other people can run to replicate the problem for themselves? I can guess what the problem might be, but without seeing the missing bits of the code, there's no way I can be certain. (Hint: You show an executorLock.wait() call, but you have not shown any call that would allow the wait to return.) – Solomon Slow Commented Feb 3 at 22:35

1 Answer

IMO, you are trying to invert the problem by having a queue of threads when what should be queued are the tasks.

I haven't really tried to understand your code, but here is a complete, trivial thread pool implementation to illustrate the core concept. The static main method demonstrates that it works:

import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.BlockingQueue;


class SimpleThreadPool implements Executor {
    private
    final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
    
    public
    SimpleThreadPool(int numWorkerThreads) { 
        for (int i=0 ; i<numWorkerThreads ; i++) {
            Thread t = new Thread(() -> {
                while (true) {
                     try {
                         Runnable task = queue.take();
                         task.run();
                     }
                     catch (InterruptedException ex) {
                         ex.printStackTrace();
                         break;
                     }
                }
            });
            t.start();
        }
    }

    @Override
    public
    void execute(Runnable task) {
        queue.add(task);
    }

    static public
    void main(String[] args) {
        SimpleThreadPool pool = new SimpleThreadPool(3);
        for (int i=0 ; i<10 ; i++) {
            final int seqNum = i;
            pool.execute(() -> System.out.println(seqNum));
        }
    }
}

There are various things you could add to this to make it more sophisticated:

  • You could provide a shutdown method*
  • You could provide means for a caller to wait for all of the tasks to be completed.
  • You could add graceful handling (or at least, logging) of Errors and RuntimeExceptions that might be thrown by a task.
  • You could provide means to automatically kill off worker threads that have not been used in a while, and to automatically create new worker threads when demand is high.
  • You could provide means to submit a Callable task and return a Future to the caller.
  • Other stuff (See ExecutorService or ThreadPoolExecutor for more ideas.)
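As a rough illustration of the Callable/Future item, one possible approach is to wrap the Callable in a java.util.concurrent.FutureTask, which is itself a Runnable and can therefore be handed to any Executor, including a pool like the one above. The helper name submit and the class SubmitDemo are assumptions made for this sketch, and the one-thread-per-task executor in demo() is only a stand-in so that the example runs on its own.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

class SubmitDemo {
    // Hypothetical helper: wraps a Callable so that a plain Executor can run it.
    static <T> Future<T> submit(Executor pool, Callable<T> job) {
        FutureTask<T> future = new FutureTask<>(job);
        pool.execute(future);   // FutureTask implements Runnable
        return future;
    }

    // Runs one job and returns its result, converting checked exceptions.
    static int demo() {
        // Stand-in executor (one thread per task); a real pool would go here.
        Executor pool = task -> new Thread(task).start();
        Future<Integer> result = submit(pool, () -> 6 * 7);
        try {
            return result.get();    // blocks until the job has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 42
    }
}
```

A side effect of this design is that FutureTask captures anything the job throws and reports it through get() as an ExecutionException, so a failing job does not kill the worker thread that ran it.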

* Note: The demo program won't terminate on its own. You'll have to forcibly kill it because I did not provide any means to shut the pool down.
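A shutdown method could be sketched as follows. This variant is an assumption, not part of the answer's code: it queues one sentinel task (a "poison pill", here called STOP) per worker, so each worker exits after the tasks queued ahead of the sentinels have been taken, and the caller then joins the workers. It also logs RuntimeExceptions so that a failing task does not end its worker. The names StoppablePool, STOP, workerLoop and shutdownAndAwait are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;

class StoppablePool implements Executor {
    // Sentinel task; workers compare by reference and exit when they take it.
    private static final Runnable STOP = () -> { };

    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final List<Thread> workers = new ArrayList<>();

    StoppablePool(int numWorkerThreads) {
        for (int i = 0; i < numWorkerThreads; i++) {
            Thread t = new Thread(this::workerLoop);
            workers.add(t);
            t.start();
        }
    }

    private void workerLoop() {
        try {
            while (true) {
                Runnable task = queue.take();   // blocks until a task arrives
                if (task == STOP) {
                    return;                     // this worker is done
                }
                try {
                    task.run();
                } catch (RuntimeException ex) {
                    ex.printStackTrace();       // log and keep the worker alive
                }
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void execute(Runnable task) {
        queue.add(task);
    }

    // Queues one STOP per worker behind the pending tasks, then waits for
    // every worker to exit. Tasks submitted after this call are never run.
    void shutdownAndAwait() {
        for (int i = 0; i < workers.size(); i++) {
            queue.add(STOP);
        }
        try {
            for (Thread t : workers) {
                t.join();
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because the queue is FIFO, every task submitted before shutdownAndAwait() is taken before any sentinel, and the join calls make the method return only after those tasks have finished. With this in place, a demo program can call shutdownAndAwait() at the end of main and terminate normally.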
