I have seen the thread pool executor implementation and the rejected execution policies that it provides. However, I have a custom requirement - I want to have a call back mechanism where in I get notifications when the queue size limit is reached and say when the queue size reduces to say 80 % of the max allowed queue size.
public interface ISaturatedPoolObserver { void onSaturated(); // called when the blocking queue reaches the size limit void onUnsaturated(); // called when blocking queues size goes below the threshold. }我认为可以通过子类化线程池执行程序来实现,但是已经实现了吗?我很乐意在需要时提供更多细节和我的工作,以便提供清晰的信息.
I feel that this can be implemented by subclassing thread pool executor, but is there an already implemented version? I would be happy to add more details and my work so far as and when needed to provide clarity.
I want to have a call back mechanism where in I get notifications when the queue size limit is reached...
I wouldn't subclass the executor but I would subclass the BlockingQueue that is used by the executor. Something like the following should work. There are race conditions in the code around the checkUnsaturated() if you remove an entry and someone puts one back in. You might have to synchronize on the queue if these need to be perfect. Also, I have no idea what methods the executor implementations use so you might not need to override some of these.
public class ObservableBlockingQueue<E> extends LinkedBlockingQueue<E> { private ISaturatedPoolObserver observer; private int capacity; public ObservableBlockingQueue(ISaturatedPoolObserver observer, int capacity) { super(capacity); this.observer = observer; this.capacity = capacity; } @Override public boolean offer(E o) { boolean offered = super.offer(o); if (!offered) { observer.onSaturated(); } return offered; } @Override public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException { boolean offered = super.offer(o, timeout, unit); if (!offered) { observer.onSaturated(); } return offered; } @Override public E poll() { E e = super.poll(); if (e != null) { checkUnsaturated(); } return e; } @Override public E poll(long timeout, TimeUnit unit) throws InterruptedException { E e = super.poll(timeout, unit); if (e != null) { checkUnsaturated(); } return e; } @Override public E take() throws InterruptedException { E e = super.take(); checkUnsaturated(); return e; } @Override public boolean remove(E e) throws InterruptedException { boolean removed = super.remove(e); if (removed) { checkUnsaturated(); } return removed; } private void checkUnsaturated() { if (super.size() * 100 / capacity < UNSATURATED_PERCENTAGE) { observer.onUnsaturated(); } } }