001/**
002 *
003 * Copyright 2018 Florian Schmaus
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.jivesoftware.smack;
018
019import java.util.Map;
020import java.util.Queue;
021import java.util.WeakHashMap;
022import java.util.concurrent.ConcurrentLinkedQueue;
023import java.util.concurrent.Executor;
024
025/**
026 * Helper class to perform an operation asynchronous but keeping the order in respect to a given key.
027 * <p>
028 * A typical use pattern for this helper class consists of callbacks for an abstract entity where the order of callbacks
029 * matters, which eventually call user code in form of listeners. Since the order the callbacks matters, you need to use
030 * synchronous connection listeners. But if those listeners would invoke the user provided listeners, and if those user
031 * provided listeners would take a long time to complete, or even worse, block, then Smack's total progress is stalled,
032 * since synchronous connection listeners are invoked from the main event loop.
033 * </p>
034 * <p>
035 * It is common for those situations that the order of callbacks is not globally important, but only important in
036 * respect to an particular entity. Take chat state notifications (CSN) for example: Assume there are two contacts which
037 * send you CSNs. If a contact sends you first 'active' and then 'inactive, it is crucial that first the listener is
038 * called with 'active' and afterwards with 'inactive'. But if there is another contact is sending 'composing' followed
039 * by 'paused', then it is also important that the listeners are invoked in the correct order, but the order in which
040 * the listeners for those two contacts are invoked does not matter.
041 * </p>
042 * <p>
043 * Using this helper class, one would call {@link #performAsyncButOrdered(Object, Runnable)} which the remote contacts
044 * JID as first argument and a {@link Runnable} invoking the user listeners as second. This class guarantees that
045 * runnables of subsequent invocations are always executed after the runnables of previous invocations using the same
046 * key.
047 * </p>
048 *
049 * @param <K> the type of the key
050 * @since 4.3
051 */
052public class AsyncButOrdered<K> {
053
054    private final Map<K, Queue<Runnable>> pendingRunnables = new WeakHashMap<>();
055
056    private final Map<K, Boolean> threadActiveMap = new WeakHashMap<>();
057
058    private final Executor executor;
059
060    public AsyncButOrdered() {
061        this(null);
062    }
063
064    public AsyncButOrdered(Executor executor) {
065        this.executor = executor;
066    }
067
068    /**
069     * Invoke the given {@link Runnable} asynchronous but ordered in respect to the given key.
070     *
071     * @param key the key deriving the order
072     * @param runnable the {@link Runnable} to run
073     * @return true if a new thread was created
074     */
075    public boolean performAsyncButOrdered(K key, Runnable runnable) {
076        Queue<Runnable> keyQueue;
077        synchronized (pendingRunnables) {
078            keyQueue = pendingRunnables.get(key);
079            if (keyQueue == null) {
080                keyQueue = new ConcurrentLinkedQueue<>();
081                pendingRunnables.put(key, keyQueue);
082            }
083        }
084
085        keyQueue.add(runnable);
086
087        boolean newHandler;
088        synchronized (threadActiveMap) {
089            Boolean threadActive = threadActiveMap.get(key);
090            if (threadActive == null) {
091                threadActive = false;
092                threadActiveMap.put(key, threadActive);
093            }
094
095            newHandler = !threadActive;
096            if (newHandler) {
097                Handler handler = new Handler(keyQueue, key);
098                threadActiveMap.put(key, true);
099                if (executor == null) {
100                    AbstractXMPPConnection.asyncGo(handler);
101                } else {
102                    executor.execute(handler);
103                }
104            }
105        }
106
107        return newHandler;
108    }
109
110    public Executor asExecutorFor(final K key) {
111        return new Executor() {
112            @Override
113            public void execute(Runnable runnable) {
114                performAsyncButOrdered(key, runnable);
115            }
116        };
117    }
118
119    private class Handler implements Runnable {
120        private final Queue<Runnable> keyQueue;
121        private final K key;
122
123        Handler(Queue<Runnable> keyQueue, K key) {
124            this.keyQueue = keyQueue;
125            this.key = key;
126        }
127
128        @Override
129        public void run() {
130            mainloop:
131            while (true) {
132                Runnable runnable = null;
133                while ((runnable = keyQueue.poll()) != null) {
134                    try {
135                        runnable.run();
136                    } catch (Throwable t) {
137                        // The run() method threw, this handler thread is going to terminate because of that. Ensure we note
138                        // that in the map.
139                        synchronized (threadActiveMap) {
140                            threadActiveMap.put(key, false);
141                        }
142                        throw t;
143                    }
144                }
145
146                synchronized (threadActiveMap) {
147                    // If the queue is empty, stop this handler, otherwise continue looping.
148                    if (keyQueue.isEmpty()) {
149                        threadActiveMap.put(key, false);
150                        break mainloop;
151                    }
152                }
153            }
154        }
155    }
156}