/**
 *
 * Copyright 2003-2007 Jive Software, 2016-2017 Florian Schmaus.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.jivesoftware.smack;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.jivesoftware.smack.SmackException.NotConnectedException;
import org.jivesoftware.smack.SmackException.NoResponseException;
import org.jivesoftware.smack.XMPPException.XMPPErrorException;
import org.jivesoftware.smack.filter.StanzaFilter;
import org.jivesoftware.smack.packet.Stanza;

/**
 * Provides a mechanism to collect stanzas that pass a specified filter into a
 * result queue. The collector lets you perform blocking and polling
 * operations on the result queue. So, a StanzaCollector is more suitable to
 * use than a {@link StanzaListener} when you need to wait for a specific
 * result.
 * <p>
 * Each stanza collector will queue up a configured number of stanzas for processing before
 * older stanzas are automatically dropped. The default number is retrieved by
 * {@link SmackConfiguration#getStanzaCollectorSize()}.
 * </p>
 *
 * @see XMPPConnection#createStanzaCollector(StanzaFilter)
 * @author Matt Tucker
 */
public class StanzaCollector {

    /** Filter deciding which stanzas are queued; {@code null} means every stanza is queued. */
    private final StanzaFilter packetFilter;

    /** Bounded queue of collected stanzas; the oldest entry is dropped when it is full. */
    private final ArrayBlockingQueue<Stanza> resultQueue;

    /**
     * The stanza collector whose timeout for the next result will get reset once this collector
     * collects a stanza. May be {@code null}.
     */
    private final StanzaCollector collectorToReset;

    private final XMPPConnection connection;

    /**
     * Set once by {@link #cancel()}. Volatile so that a cancellation performed by one thread is
     * visible to another thread that subsequently calls one of the {@code nextResult} methods.
     */
    private volatile boolean cancelled = false;

    /**
     * Start of the current wait in {@link #nextResult(long)}, in milliseconds as returned by
     * {@link System#currentTimeMillis()}. Another collector that has this collector configured as
     * its "collector to reset" overwrites this value whenever it collects a stanza, which extends
     * the remaining wait time of this collector.
     */
    private volatile long waitStart;

    /**
     * Creates a new stanza collector. If the stanza filter of the configuration is
     * {@code null}, then all stanzas will match this collector.
     *
     * @param connection the connection the collector is tied to.
     * @param configuration the configuration used to construct this collector.
     */
    protected StanzaCollector(XMPPConnection connection, Configuration configuration) {
        this.connection = connection;
        this.packetFilter = configuration.packetFilter;
        this.resultQueue = new ArrayBlockingQueue<>(configuration.size);
        this.collectorToReset = configuration.collectorToReset;
    }

    /**
     * Explicitly cancels the stanza collector and removes it from its connection. Once a
     * stanza collector has been cancelled, it cannot be re-enabled. Instead, a new stanza
     * collector must be created. Calling this method on an already cancelled collector does
     * nothing.
     */
    public void cancel() {
        // If the collector has already been cancelled, do nothing.
        if (cancelled) {
            return;
        }
        cancelled = true;
        connection.removeStanzaCollector(this);
    }

    /**
     * Returns the stanza filter associated with this stanza collector. The stanza
     * filter is used to determine what stanzas are queued as results.
     *
     * @return the stanza filter.
     * @deprecated use {@link #getStanzaFilter()} instead.
     */
    @Deprecated
    public StanzaFilter getPacketFilter() {
        return getStanzaFilter();
    }

    /**
     * Returns the stanza filter associated with this stanza collector. The stanza
     * filter is used to determine what stanzas are queued as results.
     *
     * @return the stanza filter, or {@code null} if all stanzas are collected.
     */
    public StanzaFilter getStanzaFilter() {
        return packetFilter;
    }

    /**
     * Polls to see if a stanza is currently available and returns it, or
     * immediately returns {@code null} if no stanzas are currently in the
     * result queue.
     *
     * @param <P> the expected stanza type; the cast is unchecked.
     * @return the next stanza result, or {@code null} if there are no more
     *         results.
     */
    @SuppressWarnings("unchecked")
    public <P extends Stanza> P pollResult() {
        return (P) resultQueue.poll();
    }

    /**
     * Polls to see if a stanza is currently available and returns it, or
     * immediately returns {@code null} if no stanzas are currently in the
     * result queue.
     * <p>
     * Throws an XMPPErrorException in case the polled stanza did contain an XMPPError.
     * </p>
     *
     * @param <P> the expected stanza type; the cast is unchecked.
     * @return the next available stanza, or {@code null} if the result queue is empty.
     * @throws XMPPErrorException in case of an error response.
     */
    public <P extends Stanza> P pollResultOrThrow() throws XMPPErrorException {
        P result = pollResult();
        if (result != null) {
            XMPPErrorException.ifHasErrorThenThrow(result);
        }
        return result;
    }

    /**
     * Returns the next available stanza. The method call will block (not return) until a stanza
     * is available.
     *
     * @param <P> the expected stanza type; the cast is unchecked.
     * @return the next available stanza.
     * @throws InterruptedException if the calling thread was interrupted while waiting.
     * @throws IllegalStateException if this collector was already cancelled.
     */
    @SuppressWarnings("unchecked")
    public <P extends Stanza> P nextResultBlockForever() throws InterruptedException {
        throwIfCancelled();
        // BlockingQueue.take() never returns null, so no retry loop is required.
        return (P) resultQueue.take();
    }

    /**
     * Returns the next available stanza. The method call will block until the connection's
     * default reply timeout has elapsed.
     *
     * @param <P> the expected stanza type; the cast is unchecked.
     * @return the next available stanza, or {@code null} if the timeout elapsed without a result.
     * @throws InterruptedException if the calling thread was interrupted while waiting.
     * @throws IllegalStateException if this collector was already cancelled.
     */
    public <P extends Stanza> P nextResult() throws InterruptedException {
        return nextResult(connection.getReplyTimeout());
    }

    /**
     * Returns the next available stanza. The method call will block (not return)
     * until a stanza is available or the {@code timeout} has elapsed. If the
     * timeout elapses without a result, {@code null} will be returned.
     *
     * @param <P> the expected stanza type; the cast is unchecked.
     * @param timeout the timeout in milliseconds.
     * @return the next available stanza, or {@code null} if the timeout elapsed without a result.
     * @throws InterruptedException if the calling thread was interrupted while waiting.
     * @throws IllegalStateException if this collector was already cancelled.
     */
    @SuppressWarnings("unchecked")
    public <P extends Stanza> P nextResult(long timeout) throws InterruptedException {
        throwIfCancelled();
        long remainingWait = timeout;
        waitStart = System.currentTimeMillis();
        do {
            P res = (P) resultQueue.poll(remainingWait, TimeUnit.MILLISECONDS);
            if (res != null) {
                return res;
            }
            // waitStart may have been moved forward in the meantime by a collector that resets
            // this one, in which case there is wait time left and we poll again.
            remainingWait = timeout - (System.currentTimeMillis() - waitStart);
        } while (remainingWait > 0);
        return null;
    }

    /**
     * Returns the next available stanza. The method is equivalent to
     * {@link #nextResultOrThrow(long)} where the timeout argument is the default reply timeout of
     * the connection associated with this collector.
     *
     * @param <P> the expected stanza type; the cast is unchecked.
     * @return the next available stanza.
     * @throws XMPPErrorException in case an error response was received.
     * @throws NoResponseException if there was no response from the server.
     * @throws InterruptedException if the calling thread was interrupted.
     * @throws NotConnectedException if there was no response and the connection got disconnected.
     * @see #nextResultOrThrow(long)
     */
    public <P extends Stanza> P nextResultOrThrow() throws NoResponseException, XMPPErrorException,
                    InterruptedException, NotConnectedException {
        return nextResultOrThrow(connection.getReplyTimeout());
    }

    /**
     * Returns the next available stanza. The method call will block until a stanza is
     * available or the {@code timeout} has elapsed. This method also cancels the
     * collector in every case, including when the waiting thread is interrupted.
     * <p>
     * Four things can happen when waiting for a response:
     * </p>
     * <ol>
     * <li>A result response arrives.</li>
     * <li>An error response arrives.</li>
     * <li>A timeout occurs.</li>
     * <li>The thread is interrupted.</li>
     * </ol>
     * <p>
     * in which this method will
     * </p>
     * <ol>
     * <li>return with the result.</li>
     * <li>throw an {@link XMPPErrorException}.</li>
     * <li>throw a {@link NoResponseException}.</li>
     * <li>throw an {@link InterruptedException}.</li>
     * </ol>
     * <p>
     * Additionally the method will throw a {@link NotConnectedException} if no response was
     * received and the connection got disconnected.
     * </p>
     *
     * @param <P> the expected stanza type; the cast is unchecked.
     * @param timeout the amount of time to wait for the next stanza in milliseconds.
     * @return the next available stanza.
     * @throws NoResponseException if there was no response from the server.
     * @throws XMPPErrorException in case an error response was received.
     * @throws InterruptedException if the calling thread was interrupted.
     * @throws NotConnectedException if there was no response and the connection got disconnected.
     * @throws IllegalStateException if this collector was already cancelled.
     */
    public <P extends Stanza> P nextResultOrThrow(long timeout) throws NoResponseException,
                    XMPPErrorException, InterruptedException, NotConnectedException {
        P result;
        try {
            result = nextResult(timeout);
        } finally {
            // Cancel even if the wait was interrupted, otherwise the collector would stay
            // registered with the connection although nobody is going to consume its results.
            cancel();
        }

        if (result == null) {
            if (!connection.isConnected()) {
                throw new NotConnectedException(connection, packetFilter);
            }
            throw NoResponseException.newWith(connection, this);
        }

        XMPPErrorException.ifHasErrorThenThrow(result);

        return result;
    }

    /**
     * Get the number of stanzas currently held in the result queue of this collector.
     *
     * @return the count of collected stanzas.
     * @since 4.1
     */
    public int getCollectedCount() {
        return resultQueue.size();
    }

    /**
     * Processes a stanza to see if it meets the criteria for this stanza collector.
     * If so, the stanza is added to the result queue, dropping the oldest queued stanzas
     * if the queue is full. If a collector to reset was configured, the start of its current
     * wait is reset to now.
     *
     * @param packet the stanza to process.
     */
    protected void processStanza(Stanza packet) {
        if (packetFilter != null && !packetFilter.accept(packet)) {
            return;
        }

        // CHECKSTYLE:OFF
        while (!resultQueue.offer(packet)) {
            // Since we know the queue is full, this poll should never actually block.
            resultQueue.poll();
        }
        // CHECKSTYLE:ON

        if (collectorToReset != null) {
            collectorToReset.waitStart = System.currentTimeMillis();
        }
    }

    /**
     * Guards the blocking result methods against use after {@link #cancel()}.
     *
     * @throws IllegalStateException if this collector was already cancelled.
     */
    private void throwIfCancelled() {
        if (cancelled) {
            throw new IllegalStateException("Packet collector already cancelled");
        }
    }

    /**
     * Get a new stanza collector configuration instance.
     *
     * @return a new stanza collector configuration.
     */
    public static Configuration newConfiguration() {
        return new Configuration();
    }

    /**
     * Mutable, chainable set of options used to construct a {@link StanzaCollector}. Instances are
     * obtained via {@link StanzaCollector#newConfiguration()}.
     */
    public static final class Configuration {
        private StanzaFilter packetFilter;
        private int size = SmackConfiguration.getStanzaCollectorSize();
        private StanzaCollector collectorToReset;

        private Configuration() {
        }

        /**
         * Set the stanza filter used by this collector. If {@code null}, then all stanzas will
         * get collected by this collector.
         *
         * @param packetFilter the filter to use, or {@code null} to collect every stanza.
         * @return a reference to this configuration.
         * @deprecated use {@link #setStanzaFilter(StanzaFilter)} instead.
         */
        @Deprecated
        public Configuration setPacketFilter(StanzaFilter packetFilter) {
            return setStanzaFilter(packetFilter);
        }

        /**
         * Set the stanza filter used by this collector. If {@code null}, then all stanzas will
         * get collected by this collector.
         *
         * @param stanzaFilter the filter to use, or {@code null} to collect every stanza.
         * @return a reference to this configuration.
         */
        public Configuration setStanzaFilter(StanzaFilter stanzaFilter) {
            this.packetFilter = stanzaFilter;
            return this;
        }

        /**
         * Set the maximum size of this collector, i.e. how many stanzas this collector will collect
         * before dropping old ones.
         *
         * @param size the capacity of the collector's result queue.
         * @return a reference to this configuration.
         */
        public Configuration setSize(int size) {
            this.size = size;
            return this;
        }

        /**
         * Set the collector whose timeout for the next result is reset once this collector collects
         * a stanza.
         *
         * @param collector the collector to reset, or {@code null} for none.
         * @return a reference to this configuration.
         */
        public Configuration setCollectorToReset(StanzaCollector collector) {
            this.collectorToReset = collector;
            return this;
        }
    }
}