001/**
002 *
003 * Copyright © 2014-2015 Florian Schmaus
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.jivesoftware.smack;
018
019import java.util.concurrent.TimeUnit;
020import java.util.concurrent.locks.Condition;
021import java.util.concurrent.locks.Lock;
022
023import org.jivesoftware.smack.SmackException.NoResponseException;
024import org.jivesoftware.smack.SmackException.NotConnectedException;
025import org.jivesoftware.smack.packet.Nonza;
026import org.jivesoftware.smack.packet.Stanza;
027import org.jivesoftware.smack.packet.TopLevelStreamElement;
028
029public class SynchronizationPoint<E extends Exception> {
030
031    private final AbstractXMPPConnection connection;
032    private final Lock connectionLock;
033    private final Condition condition;
034    private final String waitFor;
035
036    // Note that there is no need to make 'state' and 'failureException' volatile. Since 'lock' and 'unlock' have the
037    // same memory synchronization effects as synchronization block enter and leave.
038    private State state;
039    private E failureException;
040
041    /**
042     * Construct a new synchronization point for the given connection.
043     *
044     * @param connection the connection of this synchronization point.
045     * @param waitFor a description of the event this synchronization point handles.
046     */
047    public SynchronizationPoint(AbstractXMPPConnection connection, String waitFor) {
048        this.connection = connection;
049        this.connectionLock = connection.getConnectionLock();
050        this.condition = connection.getConnectionLock().newCondition();
051        this.waitFor = waitFor;
052        init();
053    }
054
055    /**
056     * Initialize (or reset) this synchronization point.
057     */
058    public void init() {
059        connectionLock.lock();
060        state = State.Initial;
061        failureException = null;
062        connectionLock.unlock();
063    }
064
065    /**
066     * Send the given top level stream element and wait for a response.
067     *
068     * @param request the plain stream element to send.
069     * @throws NoResponseException if no response was received.
070     * @throws NotConnectedException if the connection is not connected.
071     * @return <code>null</code> if synchronization point was successful, or the failure Exception.
072     */
073    public E sendAndWaitForResponse(TopLevelStreamElement request) throws NoResponseException,
074                    NotConnectedException, InterruptedException {
075        assert (state == State.Initial);
076        connectionLock.lock();
077        try {
078            if (request != null) {
079                if (request instanceof Stanza) {
080                    connection.sendStanza((Stanza) request);
081                }
082                else if (request instanceof Nonza) {
083                    connection.sendNonza((Nonza) request);
084                } else {
085                    throw new IllegalStateException("Unsupported element type");
086                }
087                state = State.RequestSent;
088            }
089            waitForConditionOrTimeout();
090        }
091        finally {
092            connectionLock.unlock();
093        }
094        return checkForResponse();
095    }
096
097    /**
098     * Send the given plain stream element and wait for a response.
099     *
100     * @param request the plain stream element to send.
101     * @throws E if an failure was reported.
102     * @throws NoResponseException if no response was received.
103     * @throws NotConnectedException if the connection is not connected.
104     */
105    public void sendAndWaitForResponseOrThrow(Nonza request) throws E, NoResponseException,
106                    NotConnectedException, InterruptedException {
107        sendAndWaitForResponse(request);
108        switch (state) {
109        case Failure:
110            if (failureException != null) {
111                throw failureException;
112            }
113            break;
114        default:
115            // Success, do nothing
116        }
117    }
118
119    /**
120     * Check if this synchronization point is successful or wait the connections reply timeout.
121     * @throws NoResponseException if there was no response marking the synchronization point as success or failed.
122     * @throws E if there was a failure
123     * @throws InterruptedException 
124     */
125    public void checkIfSuccessOrWaitOrThrow() throws NoResponseException, E, InterruptedException {
126        checkIfSuccessOrWait();
127        if (state == State.Failure) {
128            throw failureException;
129        }
130    }
131
132    /**
133     * Check if this synchronization point is successful or wait the connections reply timeout.
134     * @throws NoResponseException if there was no response marking the synchronization point as success or failed.
135     * @throws InterruptedException
136     * @return <code>null</code> if synchronization point was successful, or the failure Exception.
137     */
138    public E checkIfSuccessOrWait() throws NoResponseException, InterruptedException {
139        connectionLock.lock();
140        try {
141            switch (state) {
142            // Return immediately on success or failure
143            case Success:
144                return null;
145            case Failure:
146                return failureException;
147            default:
148                // Do nothing
149                break;
150            }
151            waitForConditionOrTimeout();
152        } finally {
153            connectionLock.unlock();
154        }
155        return checkForResponse();
156    }
157
158    /**
159     * Report this synchronization point as successful.
160     */
161    public void reportSuccess() {
162        connectionLock.lock();
163        try {
164            state = State.Success;
165            condition.signalAll();
166        }
167        finally {
168            connectionLock.unlock();
169        }
170    }
171
172    /**
173     * Deprecated.
174     * @deprecated use {@link #reportFailure(Exception)} instead.
175     */
176    @Deprecated
177    public void reportFailure() {
178        reportFailure(null);
179    }
180
181    /**
182     * Report this synchronization point as failed because of the given exception. The {@code failureException} must be set.
183     *
184     * @param failureException the exception causing this synchronization point to fail.
185     */
186    public void reportFailure(E failureException) {
187        assert failureException != null;
188        connectionLock.lock();
189        try {
190            state = State.Failure;
191            this.failureException = failureException;
192            condition.signalAll();
193        }
194        finally {
195            connectionLock.unlock();
196        }
197    }
198
199    /**
200     * Check if this synchronization point was successful.
201     *
202     * @return true if the synchronization point was successful, false otherwise.
203     */
204    public boolean wasSuccessful() {
205        connectionLock.lock();
206        try {
207            return state == State.Success;
208        }
209        finally {
210            connectionLock.unlock();
211        }
212    }
213
214    /**
215     * Check if this synchronization point has its request already sent.
216     *
217     * @return true if the request was already sent, false otherwise.
218     */
219    public boolean requestSent() {
220        connectionLock.lock();
221        try {
222            return state == State.RequestSent;
223        }
224        finally {
225            connectionLock.unlock();
226        }
227    }
228
229    public E getFailureException() {
230        connectionLock.lock();
231        try {
232            return failureException;
233        }
234        finally {
235            connectionLock.unlock();
236        }
237    }
238
239    /**
240     * Wait for the condition to become something else as {@link State#RequestSent} or {@link State#Initial}.
241     * {@link #reportSuccess()}, {@link #reportFailure()} and {@link #reportFailure(Exception)} will either set this
242     * synchronization point to {@link State#Success} or {@link State#Failure}. If none of them is set after the
243     * connections reply timeout, this method will set the state of {@link State#NoResponse}.
244     * @throws InterruptedException 
245     */
246    private void waitForConditionOrTimeout() throws InterruptedException {
247        long remainingWait = TimeUnit.MILLISECONDS.toNanos(connection.getReplyTimeout());
248        while (state == State.RequestSent || state == State.Initial) {
249            if (remainingWait <= 0) {
250                state = State.NoResponse;
251                break;
252            }
253            remainingWait = condition.awaitNanos(remainingWait);
254        }
255    }
256
257    /**
258     * Check for a response and throw a {@link NoResponseException} if there was none.
259     * <p>
260     * The exception is thrown, if state is one of 'Initial', 'NoResponse' or 'RequestSent'
261     * </p>
262     * @return <code>true</code> if synchronization point was successful, <code>false</code> on failure.
263     * @throws NoResponseException
264     */
265    private E checkForResponse() throws NoResponseException {
266        switch (state) {
267        case Initial:
268        case NoResponse:
269        case RequestSent:
270            throw NoResponseException.newWith(connection, waitFor);
271        case Success:
272            return null;
273        case Failure:
274            return failureException;
275        default:
276            throw new AssertionError("Unknown state " + state);
277        }
278    }
279
280    private enum State {
281        Initial,
282        RequestSent,
283        NoResponse,
284        Success,
285        Failure,
286    }
287}