001/**
002 *
003 * Copyright 2014-2018 Florian Schmaus
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.jivesoftware.smack.util;
018
019import java.util.AbstractQueue;
020import java.util.Collection;
021import java.util.Iterator;
022import java.util.NoSuchElementException;
023import java.util.concurrent.BlockingQueue;
024import java.util.concurrent.TimeUnit;
025import java.util.concurrent.locks.Condition;
026import java.util.concurrent.locks.ReentrantLock;
027
028/**
029 * Like ArrayBlockingQueue but with additional {@link #shutdown()} and {@link #start} methods. Will
030 * throw {@link InterruptedException} if Queue has been shutdown on {@link #take()} and
031 * {@link #poll(long, TimeUnit)}.
032 * <p>
033 * Based on ArrayBlockingQueue of OpenJDK by Doug Lea (who released ArrayBlockingQueue as public
034 * domain).
035 *
036 * @param <E> the type of elements held in this collection
037 */
038public class ArrayBlockingQueueWithShutdown<E> extends AbstractQueue<E> implements BlockingQueue<E> {
039
040    private final E[] items;
041
042    private int takeIndex;
043
044    private int putIndex;
045
046    private int count;
047
048    private final ReentrantLock lock;
049
050    private final Condition notEmpty;
051
052    private final Condition notFull;
053
054    private volatile boolean isShutdown = false;
055
056    private int inc(int i) {
057        return (++i == items.length) ? 0 : i;
058    }
059
060    private void insert(E e) {
061        items[putIndex] = e;
062        putIndex = inc(putIndex);
063        count++;
064        notEmpty.signal();
065    }
066
067    private E extract() {
068        E e = items[takeIndex];
069        items[takeIndex] = null;
070        takeIndex = inc(takeIndex);
071        count--;
072        notFull.signal();
073        return e;
074    }
075
076    private void removeAt(int i) {
077        if (i == takeIndex) {
078            items[takeIndex] = null;
079            takeIndex = inc(takeIndex);
080        }
081        else {
082            while (true) {
083                int nexti = inc(i);
084                if (nexti != putIndex) {
085                    items[i] = items[nexti];
086                    i = nexti;
087                }
088                else {
089                    items[i] = null;
090                    putIndex = i;
091                    break;
092                }
093            }
094        }
095        count--;
096        notFull.signal();
097    }
098
099    private static void checkNotNull(Object o) {
100        if (o == null) {
101            throw new NullPointerException();
102        }
103    }
104
105    private void checkNotShutdown() throws InterruptedException {
106        if (isShutdown) {
107            throw new InterruptedException();
108        }
109    }
110
111    private boolean hasNoElements() {
112        return count == 0;
113    }
114
115    private boolean hasElements() {
116        return !hasNoElements();
117    }
118
119    private boolean isFull() {
120        return count == items.length;
121    }
122
123    private boolean isNotFull() {
124        return !isFull();
125    }
126
127    public ArrayBlockingQueueWithShutdown(int capacity) {
128        this(capacity, false);
129    }
130
131    @SuppressWarnings("unchecked")
132    public ArrayBlockingQueueWithShutdown(int capacity, boolean fair) {
133        if (capacity <= 0)
134            throw new IllegalArgumentException();
135        items = (E[]) new Object[capacity];
136        lock = new ReentrantLock(fair);
137        notEmpty = lock.newCondition();
138        notFull = lock.newCondition();
139    }
140
141    /**
142     * Shutdown the Queue. Will method currently waiting for a not full/empty condition will unblock
143     * (and usually throw a InterruptedException).
144     */
145    public void shutdown() {
146        lock.lock();
147        try {
148            isShutdown = true;
149            notEmpty.signalAll();
150            notFull.signalAll();
151        }
152        finally {
153            lock.unlock();
154        }
155    }
156
157    /**
158     * Start the queue. Newly created instances will be started automatically, thus this only needs
159     * to be called after {@link #shutdown()}.
160     */
161    public void start() {
162        lock.lock();
163        try {
164            isShutdown = false;
165        }
166        finally {
167            lock.unlock();
168        }
169    }
170
171    /**
172     * Returns true if the queue is currently shut down.
173     *
174     * @return true if the queue is shut down.
175     */
176    public boolean isShutdown() {
177        lock.lock();
178        try {
179            return isShutdown;
180        } finally {
181            lock.unlock();
182        }
183    }
184
185    @Override
186    public E poll() {
187        lock.lock();
188        try {
189            if (hasNoElements()) {
190                return null;
191            }
192            E e = extract();
193            return e;
194        }
195        finally {
196            lock.unlock();
197        }
198    }
199
200    @Override
201    public E peek() {
202        lock.lock();
203        try {
204            return hasNoElements() ? null : items[takeIndex];
205        }
206        finally {
207            lock.unlock();
208        }
209    }
210
211    @Override
212    public boolean offer(E e) {
213        checkNotNull(e);
214        lock.lock();
215        try {
216            if (isFull() || isShutdown) {
217                return false;
218            }
219            else {
220                insert(e);
221                return true;
222            }
223        }
224        finally {
225            lock.unlock();
226        }
227    }
228
229    /**
230     * Inserts the specified element into this queue, waiting if necessary
231     * for space to become available.
232     * <p>
233     * This may throw an {@link InterruptedException} in two cases
234     * <ol>
235     *  <li>If the queue was shut down.</li>
236     *  <li>If the thread was was interrupted.</li>
237     * </ol>
238     * So you have to check which is the case, e.g. by calling {@link #isShutdown()}.
239     *
240     * @param e the element to add.
241     * @throws InterruptedException if interrupted while waiting or if the queue was shut down.
242     */
243    @Override
244    public void put(E e) throws InterruptedException {
245        checkNotNull(e);
246        lock.lockInterruptibly();
247
248        try {
249            while (isFull()) {
250                try {
251                    notFull.await();
252                    checkNotShutdown();
253                }
254                catch (InterruptedException ie) {
255                    notFull.signal();
256                    throw ie;
257                }
258            }
259            insert(e);
260        }
261        finally {
262            lock.unlock();
263        }
264    }
265
266    public enum TryPutResult {
267        /**
268         * The method was unable to acquire the queue lock.
269         */
270        couldNotLock,
271
272        /**
273         * The queue was shut down.
274         */
275        queueWasShutDown,
276
277        /**
278         * The method was unable to put another element into the queue because the queue was full.
279         */
280        queueWasFull,
281
282        /**
283         * The element was successfully placed into the queue.
284         */
285        putSuccessful,
286    }
287
288    public TryPutResult tryPut(E e) {
289        checkNotNull(e);
290
291        boolean locked = lock.tryLock();
292        if (!locked) {
293            return TryPutResult.couldNotLock;
294        }
295        try {
296            if (isShutdown) {
297                return TryPutResult.queueWasShutDown;
298            }
299            if (isFull()) {
300                return TryPutResult.queueWasFull;
301            }
302
303            insert(e);
304            return TryPutResult.putSuccessful;
305        } finally {
306            lock.unlock();
307        }
308    }
309
310    @Override
311    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
312        checkNotNull(e);
313        long nanos = unit.toNanos(timeout);
314        lock.lockInterruptibly();
315        try {
316            while (true) {
317                if (isNotFull()) {
318                    insert(e);
319                    return true;
320                }
321                if (nanos <= 0) {
322                    return false;
323                }
324                try {
325                    nanos = notFull.awaitNanos(nanos);
326                    checkNotShutdown();
327                }
328                catch (InterruptedException ie) {
329                    notFull.signal();
330                    throw ie;
331                }
332            }
333        }
334        finally {
335            lock.unlock();
336        }
337
338    }
339
340    @Override
341    public E take() throws InterruptedException {
342        lock.lockInterruptibly();
343        try {
344            checkNotShutdown();
345            try {
346                while (hasNoElements()) {
347                    notEmpty.await();
348                    checkNotShutdown();
349                }
350            }
351            catch (InterruptedException ie) {
352                notEmpty.signal();
353                throw ie;
354            }
355            E e = extract();
356            return e;
357        }
358        finally {
359            lock.unlock();
360        }
361    }
362
363    public enum TryTakeResultCode {
364        /**
365         * The method was unable to acquire the queue lock.
366         */
367        couldNotLock,
368
369        /**
370         * The queue was shut down.
371         */
372        queueWasShutDown,
373
374        /**
375         * The queue was empty.
376         */
377        queueWasEmpty,
378
379        /**
380         * An element was successfully removed from the queue.
381         */
382        takeSuccessful,
383    }
384
385    public static final class TryTakeResult<E> {
386        private final E element;
387        private final TryTakeResultCode resultCode;
388
389        private TryTakeResult(TryTakeResultCode resultCode) {
390            assert resultCode != null;
391            this.resultCode = resultCode;
392            this.element = null;
393        }
394
395        private TryTakeResult(E element) {
396            assert element != null;
397            this.resultCode = TryTakeResultCode.takeSuccessful;
398            this.element = element;
399        }
400
401        public TryTakeResultCode getResultCode() {
402            return resultCode;
403        }
404
405        public E getElement() {
406            return element;
407        }
408    }
409
410    public TryTakeResult<E> tryTake() {
411        boolean locked = lock.tryLock();
412        if (!locked) {
413            return new TryTakeResult<E>(TryTakeResultCode.couldNotLock);
414        }
415        try {
416            if (isShutdown) {
417                return new TryTakeResult<E>(TryTakeResultCode.queueWasShutDown);
418            }
419            if (hasNoElements()) {
420                return new TryTakeResult<E>(TryTakeResultCode.queueWasEmpty);
421            }
422            E element = extract();
423            return new TryTakeResult<E>(element);
424        } finally {
425            lock.unlock();
426        }
427    }
428
429    @Override
430    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
431        long nanos = unit.toNanos(timeout);
432        lock.lockInterruptibly();
433        try {
434            checkNotShutdown();
435            while (true) {
436                if (hasElements()) {
437                    E e = extract();
438                    return e;
439                }
440                if (nanos <= 0) {
441                    return null;
442                }
443                try {
444                    nanos = notEmpty.awaitNanos(nanos);
445                    checkNotShutdown();
446                }
447                catch (InterruptedException ie) {
448                    notEmpty.signal();
449                    throw ie;
450                }
451            }
452        }
453        finally {
454            lock.unlock();
455        }
456    }
457
458    @Override
459    public int remainingCapacity() {
460        lock.lock();
461        try {
462            return items.length - count;
463        }
464        finally {
465            lock.unlock();
466        }
467    }
468
469    @Override
470    public int drainTo(Collection<? super E> c) {
471        checkNotNull(c);
472        if (c == this) {
473            throw new IllegalArgumentException();
474        }
475        lock.lock();
476        try {
477            int i = takeIndex;
478            int n = 0;
479            for (; n < count; n++) {
480                c.add(items[i]);
481                items[i] = null;
482                i = inc(i);
483            }
484            if (n > 0) {
485                count = 0;
486                putIndex = 0;
487                takeIndex = 0;
488                notFull.signalAll();
489            }
490            return n;
491        }
492        finally {
493            lock.unlock();
494        }
495    }
496
497    @Override
498    public int drainTo(Collection<? super E> c, int maxElements) {
499        checkNotNull(c);
500        if (c == this) {
501            throw new IllegalArgumentException();
502        }
503        if (maxElements <= 0) {
504            return 0;
505        }
506        lock.lock();
507        try {
508            int i = takeIndex;
509            int n = 0;
510            int max = (maxElements < count) ? maxElements : count;
511            for (; n < max; n++) {
512                c.add(items[i]);
513                items[i] = null;
514                i = inc(i);
515            }
516            if (n > 0) {
517                count -= n;
518                takeIndex = i;
519                notFull.signalAll();
520            }
521            return n;
522        }
523        finally {
524            lock.unlock();
525        }
526    }
527
528    @Override
529    public int size() {
530        lock.lock();
531        try {
532            return count;
533        }
534        finally {
535            lock.unlock();
536        }
537    }
538
539    @Override
540    public Iterator<E> iterator() {
541        lock.lock();
542        try {
543            return new Itr();
544        }
545        finally {
546            lock.unlock();
547        }
548    }
549
550    private class Itr implements Iterator<E> {
551        private int nextIndex;
552        private E nextItem;
553        private int lastRet;
554
555        Itr() {
556            lastRet = -1;
557            if (count == 0) {
558                nextIndex = -1;
559            }
560            else {
561                nextIndex = takeIndex;
562                nextItem = items[takeIndex];
563            }
564        }
565
566        @Override
567        public boolean hasNext() {
568            return nextIndex >= 0;
569        }
570
571        private void checkNext() {
572            if (nextIndex == putIndex) {
573                nextIndex = -1;
574                nextItem = null;
575            }
576            else {
577                nextItem = items[nextIndex];
578                if (nextItem == null) {
579                    nextIndex = -1;
580                }
581            }
582        }
583
584        @Override
585        public E next() {
586            lock.lock();
587            try {
588                if (nextIndex < 0) {
589                    throw new NoSuchElementException();
590                }
591                lastRet = nextIndex;
592                E e = nextItem;
593                nextIndex = inc(nextIndex);
594                checkNext();
595                return e;
596            }
597            finally {
598                lock.unlock();
599            }
600        }
601
602        @Override
603        public void remove() {
604            lock.lock();
605            try {
606                int i = lastRet;
607                if (i < 0) {
608                    throw new IllegalStateException();
609                }
610                lastRet = -1;
611                int ti = takeIndex;
612                removeAt(i);
613                nextIndex = (i == ti) ? takeIndex : i;
614                checkNext();
615            }
616            finally {
617                lock.unlock();
618            }
619        }
620    }
621
622}