001/** 002 * 003 * Copyright © 2014-2019 Florian Schmaus 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.jivesoftware.smack; 018 019import java.util.concurrent.TimeUnit; 020import java.util.concurrent.locks.Condition; 021import java.util.concurrent.locks.Lock; 022 023import org.jivesoftware.smack.SmackException.NoResponseException; 024import org.jivesoftware.smack.SmackException.NotConnectedException; 025import org.jivesoftware.smack.SmackException.SmackWrappedException; 026import org.jivesoftware.smack.packet.Nonza; 027import org.jivesoftware.smack.packet.Stanza; 028import org.jivesoftware.smack.packet.TopLevelStreamElement; 029 030public class SynchronizationPoint<E extends Exception> { 031 032 private final AbstractXMPPConnection connection; 033 private final Lock connectionLock; 034 private final Condition condition; 035 private final String waitFor; 036 037 // Note that there is no need to make 'state' and 'failureException' volatile. Since 'lock' and 'unlock' have the 038 // same memory synchronization effects as synchronization block enter and leave. 039 private State state; 040 private E failureException; 041 private SmackWrappedException smackWrappedExcpetion; 042 043 /** 044 * Construct a new synchronization point for the given connection. 045 * 046 * @param connection the connection of this synchronization point. 047 * @param waitFor a description of the event this synchronization point handles. 048 */ 049 public SynchronizationPoint(AbstractXMPPConnection connection, String waitFor) { 050 this.connection = connection; 051 this.connectionLock = connection.getConnectionLock(); 052 this.condition = connection.getConnectionLock().newCondition(); 053 this.waitFor = waitFor; 054 init(); 055 } 056 057 /** 058 * Initialize (or reset) this synchronization point. 059 */ 060 public void init() { 061 connectionLock.lock(); 062 state = State.Initial; 063 failureException = null; 064 smackWrappedExcpetion = null; 065 connectionLock.unlock(); 066 } 067 068 /** 069 * Send the given top level stream element and wait for a response. 070 * 071 * @param request the plain stream element to send. 072 * @throws NoResponseException if no response was received. 073 * @throws NotConnectedException if the connection is not connected. 074 * @throws InterruptedException if the connection is interrupted. 075 * @return <code>null</code> if synchronization point was successful, or the failure Exception. 076 */ 077 public Exception sendAndWaitForResponse(TopLevelStreamElement request) throws NoResponseException, 078 NotConnectedException, InterruptedException { 079 assert (state == State.Initial); 080 connectionLock.lock(); 081 try { 082 if (request != null) { 083 if (request instanceof Stanza) { 084 connection.sendStanza((Stanza) request); 085 } 086 else if (request instanceof Nonza) { 087 connection.sendNonza((Nonza) request); 088 } else { 089 throw new IllegalStateException("Unsupported element type"); 090 } 091 state = State.RequestSent; 092 } 093 waitForConditionOrTimeout(); 094 } 095 finally { 096 connectionLock.unlock(); 097 } 098 return checkForResponse(); 099 } 100 101 /** 102 * Send the given plain stream element and wait for a response. 103 * 104 * @param request the plain stream element to send. 105 * @throws E if an failure was reported. 106 * @throws NoResponseException if no response was received. 107 * @throws NotConnectedException if the connection is not connected. 108 * @throws InterruptedException if the connection is interrupted. 109 * @throws SmackWrappedException in case of a wrapped exception; 110 */ 111 public void sendAndWaitForResponseOrThrow(Nonza request) throws E, NoResponseException, 112 NotConnectedException, InterruptedException, SmackWrappedException { 113 sendAndWaitForResponse(request); 114 switch (state) { 115 case Failure: 116 throwException(); 117 break; 118 default: 119 // Success, do nothing 120 } 121 } 122 123 /** 124 * Check if this synchronization point is successful or wait the connections reply timeout. 125 * @throws NoResponseException if there was no response marking the synchronization point as success or failed. 126 * @throws E if there was a failure 127 * @throws InterruptedException if the connection is interrupted. 128 * @throws SmackWrappedException in case of a wrapped exception; 129 */ 130 public void checkIfSuccessOrWaitOrThrow() throws NoResponseException, E, InterruptedException, SmackWrappedException { 131 checkIfSuccessOrWait(); 132 if (state == State.Failure) { 133 throwException(); 134 } 135 } 136 137 /** 138 * Check if this synchronization point is successful or wait the connections reply timeout. 139 * @throws NoResponseException if there was no response marking the synchronization point as success or failed. 140 * @throws InterruptedException 141 * @return <code>null</code> if synchronization point was successful, or the failure Exception. 142 */ 143 public Exception checkIfSuccessOrWait() throws NoResponseException, InterruptedException { 144 connectionLock.lock(); 145 try { 146 switch (state) { 147 // Return immediately on success or failure 148 case Success: 149 return null; 150 case Failure: 151 return getException(); 152 default: 153 // Do nothing 154 break; 155 } 156 waitForConditionOrTimeout(); 157 } finally { 158 connectionLock.unlock(); 159 } 160 return checkForResponse(); 161 } 162 163 /** 164 * Report this synchronization point as successful. 165 */ 166 public void reportSuccess() { 167 connectionLock.lock(); 168 try { 169 state = State.Success; 170 condition.signalAll(); 171 } 172 finally { 173 connectionLock.unlock(); 174 } 175 } 176 177 /** 178 * Deprecated. 179 * @deprecated use {@link #reportFailure(Exception)} instead. 180 */ 181 @Deprecated 182 public void reportFailure() { 183 reportFailure(null); 184 } 185 186 /** 187 * Report this synchronization point as failed because of the given exception. The {@code failureException} must be set. 188 * 189 * @param failureException the exception causing this synchronization point to fail. 190 */ 191 public void reportFailure(E failureException) { 192 assert failureException != null; 193 connectionLock.lock(); 194 try { 195 state = State.Failure; 196 this.failureException = failureException; 197 condition.signalAll(); 198 } 199 finally { 200 connectionLock.unlock(); 201 } 202 } 203 204 /** 205 * Report this synchronization point as failed because of the given exception. The {@code failureException} must be set. 206 * 207 * @param exception the exception causing this synchronization point to fail. 208 */ 209 public void reportGenericFailure(SmackWrappedException exception) { 210 assert exception != null; 211 connectionLock.lock(); 212 try { 213 state = State.Failure; 214 this.smackWrappedExcpetion = exception; 215 condition.signalAll(); 216 } 217 finally { 218 connectionLock.unlock(); 219 } 220 } 221 222 /** 223 * Check if this synchronization point was successful. 224 * 225 * @return true if the synchronization point was successful, false otherwise. 226 */ 227 public boolean wasSuccessful() { 228 connectionLock.lock(); 229 try { 230 return state == State.Success; 231 } 232 finally { 233 connectionLock.unlock(); 234 } 235 } 236 237 public boolean isNotInInitialState() { 238 connectionLock.lock(); 239 try { 240 return state != State.Initial; 241 } 242 finally { 243 connectionLock.unlock(); 244 } 245 } 246 247 /** 248 * Check if this synchronization point has its request already sent. 249 * 250 * @return true if the request was already sent, false otherwise. 251 */ 252 public boolean requestSent() { 253 connectionLock.lock(); 254 try { 255 return state == State.RequestSent; 256 } 257 finally { 258 connectionLock.unlock(); 259 } 260 } 261 262 public E getFailureException() { 263 connectionLock.lock(); 264 try { 265 return failureException; 266 } 267 finally { 268 connectionLock.unlock(); 269 } 270 } 271 272 /** 273 * Wait for the condition to become something else as {@link State#RequestSent} or {@link State#Initial}. 274 * {@link #reportSuccess()}, {@link #reportFailure()} and {@link #reportFailure(Exception)} will either set this 275 * synchronization point to {@link State#Success} or {@link State#Failure}. If none of them is set after the 276 * connections reply timeout, this method will set the state of {@link State#NoResponse}. 277 * @throws InterruptedException 278 */ 279 private void waitForConditionOrTimeout() throws InterruptedException { 280 long remainingWait = TimeUnit.MILLISECONDS.toNanos(connection.getReplyTimeout()); 281 while (state == State.RequestSent || state == State.Initial) { 282 if (remainingWait <= 0) { 283 state = State.NoResponse; 284 break; 285 } 286 remainingWait = condition.awaitNanos(remainingWait); 287 } 288 } 289 290 private Exception getException() { 291 if (failureException != null) { 292 return failureException; 293 } 294 return smackWrappedExcpetion; 295 } 296 297 private void throwException() throws E, SmackWrappedException { 298 if (failureException != null) { 299 throw failureException; 300 } 301 throw smackWrappedExcpetion; 302 } 303 304 /** 305 * Check for a response and throw a {@link NoResponseException} if there was none. 306 * <p> 307 * The exception is thrown, if state is one of 'Initial', 'NoResponse' or 'RequestSent' 308 * </p> 309 * @return <code>true</code> if synchronization point was successful, <code>false</code> on failure. 310 * @throws NoResponseException 311 */ 312 private Exception checkForResponse() throws NoResponseException { 313 switch (state) { 314 case Initial: 315 case NoResponse: 316 case RequestSent: 317 throw NoResponseException.newWith(connection, waitFor); 318 case Success: 319 return null; 320 case Failure: 321 return getException(); 322 default: 323 throw new AssertionError("Unknown state " + state); 324 } 325 } 326 327 private enum State { 328 Initial, 329 RequestSent, 330 NoResponse, 331 Success, 332 Failure, 333 } 334}