001/**
002 *
003 * Copyright 2003-2007 Jive Software, 2016-2017 Florian Schmaus.
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.jivesoftware.smack;
019
020import java.util.concurrent.ArrayBlockingQueue;
021import java.util.concurrent.TimeUnit;
022
023import org.jivesoftware.smack.SmackException.NotConnectedException;
024import org.jivesoftware.smack.SmackException.NoResponseException;
025import org.jivesoftware.smack.XMPPException.XMPPErrorException;
026import org.jivesoftware.smack.filter.StanzaFilter;
027import org.jivesoftware.smack.packet.Stanza;
028
029/**
030 * Provides a mechanism to collect Stanzas into a result queue that pass a
031 * specified filter/matcher. The collector lets you perform blocking and polling
032 * operations on the result queue. So, a StanzaCollector is more suitable to
033 * use than a {@link StanzaListener} when you need to wait for a specific
034 * result.<p>
035 *
036 * Each stanza(/packet) collector will queue up a configured number of packets for processing before
037 * older packets are automatically dropped.  The default number is retrieved by 
038 * {@link SmackConfiguration#getStanzaCollectorSize()}.
039 *
040 * @see XMPPConnection#createStanzaCollector(StanzaFilter)
041 * @author Matt Tucker
042 */
043public class StanzaCollector {
044
045    private final StanzaFilter packetFilter;
046
047    private final ArrayBlockingQueue<Stanza> resultQueue;
048
049    /**
050     * The stanza(/packet) collector which timeout for the next result will get reset once this collector collects a stanza.
051     */
052    private final StanzaCollector collectorToReset;
053
054    private final XMPPConnection connection;
055
056    private boolean cancelled = false;
057
058    /**
059     * Creates a new stanza(/packet) collector. If the stanza(/packet) filter is <tt>null</tt>, then
060     * all packets will match this collector.
061     *
062     * @param connection the connection the collector is tied to.
063     * @param configuration the configuration used to construct this collector
064     */
065    protected StanzaCollector(XMPPConnection connection, Configuration configuration) {
066        this.connection = connection;
067        this.packetFilter = configuration.packetFilter;
068        this.resultQueue = new ArrayBlockingQueue<>(configuration.size);
069        this.collectorToReset = configuration.collectorToReset;
070    }
071
072    /**
073     * Explicitly cancels the stanza(/packet) collector so that no more results are
074     * queued up. Once a stanza(/packet) collector has been cancelled, it cannot be
075     * re-enabled. Instead, a new stanza(/packet) collector must be created.
076     */
077    public void cancel() {
078        // If the packet collector has already been cancelled, do nothing.
079        if (!cancelled) {
080            cancelled = true;
081            connection.removeStanzaCollector(this);
082        }
083    }
084
085    /**
086     * Returns the stanza(/packet) filter associated with this stanza(/packet) collector. The packet
087     * filter is used to determine what packets are queued as results.
088     *
089     * @return the stanza(/packet) filter.
090     * @deprecated use {@link #getStanzaFilter()} instead.
091     */
092    @Deprecated
093    public StanzaFilter getPacketFilter() {
094        return getStanzaFilter();
095    }
096
097    /**
098     * Returns the stanza filter associated with this stanza collector. The stanza
099     * filter is used to determine what stanzas are queued as results.
100     *
101     * @return the stanza filter.
102     */
103    public StanzaFilter getStanzaFilter() {
104        return packetFilter;
105    }
106
107    /**
108     * Polls to see if a stanza(/packet) is currently available and returns it, or
109     * immediately returns <tt>null</tt> if no packets are currently in the
110     * result queue.
111     *
112     * @return the next stanza(/packet) result, or <tt>null</tt> if there are no more
113     *      results.
114     */
115    @SuppressWarnings("unchecked")
116    public <P extends Stanza> P pollResult() {
117        return (P) resultQueue.poll();
118    }
119
120    /**
121     * Polls to see if a stanza(/packet) is currently available and returns it, or
122     * immediately returns <tt>null</tt> if no packets are currently in the
123     * result queue.
124     * <p>
125     * Throws an XMPPErrorException in case the polled stanzas did contain an XMPPError.
126     * </p>
127     * 
128     * @return the next available packet.
129     * @throws XMPPErrorException in case an error response.
130     */
131    public <P extends Stanza> P pollResultOrThrow() throws XMPPErrorException {
132        P result = pollResult();
133        if (result != null) {
134            XMPPErrorException.ifHasErrorThenThrow(result);
135        }
136        return result;
137    }
138
139    /**
140     * Returns the next available packet. The method call will block (not return) until a stanza(/packet) is
141     * available.
142     * 
143     * @return the next available packet.
144     * @throws InterruptedException 
145     */
146    @SuppressWarnings("unchecked")
147    public <P extends Stanza> P nextResultBlockForever() throws InterruptedException {
148        throwIfCancelled();
149        P res = null;
150        while (res == null) {
151            res = (P) resultQueue.take();
152        }
153        return res;
154    }
155
156    /**
157     * Returns the next available packet. The method call will block until the connection's default
158     * timeout has elapsed.
159     * 
160     * @return the next available packet.
161     * @throws InterruptedException 
162     */
163    public <P extends Stanza> P nextResult() throws InterruptedException {
164        return nextResult(connection.getReplyTimeout());
165    }
166
167    private volatile long waitStart;
168
169    /**
170     * Returns the next available packet. The method call will block (not return)
171     * until a stanza(/packet) is available or the <tt>timeout</tt> has elapsed. If the
172     * timeout elapses without a result, <tt>null</tt> will be returned.
173     *
174     * @param timeout the timeout in milliseconds.
175     * @return the next available packet.
176     * @throws InterruptedException 
177     */
178    @SuppressWarnings("unchecked")
179    public <P extends Stanza> P nextResult(long timeout) throws InterruptedException {
180        throwIfCancelled();
181        P res = null;
182        long remainingWait = timeout;
183        waitStart = System.currentTimeMillis();
184        do {
185            res = (P) resultQueue.poll(remainingWait, TimeUnit.MILLISECONDS);
186            if (res != null) {
187                return res;
188            }
189            remainingWait = timeout - (System.currentTimeMillis() - waitStart);
190        } while (remainingWait > 0);
191        return null;
192    }
193
194    /**
195     * Returns the next available stanza. The method in equivalent to
196     * {@link #nextResultOrThrow(long)} where the timeout argument is the default reply timeout of
197     * the connection associated with this collector.
198     * 
199     * @return the next available stanza.
200     * @throws XMPPErrorException in case an error response was received.
201     * @throws NoResponseException if there was no response from the server.
202     * @throws InterruptedException
203     * @throws NotConnectedException
204     * @see #nextResultOrThrow(long)
205     */
206    public <P extends Stanza> P nextResultOrThrow() throws NoResponseException, XMPPErrorException,
207                    InterruptedException, NotConnectedException {
208        return nextResultOrThrow(connection.getReplyTimeout());
209    }
210
211    /**
212     * Returns the next available stanza. The method call will block until a stanza is
213     * available or the <tt>timeout</tt> has elapsed. This method does also cancel the
214     * collector in every case.
215     * <p>
216     * Three things can happen when waiting for an response:
217     * </p>
218     * <ol>
219     * <li>A result response arrives.</li>
220     * <li>An error response arrives.</li>
221     * <li>An timeout occurs.</li>
222     * <li>The thread is interrupted</li>
223     * </ol>
224     * <p>
225     * in which this method will
226     * </p>
227     * <ol>
228     * <li>return with the result.</li>
229     * <li>throw an {@link XMPPErrorException}.</li>
230     * <li>throw an {@link NoResponseException}.</li>
231     * <li>throw an {@link InterruptedException}.</li>
232     * </ol>
233     * <p>
234     * Additionally the method will throw a {@link NotConnectedException} if no response was
235     * received and the connection got disconnected.
236     * </p>
237     *
238     * @param timeout the amount of time to wait for the next stanza in milliseconds.
239     * @return the next available stanza.
240     * @throws NoResponseException if there was no response from the server.
241     * @throws XMPPErrorException in case an error response was received.
242     * @throws InterruptedException if the calling thread was interrupted.
243     * @throws NotConnectedException if there was no response and the connection got disconnected.
244     */
245    public <P extends Stanza> P nextResultOrThrow(long timeout) throws NoResponseException,
246                    XMPPErrorException, InterruptedException, NotConnectedException {
247        P result = nextResult(timeout);
248        cancel();
249        if (result == null) {
250            if (!connection.isConnected()) {
251                throw new NotConnectedException(connection, packetFilter);
252            }
253            throw NoResponseException.newWith(connection, this);
254        }
255
256        XMPPErrorException.ifHasErrorThenThrow(result);
257
258        return result;
259    }
260
261    /**
262     * Get the number of collected stanzas this stanza(/packet) collector has collected so far.
263     * 
264     * @return the count of collected stanzas.
265     * @since 4.1
266     */
267    public int getCollectedCount() {
268        return resultQueue.size();
269    }
270
271    /**
272     * Processes a stanza(/packet) to see if it meets the criteria for this stanza(/packet) collector.
273     * If so, the stanza(/packet) is added to the result queue.
274     *
275     * @param packet the stanza(/packet) to process.
276     */
277    protected void processStanza(Stanza packet) {
278        if (packetFilter == null || packetFilter.accept(packet)) {
279            // CHECKSTYLE:OFF
280                while (!resultQueue.offer(packet)) {
281                        // Since we know the queue is full, this poll should never actually block.
282                        resultQueue.poll();
283                }
284            // CHECKSTYLE:ON
285            if (collectorToReset != null) {
286                collectorToReset.waitStart = System.currentTimeMillis();
287            }
288        }
289    }
290
291    private final void throwIfCancelled() {
292        if (cancelled) {
293            throw new IllegalStateException("Packet collector already cancelled");
294        }
295    }
296
297    /**
298     * Get a new stanza(/packet) collector configuration instance.
299     * 
300     * @return a new stanza(/packet) collector configuration.
301     */
302    public static Configuration newConfiguration() {
303        return new Configuration();
304    }
305
306    public static final class Configuration {
307        private StanzaFilter packetFilter;
308        private int size = SmackConfiguration.getStanzaCollectorSize();
309        private StanzaCollector collectorToReset;
310
311        private Configuration() {
312        }
313
314        /**
315         * Set the stanza(/packet) filter used by this collector. If <code>null</code>, then all packets will
316         * get collected by this collector.
317         * 
318         * @param packetFilter
319         * @return a reference to this configuration.
320         * @deprecated use {@link #setStanzaFilter(StanzaFilter)} instead.
321         */
322        @Deprecated
323        public Configuration setPacketFilter(StanzaFilter packetFilter) {
324            return setStanzaFilter(packetFilter);
325        }
326
327        /**
328         * Set the stanza filter used by this collector. If <code>null</code>, then all stanzas will
329         * get collected by this collector.
330         * 
331         * @param stanzaFilter
332         * @return a reference to this configuration.
333         */
334        public Configuration setStanzaFilter(StanzaFilter stanzaFilter) {
335            this.packetFilter = stanzaFilter;
336            return this;
337        }
338
339        /**
340         * Set the maximum size of this collector, i.e. how many stanzas this collector will collect
341         * before dropping old ones.
342         * 
343         * @param size
344         * @return a reference to this configuration.
345         */
346        public Configuration setSize(int size) {
347            this.size = size;
348            return this;
349        }
350
351        /**
352         * Set the collector which timeout for the next result is reset once this collector collects
353         * a packet.
354         * 
355         * @param collector
356         * @return a reference to this configuration.
357         */
358        public Configuration setCollectorToReset(StanzaCollector collector) {
359            this.collectorToReset = collector;
360            return this;
361        }
362    }
363}