001/** 002 * 003 * Copyright © 2014-2015 Florian Schmaus 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.jivesoftware.smack; 018 019import java.util.concurrent.TimeUnit; 020import java.util.concurrent.locks.Condition; 021import java.util.concurrent.locks.Lock; 022 023import org.jivesoftware.smack.SmackException.NoResponseException; 024import org.jivesoftware.smack.SmackException.NotConnectedException; 025import org.jivesoftware.smack.packet.TopLevelStreamElement; 026import org.jivesoftware.smack.packet.Stanza; 027import org.jivesoftware.smack.packet.Nonza; 028 029public class SynchronizationPoint<E extends Exception> { 030 031 private final AbstractXMPPConnection connection; 032 private final Lock connectionLock; 033 private final Condition condition; 034 private final String waitFor; 035 036 // Note that there is no need to make 'state' and 'failureException' volatile. Since 'lock' and 'unlock' have the 037 // same memory synchronization effects as synchronization block enter and leave. 038 private State state; 039 private E failureException; 040 041 /** 042 * Construct a new synchronization point for the given connection. 043 * 044 * @param connection the connection of this synchronization point. 045 * @param waitFor a description of the event this synchronization point handles. 046 */ 047 public SynchronizationPoint(AbstractXMPPConnection connection, String waitFor) { 048 this.connection = connection; 049 this.connectionLock = connection.getConnectionLock(); 050 this.condition = connection.getConnectionLock().newCondition(); 051 this.waitFor = waitFor; 052 init(); 053 } 054 055 /** 056 * Initialize (or reset) this synchronization point. 057 */ 058 public void init() { 059 connectionLock.lock(); 060 state = State.Initial; 061 failureException = null; 062 connectionLock.unlock(); 063 } 064 065 /** 066 * Send the given top level stream element and wait for a response. 067 * 068 * @param request the plain stream element to send. 069 * @throws NoResponseException if no response was received. 070 * @throws NotConnectedException if the connection is not connected. 071 * @return <code>null</code> if synchronization point was successful, or the failure Exception. 072 */ 073 public E sendAndWaitForResponse(TopLevelStreamElement request) throws NoResponseException, 074 NotConnectedException, InterruptedException { 075 assert (state == State.Initial); 076 connectionLock.lock(); 077 try { 078 if (request != null) { 079 if (request instanceof Stanza) { 080 connection.sendStanza((Stanza) request); 081 } 082 else if (request instanceof Nonza){ 083 connection.sendNonza((Nonza) request); 084 } else { 085 throw new IllegalStateException("Unsupported element type"); 086 } 087 state = State.RequestSent; 088 } 089 waitForConditionOrTimeout(); 090 } 091 finally { 092 connectionLock.unlock(); 093 } 094 return checkForResponse(); 095 } 096 097 /** 098 * Send the given plain stream element and wait for a response. 099 * 100 * @param request the plain stream element to send. 101 * @throws E if an failure was reported. 102 * @throws NoResponseException if no response was received. 103 * @throws NotConnectedException if the connection is not connected. 104 */ 105 public void sendAndWaitForResponseOrThrow(Nonza request) throws E, NoResponseException, 106 NotConnectedException, InterruptedException { 107 sendAndWaitForResponse(request); 108 switch (state) { 109 case Failure: 110 if (failureException != null) { 111 throw failureException; 112 } 113 break; 114 default: 115 // Success, do nothing 116 } 117 } 118 119 /** 120 * Check if this synchronization point is successful or wait the connections reply timeout. 121 * @throws NoResponseException if there was no response marking the synchronization point as success or failed. 122 * @throws E if there was a failure 123 * @throws InterruptedException 124 */ 125 public void checkIfSuccessOrWaitOrThrow() throws NoResponseException, E, InterruptedException { 126 checkIfSuccessOrWait(); 127 if (state == State.Failure) { 128 throw failureException; 129 } 130 } 131 132 /** 133 * Check if this synchronization point is successful or wait the connections reply timeout. 134 * @throws NoResponseException if there was no response marking the synchronization point as success or failed. 135 * @throws InterruptedException 136 * @return <code>null</code> if synchronization point was successful, or the failure Exception. 137 */ 138 public E checkIfSuccessOrWait() throws NoResponseException, InterruptedException { 139 connectionLock.lock(); 140 try { 141 switch (state) { 142 // Return immediately on success or failure 143 case Success: 144 return null; 145 case Failure: 146 return failureException; 147 default: 148 // Do nothing 149 break; 150 } 151 waitForConditionOrTimeout(); 152 } finally { 153 connectionLock.unlock(); 154 } 155 return checkForResponse(); 156 } 157 158 /** 159 * Report this synchronization point as successful. 160 */ 161 public void reportSuccess() { 162 connectionLock.lock(); 163 try { 164 state = State.Success; 165 condition.signalAll(); 166 } 167 finally { 168 connectionLock.unlock(); 169 } 170 } 171 172 /** 173 * Deprecated. 174 * @deprecated use {@link #reportFailure(Exception)} instead. 175 */ 176 @Deprecated 177 public void reportFailure() { 178 reportFailure(null); 179 } 180 181 /** 182 * Report this synchronization point as failed because of the given exception. The {@code failureException} must be set. 183 * 184 * @param failureException the exception causing this synchronization point to fail. 185 */ 186 public void reportFailure(E failureException) { 187 assert failureException != null; 188 connectionLock.lock(); 189 try { 190 state = State.Failure; 191 this.failureException = failureException; 192 condition.signalAll(); 193 } 194 finally { 195 connectionLock.unlock(); 196 } 197 } 198 199 /** 200 * Check if this synchronization point was successful. 201 * 202 * @return true if the synchronization point was successful, false otherwise. 203 */ 204 public boolean wasSuccessful() { 205 connectionLock.lock(); 206 try { 207 return state == State.Success; 208 } 209 finally { 210 connectionLock.unlock(); 211 } 212 } 213 214 /** 215 * Check if this synchronization point has its request already sent. 216 * 217 * @return true if the request was already sent, false otherwise. 218 */ 219 public boolean requestSent() { 220 connectionLock.lock(); 221 try { 222 return state == State.RequestSent; 223 } 224 finally { 225 connectionLock.unlock(); 226 } 227 } 228 229 /** 230 * Wait for the condition to become something else as {@link State#RequestSent} or {@link State#Initial}. 231 * {@link #reportSuccess()}, {@link #reportFailure()} and {@link #reportFailure(Exception)} will either set this 232 * synchronization point to {@link State#Success} or {@link State#Failure}. If none of them is set after the 233 * connections reply timeout, this method will set the state of {@link State#NoResponse}. 234 * @throws InterruptedException 235 */ 236 private void waitForConditionOrTimeout() throws InterruptedException { 237 long remainingWait = TimeUnit.MILLISECONDS.toNanos(connection.getReplyTimeout()); 238 while (state == State.RequestSent || state == State.Initial) { 239 if (remainingWait <= 0) { 240 state = State.NoResponse; 241 break; 242 } 243 remainingWait = condition.awaitNanos(remainingWait); 244 } 245 } 246 247 /** 248 * Check for a response and throw a {@link NoResponseException} if there was none. 249 * <p> 250 * The exception is thrown, if state is one of 'Initial', 'NoResponse' or 'RequestSent' 251 * </p> 252 * @return <code>true</code> if synchronization point was successful, <code>false</code> on failure. 253 * @throws NoResponseException 254 */ 255 private E checkForResponse() throws NoResponseException { 256 switch (state) { 257 case Initial: 258 case NoResponse: 259 case RequestSent: 260 throw NoResponseException.newWith(connection, waitFor); 261 case Success: 262 return null; 263 case Failure: 264 return failureException; 265 default: 266 throw new AssertionError("Unknown state " + state); 267 } 268 } 269 270 private enum State { 271 Initial, 272 RequestSent, 273 NoResponse, 274 Success, 275 Failure, 276 } 277}