001/**
002 *
003 * Copyright 2003-2007 Jive Software.
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.jivesoftware.smack.tcp;
018
019import java.io.BufferedReader;
020import java.io.ByteArrayInputStream;
021import java.io.FileInputStream;
022import java.io.IOException;
023import java.io.InputStream;
024import java.io.InputStreamReader;
025import java.io.OutputStream;
026import java.io.OutputStreamWriter;
027import java.io.Writer;
028import java.lang.reflect.Constructor;
029import java.net.InetAddress;
030import java.net.InetSocketAddress;
031import java.net.Socket;
032import java.security.KeyManagementException;
033import java.security.KeyStore;
034import java.security.KeyStoreException;
035import java.security.NoSuchAlgorithmException;
036import java.security.NoSuchProviderException;
037import java.security.Provider;
038import java.security.SecureRandom;
039import java.security.Security;
040import java.security.UnrecoverableKeyException;
041import java.security.cert.CertificateException;
042import java.util.ArrayList;
043import java.util.Collection;
044import java.util.Iterator;
045import java.util.LinkedHashSet;
046import java.util.LinkedList;
047import java.util.List;
048import java.util.Map;
049import java.util.Set;
050import java.util.concurrent.ArrayBlockingQueue;
051import java.util.concurrent.BlockingQueue;
052import java.util.concurrent.ConcurrentHashMap;
053import java.util.concurrent.ConcurrentLinkedQueue;
054import java.util.concurrent.Semaphore;
055import java.util.concurrent.TimeUnit;
056import java.util.concurrent.atomic.AtomicBoolean;
057import java.util.logging.Level;
058import java.util.logging.Logger;
059
060import javax.net.SocketFactory;
061import javax.net.ssl.HostnameVerifier;
062import javax.net.ssl.KeyManager;
063import javax.net.ssl.KeyManagerFactory;
064import javax.net.ssl.SSLContext;
065import javax.net.ssl.SSLSession;
066import javax.net.ssl.SSLSocket;
067import javax.net.ssl.TrustManager;
068import javax.net.ssl.X509TrustManager;
069import javax.security.auth.callback.Callback;
070import javax.security.auth.callback.CallbackHandler;
071import javax.security.auth.callback.PasswordCallback;
072
073import org.jivesoftware.smack.AbstractConnectionListener;
074import org.jivesoftware.smack.AbstractXMPPConnection;
075import org.jivesoftware.smack.ConnectionConfiguration;
076import org.jivesoftware.smack.ConnectionConfiguration.DnssecMode;
077import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode;
078import org.jivesoftware.smack.SmackConfiguration;
079import org.jivesoftware.smack.SmackException;
080import org.jivesoftware.smack.SmackException.AlreadyConnectedException;
081import org.jivesoftware.smack.SmackException.AlreadyLoggedInException;
082import org.jivesoftware.smack.SmackException.ConnectionException;
083import org.jivesoftware.smack.SmackException.NoResponseException;
084import org.jivesoftware.smack.SmackException.NotConnectedException;
085import org.jivesoftware.smack.SmackException.NotLoggedInException;
086import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException;
087import org.jivesoftware.smack.SmackException.SmackWrappedException;
088import org.jivesoftware.smack.SmackFuture;
089import org.jivesoftware.smack.StanzaListener;
090import org.jivesoftware.smack.SynchronizationPoint;
091import org.jivesoftware.smack.XMPPConnection;
092import org.jivesoftware.smack.XMPPException;
093import org.jivesoftware.smack.XMPPException.FailedNonzaException;
094import org.jivesoftware.smack.XMPPException.StreamErrorException;
095import org.jivesoftware.smack.compress.packet.Compress;
096import org.jivesoftware.smack.compress.packet.Compressed;
097import org.jivesoftware.smack.compression.XMPPInputOutputStream;
098import org.jivesoftware.smack.filter.StanzaFilter;
099import org.jivesoftware.smack.packet.Element;
100import org.jivesoftware.smack.packet.IQ;
101import org.jivesoftware.smack.packet.Message;
102import org.jivesoftware.smack.packet.Nonza;
103import org.jivesoftware.smack.packet.Presence;
104import org.jivesoftware.smack.packet.Stanza;
105import org.jivesoftware.smack.packet.StartTls;
106import org.jivesoftware.smack.packet.StreamError;
107import org.jivesoftware.smack.packet.StreamOpen;
108import org.jivesoftware.smack.proxy.ProxyInfo;
109import org.jivesoftware.smack.sasl.packet.SaslStreamElements;
110import org.jivesoftware.smack.sasl.packet.SaslStreamElements.Challenge;
111import org.jivesoftware.smack.sasl.packet.SaslStreamElements.SASLFailure;
112import org.jivesoftware.smack.sasl.packet.SaslStreamElements.Success;
113import org.jivesoftware.smack.sm.SMUtils;
114import org.jivesoftware.smack.sm.StreamManagementException;
115import org.jivesoftware.smack.sm.StreamManagementException.StreamIdDoesNotMatchException;
116import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementCounterError;
117import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementNotEnabledException;
118import org.jivesoftware.smack.sm.packet.StreamManagement;
119import org.jivesoftware.smack.sm.packet.StreamManagement.AckAnswer;
120import org.jivesoftware.smack.sm.packet.StreamManagement.AckRequest;
121import org.jivesoftware.smack.sm.packet.StreamManagement.Enable;
122import org.jivesoftware.smack.sm.packet.StreamManagement.Enabled;
123import org.jivesoftware.smack.sm.packet.StreamManagement.Failed;
124import org.jivesoftware.smack.sm.packet.StreamManagement.Resume;
125import org.jivesoftware.smack.sm.packet.StreamManagement.Resumed;
126import org.jivesoftware.smack.sm.packet.StreamManagement.StreamManagementFeature;
127import org.jivesoftware.smack.sm.predicates.Predicate;
128import org.jivesoftware.smack.sm.provider.ParseStreamManagement;
129import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown;
130import org.jivesoftware.smack.util.Async;
131import org.jivesoftware.smack.util.DNSUtil;
132import org.jivesoftware.smack.util.PacketParserUtils;
133import org.jivesoftware.smack.util.StringUtils;
134import org.jivesoftware.smack.util.TLSUtils;
135import org.jivesoftware.smack.util.XmlStringBuilder;
136import org.jivesoftware.smack.util.dns.HostAddress;
137import org.jivesoftware.smack.util.dns.SmackDaneProvider;
138import org.jivesoftware.smack.util.dns.SmackDaneVerifier;
139
140import org.jxmpp.jid.impl.JidCreate;
141import org.jxmpp.jid.parts.Resourcepart;
142import org.jxmpp.stringprep.XmppStringprepException;
143import org.jxmpp.util.XmppStringUtils;
144import org.xmlpull.v1.XmlPullParser;
145import org.xmlpull.v1.XmlPullParserException;
146
147/**
148 * Creates a socket connection to an XMPP server. This is the default connection
149 * to an XMPP server and is specified in the XMPP Core (RFC 6120).
150 *
151 * @see XMPPConnection
152 * @author Matt Tucker
153 */
154public class XMPPTCPConnection extends AbstractXMPPConnection {
155
    // NOTE(review): not referenced in the visible part of this file; presumably the capacity of a stanza queue — confirm.
    private static final int QUEUE_SIZE = 500;
    private static final Logger LOGGER = Logger.getLogger(XMPPTCPConnection.class.getName());

    /**
     * The socket which is used for this connection.
     */
    private Socket socket;

    /**
     * Set to {@code true} by an instant shutdown that happens while Stream Management resumption is still
     * possible, and reset to {@code false} after a successful login. While it is set, the "already connected" and
     * "already logged in" checks do not throw and a further shutdown request returns immediately.
     */
    private boolean disconnectedButResumeable = false;

    /**
     * The TLS socket of this connection, or {@code null} if the connection is not secured. Its session is handed
     * to the SASL authentication during login.
     */
    private SSLSocket secureSocket;

    /**
     * Semaphore with two permits, both of which are acquired when the connection is initialized, right before
     * the writer and the reader are started.
     * NOTE(review): presumably the reader and the writer each release one permit on termination — confirm.
     */
    private final Semaphore readerWriterSemaphore = new Semaphore(2);

    /**
     * Protected access level because of unit test purposes
     */
    protected final PacketWriter packetWriter = new PacketWriter();

    /**
     * Protected access level because of unit test purposes
     */
    protected final PacketReader packetReader = new PacketReader();

    /**
     * Synchronization point for the initial open stream element sent to the server.
     */
    private final SynchronizationPoint<Exception> initialOpenStreamSend = new SynchronizationPoint<>(
                    this, "initial open stream element send to server");

    /**
     * Synchronization point the login procedure waits on after the SASL authentication, i.e. for the stream
     * features that are received after the authentication.
     */
    private final SynchronizationPoint<XMPPException> maybeCompressFeaturesReceived = new SynchronizationPoint<XMPPException>(
                    this, "stream compression feature");

    /**
     * Synchronization point for stream compression.
     */
    private final SynchronizationPoint<SmackException> compressSyncPoint = new SynchronizationPoint<>(
                    this, "stream compression");

    /**
     * A synchronization point which is successful if this connection has received the closing
     * stream element from the remote end-point, i.e. the server.
     */
    private final SynchronizationPoint<Exception> closingStreamReceived = new SynchronizationPoint<>(
                    this, "stream closing element received");

    /**
     * The default bundle and defer callback, used for new connections.
     * @see #bundleAndDeferCallback
     */
    private static BundleAndDeferCallback defaultBundleAndDeferCallback;

    /**
     * The used bundle and defer callback.
     * <p>
     * Although this field may be set concurrently, the 'volatile' keyword was deliberately not added, in order to avoid
     * having a 'volatile' read within the writer threads loop.
     * </p>
     */
    private BundleAndDeferCallback bundleAndDeferCallback = defaultBundleAndDeferCallback;

    // Initial value of 'useSm' for newly created connections.
    private static boolean useSmDefault = true;

    // Initial value of 'useSmResumption' for newly created connections.
    private static boolean useSmResumptionDefault = true;

    /**
     * The stream ID of the stream that is currently resumable, i.e. the stream we hold the state
     * for in {@link #clientHandledStanzasCount}, {@link #serverHandledStanzasCount} and
     * {@link #unacknowledgedStanzas}.
     */
    private String smSessionId;

    /**
     * Synchronization point used to send the 'resume' element and wait for the server's answer.
     */
    private final SynchronizationPoint<FailedNonzaException> smResumedSyncPoint = new SynchronizationPoint<>(
                    this, "stream resumed element");

    /**
     * Synchronization point used to send the 'enable' element and wait for the server's answer.
     */
    private final SynchronizationPoint<SmackException> smEnabledSyncPoint = new SynchronizationPoint<>(
                    this, "stream enabled element");

    /**
     * The client's preferred maximum resumption time in seconds.
     */
    private int smClientMaxResumptionTime = -1;

    /**
     * The server's preferred maximum resumption time in seconds.
     */
    private int smServerMaxResumptionTime = -1;

    /**
     * Indicates whether Stream Management (XEP-198) should be used if it's supported by the server.
     */
    private boolean useSm = useSmDefault;
    // Passed to the 'enable' element when Stream Management gets enabled.
    private boolean useSmResumption = useSmResumptionDefault;

    /**
     * The counter that the server sends the client about its current height. For example, if the server sends
     * {@code <a h='42'/>}, then this will be set to 42 (while also handling the {@link #unacknowledgedStanzas} queue).
     */
    private long serverHandledStanzasCount = 0;

    /**
     * The counter for stanzas handled ("received") by the client.
     * <p>
     * Note that we don't need to synchronize this counter. Although JLS 17.7 states that reads and writes to longs are
     * not atomic, it guarantees that there are at most 2 separate writes, one to each 32-bit half. And since
     * {@link SMUtils#incrementHeight(long)} masks the lower 32 bit, we only operate on one half of the long and
     * therefore have no concurrency problem because the read/write operations on one half are guaranteed to be atomic.
     * </p>
     */
    private long clientHandledStanzasCount = 0;

    /**
     * The stanzas of the stream managed session that are not yet acknowledged. During login the content of a
     * non-null queue left over from a previous session is drained in order to be re-sent or reported as dropped.
     */
    private BlockingQueue<Stanza> unacknowledgedStanzas;

    /**
     * Set to true if Stream Management was at least once enabled for this connection.
     */
    private boolean smWasEnabledAtLeastOnce = false;

    /**
     * These listeners are invoked for every stanza that got acknowledged.
     * <p>
     * We use a {@link ConcurrentLinkedQueue} here in order to allow the listeners to remove
     * themselves after they have been invoked.
     * </p>
     */
    private final Collection<StanzaListener> stanzaAcknowledgedListeners = new ConcurrentLinkedQueue<>();

    /**
     * These listeners are invoked for every stanza that got dropped.
     * <p>
     * We use a {@link ConcurrentLinkedQueue} here in order to allow the listeners to remove
     * themselves after they have been invoked.
     * </p>
     */
    private final Collection<StanzaListener> stanzaDroppedListeners = new ConcurrentLinkedQueue<>();

    /**
     * These listeners are invoked for an acknowledged stanza that has the given stanza ID. They will
     * only be invoked once and automatically removed after that.
     */
    private final Map<String, StanzaListener> stanzaIdAcknowledgedListeners = new ConcurrentHashMap<>();

    /**
     * Predicates that determine if a stream management ack should be requested from the server.
     * <p>
     * We use a linked hash set here, so that the order in which the predicates are added matches the
     * order in which they are invoked in order to determine if an ack request should be sent or not.
     * </p>
     */
    private final Set<StanzaFilter> requestAckPredicates = new LinkedHashSet<>();

    // The TCP specific configuration this connection was created with.
    // NOTE(review): the suppressed "HidingField" warning suggests the superclass declares a field of the same name — confirm.
    @SuppressWarnings("HidingField")
    private final XMPPTCPConnectionConfiguration config;
312
313    /**
314     * Creates a new XMPP connection over TCP (optionally using proxies).
315     * <p>
316     * Note that XMPPTCPConnection constructors do not establish a connection to the server
317     * and you must call {@link #connect()}.
318     * </p>
319     *
320     * @param config the connection configuration.
321     */
322    public XMPPTCPConnection(XMPPTCPConnectionConfiguration config) {
323        super(config);
324        this.config = config;
325        addConnectionListener(new AbstractConnectionListener() {
326            @Override
327            public void connectionClosedOnError(Exception e) {
328                if (e instanceof XMPPException.StreamErrorException || e instanceof StreamManagementException) {
329                    dropSmState();
330                }
331            }
332        });
333    }
334
335    /**
336     * Creates a new XMPP connection over TCP.
337     * <p>
338     * Note that {@code jid} must be the bare JID, e.g. "user@example.org". More fine-grained control over the
339     * connection settings is available using the {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)}
340     * constructor.
341     * </p>
342     *
343     * @param jid the bare JID used by the client.
344     * @param password the password or authentication token.
345     * @throws XmppStringprepException
346     */
347    public XMPPTCPConnection(CharSequence jid, String password) throws XmppStringprepException {
348        this(XmppStringUtils.parseLocalpart(jid.toString()), password, XmppStringUtils.parseDomain(jid.toString()));
349    }
350
351    /**
352     * Creates a new XMPP connection over TCP.
353     * <p>
354     * This is the simplest constructor for connecting to an XMPP server. Alternatively,
355     * you can get fine-grained control over connection settings using the
356     * {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)} constructor.
357     * </p>
358     * @param username
359     * @param password
360     * @param serviceName
361     * @throws XmppStringprepException
362     */
363    public XMPPTCPConnection(CharSequence username, String password, String serviceName) throws XmppStringprepException {
364        this(XMPPTCPConnectionConfiguration.builder().setUsernameAndPassword(username, password).setXmppDomain(
365                                        JidCreate.domainBareFrom(serviceName)).build());
366    }
367
368    @Override
369    protected void throwNotConnectedExceptionIfAppropriate() throws NotConnectedException {
370        if (packetWriter == null) {
371            throw new NotConnectedException();
372        }
373        packetWriter.throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
374    }
375
376    @Override
377    protected void throwAlreadyConnectedExceptionIfAppropriate() throws AlreadyConnectedException {
378        if (isConnected() && !disconnectedButResumeable) {
379            throw new AlreadyConnectedException();
380        }
381    }
382
383    @Override
384    protected void throwAlreadyLoggedInExceptionIfAppropriate() throws AlreadyLoggedInException {
385        if (isAuthenticated() && !disconnectedButResumeable) {
386            throw new AlreadyLoggedInException();
387        }
388    }
389
390    @Override
391    protected void afterSuccessfulLogin(final boolean resumed) throws NotConnectedException, InterruptedException {
392        // Reset the flag in case it was set
393        disconnectedButResumeable = false;
394        super.afterSuccessfulLogin(resumed);
395    }
396
    /**
     * Performs the login: SASL authentication, optional stream compression, an attempt to resume a previous
     * Stream Management session and, if no stream was resumed, resource binding followed by enabling Stream
     * Management.
     * <p>
     * If a previous stream could not be resumed, the stanzas that were still unacknowledged are either handed to
     * the stanza dropped listeners (if any are registered) or sent again.
     * </p>
     *
     * @param username the username to authenticate with.
     * @param password the password to authenticate with.
     * @param resource the resource to bind.
     * @throws XMPPException declared for the authentication, synchronization point and binding calls made here.
     * @throws SmackException declared for the authentication, synchronization point and binding calls made here.
     * @throws IOException declared for the calls made here.
     * @throws InterruptedException if the calling thread was interrupted while waiting.
     */
    @Override
    protected synchronized void loginInternal(String username, String password, Resourcepart resource) throws XMPPException,
                    SmackException, IOException, InterruptedException {
        // Authenticate using SASL. The TLS session (if any) is passed along to the SASL layer.
        SSLSession sslSession = secureSocket != null ? secureSocket.getSession() : null;
        saslAuthentication.authenticate(username, password, config.getAuthzid(), sslSession);

        // Wait for stream features after the authentication.
        // TODO: The name of this synchronization point "maybeCompressFeaturesReceived" is not perfect. It should be
        // renamed to "streamFeaturesAfterAuthenticationReceived".
        maybeCompressFeaturesReceived.checkIfSuccessOrWait();

        // If compression is enabled then request the server to use stream compression. XEP-170
        // recommends to perform stream compression before resource binding.
        maybeEnableCompression();

        if (isSmResumptionPossible()) {
            smResumedSyncPoint.sendAndWaitForResponse(new Resume(clientHandledStanzasCount, smSessionId));
            if (smResumedSyncPoint.wasSuccessful()) {
                // We successfully resumed the stream, be done here
                afterSuccessfulLogin(true);
                return;
            }
            // SM resumption failed, what Smack does here is to report success of
            // lastFeaturesReceived in case of sm resumption was answered with 'failed' so that
            // normal resource binding can be tried.
            LOGGER.fine("Stream resumption failed, continuing with normal stream establishment process");
        }

        // Collects the stanzas of the previous session which the server did not acknowledge.
        List<Stanza> previouslyUnackedStanzas = new LinkedList<Stanza>();
        if (unacknowledgedStanzas != null) {
            // There was a previous connection with SM enabled but that was either not resumable or
            // failed to resume. Make sure that we (re-)send the unacknowledged stanzas.
            unacknowledgedStanzas.drainTo(previouslyUnackedStanzas);
            // Reset unacknowledged stanzas to 'null' to signal that we never send 'enable' in this
            // XMPP session (There maybe was an enabled in a previous XMPP session of this
            // connection instance though). This is used in writePackets to decide if stanzas should
            // be added to the unacknowledged stanzas queue, because they have to be added right
            // after the 'enable' stream element has been sent.
            dropSmState();
        }

        // Now bind the resource. It is important to do this *after* we dropped an eventually
        // existing Stream Management state. As otherwise <bind/> and <session/> may end up in
        // unacknowledgedStanzas and become duplicated on reconnect. See SMACK-706.
        bindResourceAndEstablishSession(resource);

        if (isSmAvailable() && useSm) {
            // Remove what is maybe left from previously stream managed sessions
            serverHandledStanzasCount = 0;
            // XEP-198 3. Enabling Stream Management. If the server response to 'Enable' is 'Failed'
            // then this is a non recoverable error and we therefore throw an exception.
            smEnabledSyncPoint.sendAndWaitForResponseOrThrow(new Enable(useSmResumption, smClientMaxResumptionTime));
            synchronized (requestAckPredicates) {
                if (requestAckPredicates.isEmpty()) {
                    // Assure that we have at least one predicate set up so that we request acks
                    // from the server and eventually flush some stanzas from the unacknowledged
                    // stanza queue
                    requestAckPredicates.add(Predicate.forMessagesOrAfter5Stanzas());
                }
            }
        }
        // Inform client about failed resumption if possible, resend stanzas otherwise
        // Process the stanzas synchronously so a client can re-queue them for transmission
        // before it is informed about connection success
        if (!stanzaDroppedListeners.isEmpty()) {
            for (Stanza stanza : previouslyUnackedStanzas) {
                for (StanzaListener listener : stanzaDroppedListeners) {
                    try {
                        listener.processStanza(stanza);
                    }
                    catch (InterruptedException | NotConnectedException | NotLoggedInException e) {
                        // A failing listener must not prevent the remaining listeners from being notified.
                        LOGGER.log(Level.FINER, "StanzaDroppedListener received exception", e);
                    }
                }
            }
        } else {
            // Nobody wants to be informed about dropped stanzas, so simply send them again.
            for (Stanza stanza : previouslyUnackedStanzas) {
                sendStanzaInternal(stanza);
            }
        }

        afterSuccessfulLogin(false);
    }
481
482    @Override
483    public boolean isSecureConnection() {
484        return secureSocket != null;
485    }
486
487    /**
488     * Shuts the current connection down. After this method returns, the connection must be ready
489     * for re-use by connect.
490     */
491    @Override
492    protected void shutdown() {
493        if (isSmEnabled()) {
494            try {
495                // Try to send a last SM Acknowledgement. Most servers won't find this information helpful, as the SM
496                // state is dropped after a clean disconnect anyways. OTOH it doesn't hurt much either.
497                sendSmAcknowledgementInternal();
498            } catch (InterruptedException | NotConnectedException e) {
499                LOGGER.log(Level.FINE, "Can not send final SM ack as connection is not connected", e);
500            }
501        }
502        shutdown(false);
503    }
504
505    @Override
506    public synchronized void instantShutdown() {
507        shutdown(true);
508    }
509
510    private void shutdown(boolean instant) {
511        if (disconnectedButResumeable) {
512            return;
513        }
514
515        // First shutdown the writer, this will result in a closing stream element getting send to
516        // the server
517        LOGGER.finer("PacketWriter shutdown()");
518        packetWriter.shutdown(instant);
519        LOGGER.finer("PacketWriter has been shut down");
520
521        if (!instant) {
522            try {
523                // After we send the closing stream element, check if there was already a
524                // closing stream element sent by the server or wait with a timeout for a
525                // closing stream element to be received from the server.
526                @SuppressWarnings("unused")
527                Exception res = closingStreamReceived.checkIfSuccessOrWait();
528            } catch (InterruptedException | NoResponseException e) {
529                LOGGER.log(Level.INFO, "Exception while waiting for closing stream element from the server " + this, e);
530            }
531        }
532
533        LOGGER.finer("PacketReader shutdown()");
534        packetReader.shutdown();
535        LOGGER.finer("PacketReader has been shut down");
536
537        try {
538                socket.close();
539        } catch (Exception e) {
540                LOGGER.log(Level.WARNING, "shutdown", e);
541        }
542
543        setWasAuthenticated();
544        // If we are able to resume the stream, then don't set
545        // connected/authenticated/usingTLS to false since we like behave like we are still
546        // connected (e.g. sendStanza should not throw a NotConnectedException).
547        if (isSmResumptionPossible() && instant) {
548            disconnectedButResumeable = true;
549        } else {
550            disconnectedButResumeable = false;
551            // Reset the stream management session id to null, since if the stream is cleanly closed, i.e. sending a closing
552            // stream tag, there is no longer a stream to resume.
553            smSessionId = null;
554            // Note that we deliberately do not reset authenticatedConnectionInitiallyEstablishedTimestamp here, so that the
555            // information is available in the connectionClosedOnError() listeners.
556        }
557        authenticated = false;
558        connected = false;
559        secureSocket = null;
560        reader = null;
561        writer = null;
562
563        initState();
564    }
565
566    @Override
567    protected void initState() {
568        super.initState();
569        maybeCompressFeaturesReceived.init();
570        compressSyncPoint.init();
571        smResumedSyncPoint.init();
572        smEnabledSyncPoint.init();
573        initialOpenStreamSend.init();
574    }
575
576    @Override
577    public void sendNonza(Nonza element) throws NotConnectedException, InterruptedException {
578        packetWriter.sendStreamElement(element);
579    }
580
581    @Override
582    protected void sendStanzaInternal(Stanza packet) throws NotConnectedException, InterruptedException {
583        packetWriter.sendStreamElement(packet);
584        if (isSmEnabled()) {
585            for (StanzaFilter requestAckPredicate : requestAckPredicates) {
586                if (requestAckPredicate.accept(packet)) {
587                    requestSmAcknowledgementInternal();
588                    break;
589                }
590            }
591        }
592    }
593
594    private void connectUsingConfiguration() throws ConnectionException, IOException, InterruptedException {
595        List<HostAddress> failedAddresses = populateHostAddresses();
596        SocketFactory socketFactory = config.getSocketFactory();
597        ProxyInfo proxyInfo = config.getProxyInfo();
598        int timeout = config.getConnectTimeout();
599        if (socketFactory == null) {
600            socketFactory = SocketFactory.getDefault();
601        }
602        for (HostAddress hostAddress : hostAddresses) {
603            Iterator<InetAddress> inetAddresses;
604            String host = hostAddress.getHost();
605            int port = hostAddress.getPort();
606            if (proxyInfo == null) {
607                inetAddresses = hostAddress.getInetAddresses().iterator();
608                assert (inetAddresses.hasNext());
609
610                innerloop: while (inetAddresses.hasNext()) {
611                    // Create a *new* Socket before every connection attempt, i.e. connect() call, since Sockets are not
612                    // re-usable after a failed connection attempt. See also SMACK-724.
613                    SmackFuture.SocketFuture socketFuture = new SmackFuture.SocketFuture(socketFactory);
614
615                    final InetAddress inetAddress = inetAddresses.next();
616                    final InetSocketAddress inetSocketAddress = new InetSocketAddress(inetAddress, port);
617                    LOGGER.finer("Trying to establish TCP connection to " + inetSocketAddress);
618                    socketFuture.connectAsync(inetSocketAddress, timeout);
619
620                    try {
621                        socket = socketFuture.getOrThrow();
622                    } catch (IOException e) {
623                        hostAddress.setException(inetAddress, e);
624                        if (inetAddresses.hasNext()) {
625                            continue innerloop;
626                        } else {
627                            break innerloop;
628                        }
629                    }
630                    LOGGER.finer("Established TCP connection to " + inetSocketAddress);
631                    // We found a host to connect to, return here
632                    this.host = host;
633                    this.port = port;
634                    return;
635                }
636                failedAddresses.add(hostAddress);
637            } else {
638                socket = socketFactory.createSocket();
639                StringUtils.requireNotNullOrEmpty(host, "Host of HostAddress " + hostAddress + " must not be null when using a Proxy");
640                final String hostAndPort = host + " at port " + port;
641                LOGGER.finer("Trying to establish TCP connection via Proxy to " + hostAndPort);
642                try {
643                    proxyInfo.getProxySocketConnection().connect(socket, host, port, timeout);
644                } catch (IOException e) {
645                    hostAddress.setException(e);
646                    continue;
647                }
648                LOGGER.finer("Established TCP connection to " + hostAndPort);
649                // We found a host to connect to, return here
650                this.host = host;
651                this.port = port;
652                return;
653            }
654        }
655        // There are no more host addresses to try
656        // throw an exception and report all tried
657        // HostAddresses in the exception
658        throw ConnectionException.from(failedAddresses);
659    }
660
661    /**
662     * Initializes the connection by creating a stanza reader and writer and opening a
663     * XMPP stream to the server.
664     *
665     * @throws XMPPException if establishing a connection to the server fails.
666     * @throws SmackException if the server fails to respond back or if there is anther error.
667     * @throws IOException
668     * @throws InterruptedException
669     */
670    private void initConnection() throws IOException, InterruptedException {
671        compressionHandler = null;
672
673        // Set the reader and writer instance variables
674        initReaderAndWriter();
675
676        int availableReaderWriterSemaphorePermits = readerWriterSemaphore.availablePermits();
677        if (availableReaderWriterSemaphorePermits < 2) {
678            Object[] logObjects = new Object[] {
679                            this,
680                            availableReaderWriterSemaphorePermits,
681            };
682            LOGGER.log(Level.FINE, "Not every reader/writer threads where terminated on connection re-initializtion of {0}. Available permits {1}", logObjects);
683        }
684        readerWriterSemaphore.acquire(2);
685        // Start the writer thread. This will open an XMPP stream to the server
686        packetWriter.init();
687        // Start the reader thread. The startup() method will block until we
688        // get an opening stream packet back from server
689        packetReader.init();
690    }
691
692    private void initReaderAndWriter() throws IOException {
693        InputStream is = socket.getInputStream();
694        OutputStream os = socket.getOutputStream();
695        if (compressionHandler != null) {
696            is = compressionHandler.getInputStream(is);
697            os = compressionHandler.getOutputStream(os);
698        }
699        // OutputStreamWriter is already buffered, no need to wrap it into a BufferedWriter
700        writer = new OutputStreamWriter(os, "UTF-8");
701        reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
702
703        // If debugging is enabled, we open a window and write out all network traffic.
704        initDebugger();
705    }
706
707    /**
708     * The server has indicated that TLS negotiation can start. We now need to secure the
709     * existing plain connection and perform a handshake. This method won't return until the
710     * connection has finished the handshake or an error occurred while securing the connection.
711     * @throws IOException
712     * @throws CertificateException
713     * @throws NoSuchAlgorithmException
714     * @throws NoSuchProviderException
715     * @throws KeyStoreException
716     * @throws UnrecoverableKeyException
717     * @throws KeyManagementException
718     * @throws SmackException
719     * @throws Exception if an exception occurs.
720     */
721    @SuppressWarnings("LiteralClassName")
722    private void proceedTLSReceived() throws NoSuchAlgorithmException, CertificateException, IOException, KeyStoreException, NoSuchProviderException, UnrecoverableKeyException, KeyManagementException, SmackException {
723        SmackDaneVerifier daneVerifier = null;
724
725        if (config.getDnssecMode() == DnssecMode.needsDnssecAndDane) {
726            SmackDaneProvider daneProvider = DNSUtil.getDaneProvider();
727            if (daneProvider == null) {
728                throw new UnsupportedOperationException("DANE enabled but no SmackDaneProvider configured");
729            }
730            daneVerifier = daneProvider.newInstance();
731            if (daneVerifier == null) {
732                throw new IllegalStateException("DANE requested but DANE provider did not return a DANE verifier");
733            }
734        }
735
736        SSLContext context = this.config.getCustomSSLContext();
737        KeyStore ks = null;
738        PasswordCallback pcb = null;
739
740        if (context == null) {
741            final String keyStoreType = config.getKeystoreType();
742            final CallbackHandler callbackHandler = config.getCallbackHandler();
743            final String keystorePath = config.getKeystorePath();
744            if ("PKCS11".equals(keyStoreType)) {
745                try {
746                    Constructor<?> c = Class.forName("sun.security.pkcs11.SunPKCS11").getConstructor(InputStream.class);
747                    String pkcs11Config = "name = SmartCard\nlibrary = " + config.getPKCS11Library();
748                    ByteArrayInputStream config = new ByteArrayInputStream(pkcs11Config.getBytes(StringUtils.UTF8));
749                    Provider p = (Provider) c.newInstance(config);
750                    Security.addProvider(p);
751                    ks = KeyStore.getInstance("PKCS11",p);
752                    pcb = new PasswordCallback("PKCS11 Password: ",false);
753                    callbackHandler.handle(new Callback[] {pcb});
754                    ks.load(null,pcb.getPassword());
755                }
756                catch (Exception e) {
757                    LOGGER.log(Level.WARNING, "Exception", e);
758                    ks = null;
759                }
760            }
761            else if ("Apple".equals(keyStoreType)) {
762                ks = KeyStore.getInstance("KeychainStore","Apple");
763                ks.load(null,null);
764                // pcb = new PasswordCallback("Apple Keychain",false);
765                // pcb.setPassword(null);
766            }
767            else if (keyStoreType != null) {
768                ks = KeyStore.getInstance(keyStoreType);
769                if (callbackHandler != null && StringUtils.isNotEmpty(keystorePath)) {
770                    try {
771                        pcb = new PasswordCallback("Keystore Password: ", false);
772                        callbackHandler.handle(new Callback[] { pcb });
773                        ks.load(new FileInputStream(keystorePath), pcb.getPassword());
774                    }
775                    catch (Exception e) {
776                        LOGGER.log(Level.WARNING, "Exception", e);
777                        ks = null;
778                    }
779                } else {
780                    ks.load(null, null);
781                }
782            }
783
784            KeyManager[] kms = null;
785
786            if (ks != null) {
787                String keyManagerFactoryAlgorithm = KeyManagerFactory.getDefaultAlgorithm();
788                KeyManagerFactory kmf = null;
789                try {
790                    kmf = KeyManagerFactory.getInstance(keyManagerFactoryAlgorithm);
791                }
792                catch (NoSuchAlgorithmException e) {
793                    LOGGER.log(Level.FINE, "Could get the default KeyManagerFactory for the '"
794                                    + keyManagerFactoryAlgorithm + "' algorithm", e);
795                }
796                if (kmf != null) {
797                    try {
798                        if (pcb == null) {
799                            kmf.init(ks, null);
800                        }
801                        else {
802                            kmf.init(ks, pcb.getPassword());
803                            pcb.clearPassword();
804                        }
805                        kms = kmf.getKeyManagers();
806                    }
807                    catch (NullPointerException npe) {
808                        LOGGER.log(Level.WARNING, "NullPointerException", npe);
809                    }
810                }
811            }
812
813            // If the user didn't specify a SSLContext, use the default one
814            context = SSLContext.getInstance("TLS");
815
816            final SecureRandom secureRandom = new java.security.SecureRandom();
817            X509TrustManager customTrustManager = config.getCustomX509TrustManager();
818
819            if (daneVerifier != null) {
820                // User requested DANE verification.
821                daneVerifier.init(context, kms, customTrustManager, secureRandom);
822            } else {
823                TrustManager[] customTrustManagers = null;
824                if (customTrustManager != null) {
825                    customTrustManagers = new TrustManager[] { customTrustManager };
826                }
827                context.init(kms, customTrustManagers, secureRandom);
828            }
829        }
830
831        Socket plain = socket;
832        // Secure the plain connection
833        socket = context.getSocketFactory().createSocket(plain,
834                config.getXMPPServiceDomain().toString(), plain.getPort(), true);
835
836        final SSLSocket sslSocket = (SSLSocket) socket;
837        // Immediately set the enabled SSL protocols and ciphers. See SMACK-712 why this is
838        // important (at least on certain platforms) and it seems to be a good idea anyways to
839        // prevent an accidental implicit handshake.
840        TLSUtils.setEnabledProtocolsAndCiphers(sslSocket, config.getEnabledSSLProtocols(), config.getEnabledSSLCiphers());
841
842        // Initialize the reader and writer with the new secured version
843        initReaderAndWriter();
844
845        // Proceed to do the handshake
846        sslSocket.startHandshake();
847
848        if (daneVerifier != null) {
849            daneVerifier.finish(sslSocket);
850        }
851
852        final HostnameVerifier verifier = getConfiguration().getHostnameVerifier();
853        if (verifier == null) {
854                throw new IllegalStateException("No HostnameVerifier set. Use connectionConfiguration.setHostnameVerifier() to configure.");
855        } else if (!verifier.verify(getXMPPServiceDomain().toString(), sslSocket.getSession())) {
856            throw new CertificateException("Hostname verification of certificate failed. Certificate does not authenticate " + getXMPPServiceDomain());
857        }
858
859        // Set that TLS was successful
860        secureSocket = sslSocket;
861    }
862
863    /**
864     * Returns the compression handler that can be used for one compression methods offered by the server.
865     *
866     * @return a instance of XMPPInputOutputStream or null if no suitable instance was found
867     *
868     */
869    private static XMPPInputOutputStream maybeGetCompressionHandler(Compress.Feature compression) {
870        for (XMPPInputOutputStream handler : SmackConfiguration.getCompressionHandlers()) {
871                String method = handler.getCompressionMethod();
872                if (compression.getMethods().contains(method))
873                    return handler;
874        }
875        return null;
876    }
877
878    @Override
879    public boolean isUsingCompression() {
880        return compressionHandler != null && compressSyncPoint.wasSuccessful();
881    }
882
883    /**
884     * <p>
885     * Starts using stream compression that will compress network traffic. Traffic can be
886     * reduced up to 90%. Therefore, stream compression is ideal when using a slow speed network
887     * connection. However, the server and the client will need to use more CPU time in order to
888     * un/compress network data so under high load the server performance might be affected.
889     * </p>
890     * <p>
891     * Stream compression has to have been previously offered by the server. Currently only the
892     * zlib method is supported by the client. Stream compression negotiation has to be done
893     * before authentication took place.
894     * </p>
895     *
896     * @throws NotConnectedException
897     * @throws SmackException
898     * @throws NoResponseException
899     * @throws InterruptedException
900     */
901    private void maybeEnableCompression() throws SmackException, InterruptedException {
902        if (!config.isCompressionEnabled()) {
903            return;
904        }
905
906        Compress.Feature compression = getFeature(Compress.Feature.ELEMENT, Compress.NAMESPACE);
907        if (compression == null) {
908            // Server does not support compression
909            return;
910        }
911        // If stream compression was offered by the server and we want to use
912        // compression then send compression request to the server
913        if ((compressionHandler = maybeGetCompressionHandler(compression)) != null) {
914            compressSyncPoint.sendAndWaitForResponseOrThrow(new Compress(compressionHandler.getCompressionMethod()));
915        } else {
916            LOGGER.warning("Could not enable compression because no matching handler/method pair was found");
917        }
918    }
919
920    /**
921     * Establishes a connection to the XMPP server. It basically
922     * creates and maintains a socket connection to the server.
923     * <p>
924     * Listeners will be preserved from a previous connection if the reconnection
925     * occurs after an abrupt termination.
926     * </p>
927     *
928     * @throws XMPPException if an error occurs while trying to establish the connection.
929     * @throws SmackException
930     * @throws IOException
931     * @throws InterruptedException
932     */
933    @Override
934    protected void connectInternal() throws SmackException, IOException, XMPPException, InterruptedException {
935        closingStreamReceived.init();
936        // Establishes the TCP connection to the server and does setup the reader and writer. Throws an exception if
937        // there is an error establishing the connection
938        connectUsingConfiguration();
939
940        // We connected successfully to the servers TCP port
941        initConnection();
942
943        // TLS handled will be successful either if TLS was established, or if it was not mandatory.
944        tlsHandled.checkIfSuccessOrWaitOrThrow();
945
946        // Wait with SASL auth until the SASL mechanisms have been received
947        saslFeatureReceived.checkIfSuccessOrWaitOrThrow();
948    }
949
950    /**
951     * Sends out a notification that there was an error with the connection
952     * and closes the connection. Also prints the stack trace of the given exception
953     *
954     * @param e the exception that causes the connection close event.
955     */
956    private synchronized void notifyConnectionError(final Exception e) {
957        ASYNC_BUT_ORDERED.performAsyncButOrdered(this, new Runnable() {
958            @Override
959            public void run() {
960                // Listeners were already notified of the exception, return right here.
961                if (packetReader.done || packetWriter.done()) return;
962
963                // Report the failure outside the synchronized block, so that a thread waiting within a synchronized
964                // function like connect() throws the wrapped exception.
965                SmackWrappedException smackWrappedException = new SmackWrappedException(e);
966                tlsHandled.reportGenericFailure(smackWrappedException);
967                saslFeatureReceived.reportGenericFailure(smackWrappedException);
968                maybeCompressFeaturesReceived.reportGenericFailure(smackWrappedException);
969                lastFeaturesReceived.reportGenericFailure(smackWrappedException);
970
971                synchronized (XMPPTCPConnection.this) {
972                    // Within this synchronized block, either *both* reader and writer threads must be terminated, or
973                    // none.
974                    assert ((packetReader.done && packetWriter.done())
975                            || (!packetReader.done && !packetWriter.done()));
976
977                    // Closes the connection temporary. A reconnection is possible
978                    // Note that a connection listener of XMPPTCPConnection will drop the SM state in
979                    // case the Exception is a StreamErrorException.
980                    instantShutdown();
981
982                    // Wait for reader and writer threads to be terminated.
983                    readerWriterSemaphore.acquireUninterruptibly(2);
984                    readerWriterSemaphore.release(2);
985                }
986
987                Async.go(new Runnable() {
988                    @Override
989                    public void run() {
990                        // Notify connection listeners of the error.
991                        callConnectionClosedOnErrorListener(e);
992                    }
993                }, XMPPTCPConnection.this + " callConnectionClosedOnErrorListener()");
994            }
995        });
996    }
997
998    /**
999     * For unit testing purposes
1000     *
1001     * @param writer
1002     */
1003    protected void setWriter(Writer writer) {
1004        this.writer = writer;
1005    }
1006
1007    @Override
1008    protected void afterFeaturesReceived() throws NotConnectedException, InterruptedException, SecurityRequiredByServerException {
1009        StartTls startTlsFeature = getFeature(StartTls.ELEMENT, StartTls.NAMESPACE);
1010        if (startTlsFeature != null) {
1011            if (startTlsFeature.required() && config.getSecurityMode() == SecurityMode.disabled) {
1012                SecurityRequiredByServerException smackException = new SecurityRequiredByServerException();
1013                tlsHandled.reportFailure(smackException);
1014                throw smackException;
1015            }
1016
1017            if (config.getSecurityMode() != ConnectionConfiguration.SecurityMode.disabled) {
1018                sendNonza(new StartTls());
1019            } else {
1020                tlsHandled.reportSuccess();
1021            }
1022        } else {
1023            tlsHandled.reportSuccess();
1024        }
1025
1026        if (getSASLAuthentication().authenticationSuccessful()) {
1027            // If we have received features after the SASL has been successfully completed, then we
1028            // have also *maybe* received, as it is an optional feature, the compression feature
1029            // from the server.
1030            maybeCompressFeaturesReceived.reportSuccess();
1031        }
1032    }
1033
1034    /**
1035     * Resets the parser using the latest connection's reader. Resetting the parser is necessary
1036     * when the plain connection has been secured or when a new opening stream element is going
1037     * to be sent by the server.
1038     *
1039     * @throws SmackException if the parser could not be reset.
1040     * @throws InterruptedException
1041     */
1042    void openStream() throws SmackException, InterruptedException {
1043        // If possible, provide the receiving entity of the stream open tag, i.e. the server, as much information as
1044        // possible. The 'to' attribute is *always* available. The 'from' attribute if set by the user and no external
1045        // mechanism is used to determine the local entity (user). And the 'id' attribute is available after the first
1046        // response from the server (see e.g. RFC 6120 ยง 9.1.1 Step 2.)
1047        CharSequence to = getXMPPServiceDomain();
1048        CharSequence from = null;
1049        CharSequence localpart = config.getUsername();
1050        if (localpart != null) {
1051            from = XmppStringUtils.completeJidFrom(localpart, to);
1052        }
1053        String id = getStreamId();
1054        sendNonza(new StreamOpen(to, from, id));
1055        try {
1056            packetReader.parser = PacketParserUtils.newXmppParser(reader);
1057        }
1058        catch (XmlPullParserException e) {
1059            throw new SmackException(e);
1060        }
1061    }
1062
1063    protected class PacketReader {
1064
1065        XmlPullParser parser;
1066
1067        private volatile boolean done;
1068
1069        /**
1070         * Initializes the reader in order to be used. The reader is initialized during the
1071         * first connection and when reconnecting due to an abruptly disconnection.
1072         */
1073        void init() {
1074            done = false;
1075
1076            Async.go(new Runnable() {
1077                @Override
1078                public void run() {
1079                    try {
1080                        parsePackets();
1081                    } finally {
1082                        XMPPTCPConnection.this.readerWriterSemaphore.release();
1083                    }
1084                }
1085            }, "Smack Reader (" + getConnectionCounter() + ")");
1086         }
1087
1088        /**
1089         * Shuts the stanza reader down. This method simply sets the 'done' flag to true.
1090         */
1091        void shutdown() {
1092            done = true;
1093        }
1094
1095        /**
1096         * Parse top-level packets in order to process them further.
1097         */
1098        private void parsePackets() {
1099            try {
1100                initialOpenStreamSend.checkIfSuccessOrWait();
1101                int eventType = parser.getEventType();
1102                while (!done) {
1103                    switch (eventType) {
1104                    case XmlPullParser.START_TAG:
1105                        final String name = parser.getName();
1106                        switch (name) {
1107                        case Message.ELEMENT:
1108                        case IQ.IQ_ELEMENT:
1109                        case Presence.ELEMENT:
1110                            try {
1111                                parseAndProcessStanza(parser);
1112                            } finally {
1113                                clientHandledStanzasCount = SMUtils.incrementHeight(clientHandledStanzasCount);
1114                            }
1115                            break;
1116                        case "stream":
1117                            // We found an opening stream.
1118                            if ("jabber:client".equals(parser.getNamespace(null))) {
1119                                streamId = parser.getAttributeValue("", "id");
1120                                String reportedServerDomain = parser.getAttributeValue("", "from");
1121                                assert (config.getXMPPServiceDomain().equals(reportedServerDomain));
1122                            }
1123                            break;
1124                        case "error":
1125                            StreamError streamError = PacketParserUtils.parseStreamError(parser);
1126                            saslFeatureReceived.reportFailure(new StreamErrorException(streamError));
1127                            // Mark the tlsHandled sync point as success, we will use the saslFeatureReceived sync
1128                            // point to report the error, which is checked immediately after tlsHandled in
1129                            // connectInternal().
1130                            tlsHandled.reportSuccess();
1131                            throw new StreamErrorException(streamError);
1132                        case "features":
1133                            parseFeatures(parser);
1134                            break;
1135                        case "proceed":
1136                            try {
1137                                // Secure the connection by negotiating TLS
1138                                proceedTLSReceived();
1139                                // Send a new opening stream to the server
1140                                openStream();
1141                            }
1142                            catch (Exception e) {
1143                                SmackException smackException = new SmackException(e);
1144                                tlsHandled.reportFailure(smackException);
1145                                throw e;
1146                            }
1147                            break;
1148                        case "failure":
1149                            String namespace = parser.getNamespace(null);
1150                            switch (namespace) {
1151                            case "urn:ietf:params:xml:ns:xmpp-tls":
1152                                // TLS negotiation has failed. The server will close the connection
1153                                // TODO Parse failure stanza
1154                                throw new SmackException("TLS negotiation has failed");
1155                            case "http://jabber.org/protocol/compress":
1156                                // Stream compression has been denied. This is a recoverable
1157                                // situation. It is still possible to authenticate and
1158                                // use the connection but using an uncompressed connection
1159                                // TODO Parse failure stanza
1160                                compressSyncPoint.reportFailure(new SmackException(
1161                                                "Could not establish compression"));
1162                                break;
1163                            case SaslStreamElements.NAMESPACE:
1164                                // SASL authentication has failed. The server may close the connection
1165                                // depending on the number of retries
1166                                final SASLFailure failure = PacketParserUtils.parseSASLFailure(parser);
1167                                getSASLAuthentication().authenticationFailed(failure);
1168                                break;
1169                            }
1170                            break;
1171                        case Challenge.ELEMENT:
1172                            // The server is challenging the SASL authentication made by the client
1173                            String challengeData = parser.nextText();
1174                            getSASLAuthentication().challengeReceived(challengeData);
1175                            break;
1176                        case Success.ELEMENT:
1177                            Success success = new Success(parser.nextText());
1178                            // We now need to bind a resource for the connection
1179                            // Open a new stream and wait for the response
1180                            openStream();
1181                            // The SASL authentication with the server was successful. The next step
1182                            // will be to bind the resource
1183                            getSASLAuthentication().authenticated(success);
1184                            break;
1185                        case Compressed.ELEMENT:
1186                            // Server confirmed that it's possible to use stream compression. Start
1187                            // stream compression
1188                            // Initialize the reader and writer with the new compressed version
1189                            initReaderAndWriter();
1190                            // Send a new opening stream to the server
1191                            openStream();
1192                            // Notify that compression is being used
1193                            compressSyncPoint.reportSuccess();
1194                            break;
1195                        case Enabled.ELEMENT:
1196                            Enabled enabled = ParseStreamManagement.enabled(parser);
1197                            if (enabled.isResumeSet()) {
1198                                smSessionId = enabled.getId();
1199                                if (StringUtils.isNullOrEmpty(smSessionId)) {
1200                                    SmackException xmppException = new SmackException("Stream Management 'enabled' element with resume attribute but without session id received");
1201                                    smEnabledSyncPoint.reportFailure(xmppException);
1202                                    throw xmppException;
1203                                }
1204                                smServerMaxResumptionTime = enabled.getMaxResumptionTime();
1205                            } else {
1206                                // Mark this a non-resumable stream by setting smSessionId to null
1207                                smSessionId = null;
1208                            }
1209                            clientHandledStanzasCount = 0;
1210                            smWasEnabledAtLeastOnce = true;
1211                            smEnabledSyncPoint.reportSuccess();
1212                            LOGGER.fine("Stream Management (XEP-198): successfully enabled");
1213                            break;
1214                        case Failed.ELEMENT:
1215                            Failed failed = ParseStreamManagement.failed(parser);
1216                            FailedNonzaException xmppException = new FailedNonzaException(failed, failed.getStanzaErrorCondition());
1217                            // If only XEP-198 would specify different failure elements for the SM
1218                            // enable and SM resume failure case. But this is not the case, so we
1219                            // need to determine if this is a 'Failed' response for either 'Enable'
1220                            // or 'Resume'.
1221                            if (smResumedSyncPoint.requestSent()) {
1222                                smResumedSyncPoint.reportFailure(xmppException);
1223                            }
1224                            else {
1225                                if (!smEnabledSyncPoint.requestSent()) {
1226                                    throw new IllegalStateException("Failed element received but SM was not previously enabled");
1227                                }
1228                                smEnabledSyncPoint.reportFailure(new SmackException(xmppException));
1229                                // Report success for last lastFeaturesReceived so that in case a
1230                                // failed resumption, we can continue with normal resource binding.
1231                                // See text of XEP-198 5. below Example 11.
1232                                lastFeaturesReceived.reportSuccess();
1233                            }
1234                            break;
1235                        case Resumed.ELEMENT:
1236                            Resumed resumed = ParseStreamManagement.resumed(parser);
1237                            if (!smSessionId.equals(resumed.getPrevId())) {
1238                                throw new StreamIdDoesNotMatchException(smSessionId, resumed.getPrevId());
1239                            }
1240                            // Mark SM as enabled
1241                            smEnabledSyncPoint.reportSuccess();
1242                            // First, drop the stanzas already handled by the server
1243                            processHandledCount(resumed.getHandledCount());
1244                            // Then re-send what is left in the unacknowledged queue
1245                            List<Stanza> stanzasToResend = new ArrayList<>(unacknowledgedStanzas.size());
1246                            unacknowledgedStanzas.drainTo(stanzasToResend);
1247                            for (Stanza stanza : stanzasToResend) {
1248                                sendStanzaInternal(stanza);
1249                            }
1250                            // If there where stanzas resent, then request a SM ack for them.
1251                            // Writer's sendStreamElement() won't do it automatically based on
1252                            // predicates.
1253                            if (!stanzasToResend.isEmpty()) {
1254                                requestSmAcknowledgementInternal();
1255                            }
1256                            // Mark SM resumption as successful
1257                            smResumedSyncPoint.reportSuccess();
1258                            LOGGER.fine("Stream Management (XEP-198): Stream resumed");
1259                            break;
1260                        case AckAnswer.ELEMENT:
1261                            AckAnswer ackAnswer = ParseStreamManagement.ackAnswer(parser);
1262                            processHandledCount(ackAnswer.getHandledCount());
1263                            break;
1264                        case AckRequest.ELEMENT:
1265                            ParseStreamManagement.ackRequest(parser);
1266                            if (smEnabledSyncPoint.wasSuccessful()) {
1267                                sendSmAcknowledgementInternal();
1268                            } else {
1269                                LOGGER.warning("SM Ack Request received while SM is not enabled");
1270                            }
1271                            break;
1272                         default:
1273                             LOGGER.warning("Unknown top level stream element: " + name);
1274                             break;
1275                        }
1276                        break;
1277                    case XmlPullParser.END_TAG:
1278                        final String endTagName = parser.getName();
1279                        if ("stream".equals(endTagName)) {
1280                            if (!parser.getNamespace().equals("http://etherx.jabber.org/streams")) {
1281                                LOGGER.warning(XMPPTCPConnection.this +  " </stream> but different namespace " + parser.getNamespace());
1282                                break;
1283                            }
1284
1285                            // Check if the queue was already shut down before reporting success on closing stream tag
1286                            // received. This avoids a race if there is a disconnect(), followed by a connect(), which
1287                            // did re-start the queue again, causing this writer to assume that the queue is not
1288                            // shutdown, which results in a call to disconnect().
1289                            final boolean queueWasShutdown = packetWriter.queue.isShutdown();
1290                            closingStreamReceived.reportSuccess();
1291
1292                            if (queueWasShutdown) {
1293                                // We received a closing stream element *after* we initiated the
1294                                // termination of the session by sending a closing stream element to
1295                                // the server first
1296                                return;
1297                            } else {
1298                                // We received a closing stream element from the server without us
1299                                // sending a closing stream element first. This means that the
1300                                // server wants to terminate the session, therefore disconnect
1301                                // the connection
1302                                LOGGER.info(XMPPTCPConnection.this
1303                                                + " received closing </stream> element."
1304                                                + " Server wants to terminate the connection, calling disconnect()");
1305                                ASYNC_BUT_ORDERED.performAsyncButOrdered(XMPPTCPConnection.this, new Runnable() {
1306                                    @Override
1307                                    public void run() {
1308                                        disconnect();
1309                                    }});
1310                            }
1311                        }
1312                        break;
1313                    case XmlPullParser.END_DOCUMENT:
1314                        // END_DOCUMENT only happens in an error case, as otherwise we would see a
1315                        // closing stream element before.
1316                        throw new SmackException(
1317                                        "Parser got END_DOCUMENT event. This could happen e.g. if the server closed the connection without sending a closing stream element");
1318                    }
1319                    eventType = parser.next();
1320                }
1321            }
1322            catch (Exception e) {
1323                closingStreamReceived.reportFailure(e);
1324                // The exception can be ignored if the the connection is 'done'
1325                // or if the it was caused because the socket got closed
1326                if (!(done || packetWriter.queue.isShutdown())) {
1327                    // Close the connection and notify connection listeners of the
1328                    // error.
1329                    notifyConnectionError(e);
1330                }
1331            }
1332        }
1333    }
1334
1335    protected class PacketWriter {
1336        public static final int QUEUE_SIZE = XMPPTCPConnection.QUEUE_SIZE;
1337
1338        private final ArrayBlockingQueueWithShutdown<Element> queue = new ArrayBlockingQueueWithShutdown<>(
1339                        QUEUE_SIZE, true);
1340
1341        /**
1342         * Needs to be protected for unit testing purposes.
1343         */
1344        protected SynchronizationPoint<NoResponseException> shutdownDone = new SynchronizationPoint<>(
1345                        XMPPTCPConnection.this, "shutdown completed");
1346
1347        /**
1348         * If set, the stanza writer is shut down
1349         */
1350        protected volatile Long shutdownTimestamp = null;
1351
1352        private volatile boolean instantShutdown;
1353
1354        /**
1355         * True if some preconditions are given to start the bundle and defer mechanism.
1356         * <p>
1357         * This will likely get set to true right after the start of the writer thread, because
1358         * {@link #nextStreamElement()} will check if {@link queue} is empty, which is probably the case, and then set
1359         * this field to true.
1360         * </p>
1361         */
1362        private boolean shouldBundleAndDefer;
1363
1364        /**
1365        * Initializes the writer in order to be used. It is called at the first connection and also
1366        * is invoked if the connection is disconnected by an error.
1367        */
1368        void init() {
1369            shutdownDone.init();
1370            shutdownTimestamp = null;
1371
1372            if (unacknowledgedStanzas != null) {
1373                // It's possible that there are new stanzas in the writer queue that
1374                // came in while we were disconnected but resumable, drain those into
1375                // the unacknowledged queue so that they get resent now
1376                drainWriterQueueToUnacknowledgedStanzas();
1377            }
1378
1379            queue.start();
1380            Async.go(new Runnable() {
1381                @Override
1382                public void run() {
1383                    try {
1384                        writePackets();
1385                    } finally {
1386                        XMPPTCPConnection.this.readerWriterSemaphore.release();
1387                    }
1388                }
1389            }, "Smack Writer (" + getConnectionCounter() + ")");
1390        }
1391
1392        private boolean done() {
1393            return shutdownTimestamp != null;
1394        }
1395
1396        protected void throwNotConnectedExceptionIfDoneAndResumptionNotPossible() throws NotConnectedException {
1397            final boolean done = done();
1398            if (done) {
1399                final boolean smResumptionPossible = isSmResumptionPossible();
1400                // Don't throw a NotConnectedException is there is an resumable stream available
1401                if (!smResumptionPossible) {
1402                    throw new NotConnectedException(XMPPTCPConnection.this, "done=" + done
1403                                    + " smResumptionPossible=" + smResumptionPossible);
1404                }
1405            }
1406        }
1407
1408        /**
1409         * Sends the specified element to the server.
1410         *
1411         * @param element the element to send.
1412         * @throws NotConnectedException
1413         * @throws InterruptedException
1414         */
1415        protected void sendStreamElement(Element element) throws NotConnectedException, InterruptedException {
1416            throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
1417            try {
1418                queue.put(element);
1419            }
1420            catch (InterruptedException e) {
1421                // put() may throw an InterruptedException for two reasons:
1422                // 1. If the queue was shut down
1423                // 2. If the thread was interrupted
1424                // so we have to check which is the case
1425                throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
1426                // If the method above did not throw, then the sending thread was interrupted
1427                throw e;
1428            }
1429        }
1430
1431        /**
1432         * Shuts down the stanza writer. Once this method has been called, no further
1433         * packets will be written to the server.
1434         * @throws InterruptedException
1435         */
1436        void shutdown(boolean instant) {
1437            instantShutdown = instant;
1438            queue.shutdown();
1439            shutdownTimestamp = System.currentTimeMillis();
1440            if (shutdownDone.isNotInInitialState()) {
1441                try {
1442                    shutdownDone.checkIfSuccessOrWait();
1443                } catch (NoResponseException | InterruptedException e) {
1444                    LOGGER.log(Level.WARNING, "shutdownDone was not marked as successful by the writer thread", e);
1445                }
1446            }
1447        }
1448
1449        /**
1450         * Maybe return the next available element from the queue for writing. If the queue is shut down <b>or</b> a
1451         * spurious interrupt occurs, <code>null</code> is returned. So it is important to check the 'done' condition in
1452         * that case.
1453         *
1454         * @return the next element for writing or null.
1455         */
1456        private Element nextStreamElement() {
1457            // It is important the we check if the queue is empty before removing an element from it
1458            if (queue.isEmpty()) {
1459                shouldBundleAndDefer = true;
1460            }
1461            Element packet = null;
1462            try {
1463                packet = queue.take();
1464            }
1465            catch (InterruptedException e) {
1466                if (!queue.isShutdown()) {
1467                    // Users shouldn't try to interrupt the packet writer thread
1468                    LOGGER.log(Level.WARNING, "Writer thread was interrupted. Don't do that. Use disconnect() instead.", e);
1469                }
1470            }
1471            return packet;
1472        }
1473
1474        private void writePackets() {
1475            Exception writerException = null;
1476            try {
1477                openStream();
1478                initialOpenStreamSend.reportSuccess();
1479                // Write out packets from the queue.
1480                while (!done()) {
1481                    Element element = nextStreamElement();
1482                    if (element == null) {
1483                        continue;
1484                    }
1485
1486                    // Get a local version of the bundle and defer callback, in case it's unset
1487                    // between the null check and the method invocation
1488                    final BundleAndDeferCallback localBundleAndDeferCallback = bundleAndDeferCallback;
1489                    // If the preconditions are given (e.g. bundleAndDefer callback is set, queue is
1490                    // empty), then we could wait a bit for further stanzas attempting to decrease
1491                    // our energy consumption
1492                    if (localBundleAndDeferCallback != null && isAuthenticated() && shouldBundleAndDefer) {
1493                        // Reset shouldBundleAndDefer to false, nextStreamElement() will set it to true once the
1494                        // queue is empty again.
1495                        shouldBundleAndDefer = false;
1496                        final AtomicBoolean bundlingAndDeferringStopped = new AtomicBoolean();
1497                        final int bundleAndDeferMillis = localBundleAndDeferCallback.getBundleAndDeferMillis(new BundleAndDefer(
1498                                        bundlingAndDeferringStopped));
1499                        if (bundleAndDeferMillis > 0) {
1500                            long remainingWait = bundleAndDeferMillis;
1501                            final long waitStart = System.currentTimeMillis();
1502                            synchronized (bundlingAndDeferringStopped) {
1503                                while (!bundlingAndDeferringStopped.get() && remainingWait > 0) {
1504                                    bundlingAndDeferringStopped.wait(remainingWait);
1505                                    remainingWait = bundleAndDeferMillis
1506                                                    - (System.currentTimeMillis() - waitStart);
1507                                }
1508                            }
1509                        }
1510                    }
1511
1512                    Stanza packet = null;
1513                    if (element instanceof Stanza) {
1514                        packet = (Stanza) element;
1515                    }
1516                    else if (element instanceof Enable) {
1517                        // The client needs to add messages to the unacknowledged stanzas queue
1518                        // right after it sent 'enabled'. Stanza will be added once
1519                        // unacknowledgedStanzas is not null.
1520                        unacknowledgedStanzas = new ArrayBlockingQueue<>(QUEUE_SIZE);
1521                    }
1522                    maybeAddToUnacknowledgedStanzas(packet);
1523
1524                    CharSequence elementXml = element.toXML(StreamOpen.CLIENT_NAMESPACE);
1525                    if (elementXml instanceof XmlStringBuilder) {
1526                        ((XmlStringBuilder) elementXml).write(writer, StreamOpen.CLIENT_NAMESPACE);
1527                    }
1528                    else {
1529                        writer.write(elementXml.toString());
1530                    }
1531
1532                    if (queue.isEmpty()) {
1533                        writer.flush();
1534                    }
1535                    if (packet != null) {
1536                        firePacketSendingListeners(packet);
1537                    }
1538                }
1539                if (!instantShutdown) {
1540                    // Flush out the rest of the queue.
1541                    try {
1542                        while (!queue.isEmpty()) {
1543                            Element packet = queue.remove();
1544                            if (packet instanceof Stanza) {
1545                                Stanza stanza = (Stanza) packet;
1546                                maybeAddToUnacknowledgedStanzas(stanza);
1547                            }
1548                            writer.write(packet.toXML(null).toString());
1549                        }
1550                        writer.flush();
1551                    }
1552                    catch (Exception e) {
1553                        LOGGER.log(Level.WARNING,
1554                                        "Exception flushing queue during shutdown, ignore and continue",
1555                                        e);
1556                    }
1557
1558                    // Close the stream.
1559                    try {
1560                        writer.write("</stream:stream>");
1561                        writer.flush();
1562                    }
1563                    catch (Exception e) {
1564                        LOGGER.log(Level.WARNING, "Exception writing closing stream element", e);
1565                    }
1566
1567                    // Delete the queue contents (hopefully nothing is left).
1568                    queue.clear();
1569                } else if (instantShutdown && isSmEnabled()) {
1570                    // This was an instantShutdown and SM is enabled, drain all remaining stanzas
1571                    // into the unacknowledgedStanzas queue
1572                    drainWriterQueueToUnacknowledgedStanzas();
1573                }
1574                // Do *not* close the writer here, as it will cause the socket
1575                // to get closed. But we may want to receive further stanzas
1576                // until the closing stream tag is received. The socket will be
1577                // closed in shutdown().
1578            }
1579            catch (Exception e) {
1580                // The exception can be ignored if the the connection is 'done'
1581                // or if the it was caused because the socket got closed
1582                if (!(done() || queue.isShutdown())) {
1583                    writerException = e;
1584                } else {
1585                    LOGGER.log(Level.FINE, "Ignoring Exception in writePackets()", e);
1586                }
1587            } finally {
1588                LOGGER.fine("Reporting shutdownDone success in writer thread");
1589                shutdownDone.reportSuccess();
1590            }
1591            // Delay notifyConnectionError after shutdownDone has been reported in the finally block.
1592            if (writerException != null) {
1593                notifyConnectionError(writerException);
1594            }
1595        }
1596
1597        private void drainWriterQueueToUnacknowledgedStanzas() {
1598            List<Element> elements = new ArrayList<>(queue.size());
1599            queue.drainTo(elements);
1600            for (int i = 0; i < elements.size(); i++) {
1601                Element element = elements.get(i);
1602                // If the unacknowledgedStanza queue is full, then bail out with a warning message. See SMACK-844.
1603                if (unacknowledgedStanzas.remainingCapacity() == 0) {
1604                    StreamManagementException.UnacknowledgedQueueFullException exception = StreamManagementException.UnacknowledgedQueueFullException
1605                            .newWith(i, elements, unacknowledgedStanzas);
1606                    LOGGER.log(Level.WARNING,
1607                            "Some stanzas may be lost as not all could be drained to the unacknowledged stanzas queue", exception);
1608                    return;
1609                }
1610                if (element instanceof Stanza) {
1611                    unacknowledgedStanzas.add((Stanza) element);
1612                }
1613            }
1614        }
1615
1616        private void maybeAddToUnacknowledgedStanzas(Stanza stanza) throws IOException {
1617            // Check if the stream element should be put to the unacknowledgedStanza
1618            // queue. Note that we can not do the put() in sendStanzaInternal() and the
1619            // packet order is not stable at this point (sendStanzaInternal() can be
1620            // called concurrently).
1621            if (unacknowledgedStanzas != null && stanza != null) {
1622                // If the unacknowledgedStanza queue is nearly full, request an new ack
1623                // from the server in order to drain it
1624                if (unacknowledgedStanzas.size() == 0.8 * XMPPTCPConnection.QUEUE_SIZE) {
1625                    writer.write(AckRequest.INSTANCE.toXML(null).toString());
1626                    writer.flush();
1627                }
1628                try {
1629                    // It is important the we put the stanza in the unacknowledged stanza
1630                    // queue before we put it on the wire
1631                    unacknowledgedStanzas.put(stanza);
1632                }
1633                catch (InterruptedException e) {
1634                    throw new IllegalStateException(e);
1635                }
1636            }
1637        }
1638    }
1639
1640    /**
1641     * Set if Stream Management should be used by default for new connections.
1642     *
1643     * @param useSmDefault true to use Stream Management for new connections.
1644     */
1645    public static void setUseStreamManagementDefault(boolean useSmDefault) {
1646        XMPPTCPConnection.useSmDefault = useSmDefault;
1647    }
1648
1649    /**
1650     * Set if Stream Management resumption should be used by default for new connections.
1651     *
1652     * @param useSmResumptionDefault true to use Stream Management resumption for new connections.
1653     * @deprecated use {@link #setUseStreamManagementResumptionDefault(boolean)} instead.
1654     */
1655    @Deprecated
1656    public static void setUseStreamManagementResumptiodDefault(boolean useSmResumptionDefault) {
1657        setUseStreamManagementResumptionDefault(useSmResumptionDefault);
1658    }
1659
1660    /**
1661     * Set if Stream Management resumption should be used by default for new connections.
1662     *
1663     * @param useSmResumptionDefault true to use Stream Management resumption for new connections.
1664     */
1665    public static void setUseStreamManagementResumptionDefault(boolean useSmResumptionDefault) {
1666        if (useSmResumptionDefault) {
1667            // Also enable SM is resumption is enabled
1668            setUseStreamManagementDefault(useSmResumptionDefault);
1669        }
1670        XMPPTCPConnection.useSmResumptionDefault = useSmResumptionDefault;
1671    }
1672
1673    /**
1674     * Set if Stream Management should be used if supported by the server.
1675     *
1676     * @param useSm true to use Stream Management.
1677     */
1678    public void setUseStreamManagement(boolean useSm) {
1679        this.useSm = useSm;
1680    }
1681
1682    /**
1683     * Set if Stream Management resumption should be used if supported by the server.
1684     *
1685     * @param useSmResumption true to use Stream Management resumption.
1686     */
1687    public void setUseStreamManagementResumption(boolean useSmResumption) {
1688        if (useSmResumption) {
1689            // Also enable SM is resumption is enabled
1690            setUseStreamManagement(useSmResumption);
1691        }
1692        this.useSmResumption = useSmResumption;
1693    }
1694
1695    /**
1696     * Set the preferred resumption time in seconds.
1697     * @param resumptionTime the preferred resumption time in seconds
1698     */
1699    public void setPreferredResumptionTime(int resumptionTime) {
1700        smClientMaxResumptionTime = resumptionTime;
1701    }
1702
1703    /**
1704     * Add a predicate for Stream Management acknowledgment requests.
1705     * <p>
1706     * Those predicates are used to determine when a Stream Management acknowledgement request is send to the server.
1707     * Some pre-defined predicates are found in the <code>org.jivesoftware.smack.sm.predicates</code> package.
1708     * </p>
1709     * <p>
1710     * If not predicate is configured, the {@link Predicate#forMessagesOrAfter5Stanzas()} will be used.
1711     * </p>
1712     *
1713     * @param predicate the predicate to add.
1714     * @return if the predicate was not already active.
1715     */
1716    public boolean addRequestAckPredicate(StanzaFilter predicate) {
1717        synchronized (requestAckPredicates) {
1718            return requestAckPredicates.add(predicate);
1719        }
1720    }
1721
1722    /**
1723     * Remove the given predicate for Stream Management acknowledgment request.
1724     * @param predicate the predicate to remove.
1725     * @return true if the predicate was removed.
1726     */
1727    public boolean removeRequestAckPredicate(StanzaFilter predicate) {
1728        synchronized (requestAckPredicates) {
1729            return requestAckPredicates.remove(predicate);
1730        }
1731    }
1732
1733    /**
1734     * Remove all predicates for Stream Management acknowledgment requests.
1735     */
1736    public void removeAllRequestAckPredicates() {
1737        synchronized (requestAckPredicates) {
1738            requestAckPredicates.clear();
1739        }
1740    }
1741
1742    /**
1743     * Send an unconditional Stream Management acknowledgement request to the server.
1744     *
1745     * @throws StreamManagementNotEnabledException if Stream Management is not enabled.
1746     * @throws NotConnectedException if the connection is not connected.
1747     * @throws InterruptedException
1748     */
1749    public void requestSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException {
1750        if (!isSmEnabled()) {
1751            throw new StreamManagementException.StreamManagementNotEnabledException();
1752        }
1753        requestSmAcknowledgementInternal();
1754    }
1755
1756    private void requestSmAcknowledgementInternal() throws NotConnectedException, InterruptedException {
1757        packetWriter.sendStreamElement(AckRequest.INSTANCE);
1758    }
1759
1760    /**
1761     * Send a unconditional Stream Management acknowledgment to the server.
1762     * <p>
1763     * See <a href="http://xmpp.org/extensions/xep-0198.html#acking">XEP-198: Stream Management ยง 4. Acks</a>:
1764     * "Either party MAY send an &lt;a/&gt; element at any time (e.g., after it has received a certain number of stanzas,
1765     * or after a certain period of time), even if it has not received an &lt;r/&gt; element from the other party."
1766     * </p>
1767     *
1768     * @throws StreamManagementNotEnabledException if Stream Management is not enabled.
1769     * @throws NotConnectedException if the connection is not connected.
1770     * @throws InterruptedException
1771     */
1772    public void sendSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException {
1773        if (!isSmEnabled()) {
1774            throw new StreamManagementException.StreamManagementNotEnabledException();
1775        }
1776        sendSmAcknowledgementInternal();
1777    }
1778
1779    private void sendSmAcknowledgementInternal() throws NotConnectedException, InterruptedException {
1780        packetWriter.sendStreamElement(new AckAnswer(clientHandledStanzasCount));
1781    }
1782
1783    /**
1784     * Add a Stanza acknowledged listener.
1785     * <p>
1786     * Those listeners will be invoked every time a Stanza has been acknowledged by the server. The will not get
1787     * automatically removed. Consider using {@link #addStanzaIdAcknowledgedListener(String, StanzaListener)} when
1788     * possible.
1789     * </p>
1790     *
1791     * @param listener the listener to add.
1792     */
1793    public void addStanzaAcknowledgedListener(StanzaListener listener) {
1794        stanzaAcknowledgedListeners.add(listener);
1795    }
1796
1797    /**
1798     * Remove the given Stanza acknowledged listener.
1799     *
1800     * @param listener the listener.
1801     * @return true if the listener was removed.
1802     */
1803    public boolean removeStanzaAcknowledgedListener(StanzaListener listener) {
1804        return stanzaAcknowledgedListeners.remove(listener);
1805    }
1806
1807    /**
1808     * Remove all stanza acknowledged listeners.
1809     */
1810    public void removeAllStanzaAcknowledgedListeners() {
1811        stanzaAcknowledgedListeners.clear();
1812    }
1813
1814    /**
1815     * Add a Stanza dropped listener.
1816     * <p>
1817     * Those listeners will be invoked every time a Stanza has been dropped due to a failed SM resume. They will not get
1818     * automatically removed. If at least one StanzaDroppedListener is configured, no attempt will be made to retransmit
1819     * the Stanzas.
1820     * </p>
1821     *
1822     * @param listener the listener to add.
1823     * @since 4.3.3
1824     */
1825    public void addStanzaDroppedListener(StanzaListener listener) {
1826        stanzaDroppedListeners.add(listener);
1827    }
1828
1829    /**
1830     * Remove the given Stanza dropped listener.
1831     *
1832     * @param listener the listener.
1833     * @return true if the listener was removed.
1834     * @since 4.3.3
1835     */
1836    public boolean removeStanzaDroppedListener(StanzaListener listener) {
1837        return stanzaDroppedListeners.remove(listener);
1838    }
1839
1840    /**
1841     * Add a new Stanza ID acknowledged listener for the given ID.
1842     * <p>
1843     * The listener will be invoked if the stanza with the given ID was acknowledged by the server. It will
1844     * automatically be removed after the listener was run.
1845     * </p>
1846     *
1847     * @param id the stanza ID.
1848     * @param listener the listener to invoke.
1849     * @return the previous listener for this stanza ID or null.
1850     * @throws StreamManagementNotEnabledException if Stream Management is not enabled.
1851     */
1852    @SuppressWarnings("FutureReturnValueIgnored")
1853    public StanzaListener addStanzaIdAcknowledgedListener(final String id, StanzaListener listener) throws StreamManagementNotEnabledException {
1854        // Prevent users from adding callbacks that will never get removed
1855        if (!smWasEnabledAtLeastOnce) {
1856            throw new StreamManagementException.StreamManagementNotEnabledException();
1857        }
1858        // Remove the listener after max. 3 hours
1859        final int removeAfterSeconds = Math.min(getMaxSmResumptionTime(), 3 * 60 * 60);
1860        schedule(new Runnable() {
1861            @Override
1862            public void run() {
1863                stanzaIdAcknowledgedListeners.remove(id);
1864            }
1865        }, removeAfterSeconds, TimeUnit.SECONDS);
1866        return stanzaIdAcknowledgedListeners.put(id, listener);
1867    }
1868
1869    /**
1870     * Remove the Stanza ID acknowledged listener for the given ID.
1871     *
1872     * @param id the stanza ID.
1873     * @return true if the listener was found and removed, false otherwise.
1874     */
1875    public StanzaListener removeStanzaIdAcknowledgedListener(String id) {
1876        return stanzaIdAcknowledgedListeners.remove(id);
1877    }
1878
1879    /**
1880     * Removes all Stanza ID acknowledged listeners.
1881     */
1882    public void removeAllStanzaIdAcknowledgedListeners() {
1883        stanzaIdAcknowledgedListeners.clear();
1884    }
1885
1886    /**
1887     * Returns true if Stream Management is supported by the server.
1888     *
1889     * @return true if Stream Management is supported by the server.
1890     */
1891    public boolean isSmAvailable() {
1892        return hasFeature(StreamManagementFeature.ELEMENT, StreamManagement.NAMESPACE);
1893    }
1894
1895    /**
1896     * Returns true if Stream Management was successfully negotiated with the server.
1897     *
1898     * @return true if Stream Management was negotiated.
1899     */
1900    public boolean isSmEnabled() {
1901        return smEnabledSyncPoint.wasSuccessful();
1902    }
1903
1904    /**
1905     * Returns true if the stream was successfully resumed with help of Stream Management.
1906     *
1907     * @return true if the stream was resumed.
1908     */
1909    public boolean streamWasResumed() {
1910        return smResumedSyncPoint.wasSuccessful();
1911    }
1912
1913    /**
1914     * Returns true if the connection is disconnected by a Stream resumption via Stream Management is possible.
1915     *
1916     * @return true if disconnected but resumption possible.
1917     */
1918    public boolean isDisconnectedButSmResumptionPossible() {
1919        return disconnectedButResumeable && isSmResumptionPossible();
1920    }
1921
1922    /**
1923     * Returns true if the stream is resumable.
1924     *
1925     * @return true if the stream is resumable.
1926     */
1927    public boolean isSmResumptionPossible() {
1928        // There is no resumable stream available
1929        if (smSessionId == null)
1930            return false;
1931
1932        final Long shutdownTimestamp = packetWriter.shutdownTimestamp;
1933        // Seems like we are already reconnected, report true
1934        if (shutdownTimestamp == null) {
1935            return true;
1936        }
1937
1938        // See if resumption time is over
1939        long current = System.currentTimeMillis();
1940        long maxResumptionMillies = ((long) getMaxSmResumptionTime()) * 1000;
1941        if (current > shutdownTimestamp + maxResumptionMillies) {
1942            // Stream resumption is *not* possible if the current timestamp is greater then the greatest timestamp where
1943            // resumption is possible
1944            return false;
1945        } else {
1946            return true;
1947        }
1948    }
1949
1950    /**
1951     * Drop the stream management state. Sets {@link #smSessionId} and
1952     * {@link #unacknowledgedStanzas} to <code>null</code>.
1953     */
1954    private void dropSmState() {
1955        // clientHandledCount and serverHandledCount will be reset on <enable/> and <enabled/>
1956        // respective. No need to reset them here.
1957        smSessionId = null;
1958        unacknowledgedStanzas = null;
1959    }
1960
1961    /**
1962     * Get the maximum resumption time in seconds after which a managed stream can be resumed.
1963     * <p>
1964     * This method will return {@link Integer#MAX_VALUE} if neither the client nor the server specify a maximum
1965     * resumption time. Be aware of integer overflows when using this value, e.g. do not add arbitrary values to it
1966     * without checking for overflows before.
1967     * </p>
1968     *
1969     * @return the maximum resumption time in seconds or {@link Integer#MAX_VALUE} if none set.
1970     */
1971    public int getMaxSmResumptionTime() {
1972        int clientResumptionTime = smClientMaxResumptionTime > 0 ? smClientMaxResumptionTime : Integer.MAX_VALUE;
1973        int serverResumptionTime = smServerMaxResumptionTime > 0 ? smServerMaxResumptionTime : Integer.MAX_VALUE;
1974        return Math.min(clientResumptionTime, serverResumptionTime);
1975    }
1976
1977    private void processHandledCount(long handledCount) throws StreamManagementCounterError {
1978        long ackedStanzasCount = SMUtils.calculateDelta(handledCount, serverHandledStanzasCount);
1979        final List<Stanza> ackedStanzas = new ArrayList<>(
1980                        ackedStanzasCount <= Integer.MAX_VALUE ? (int) ackedStanzasCount
1981                                        : Integer.MAX_VALUE);
1982        for (long i = 0; i < ackedStanzasCount; i++) {
1983            Stanza ackedStanza = unacknowledgedStanzas.poll();
1984            // If the server ack'ed a stanza, then it must be in the
1985            // unacknowledged stanza queue. There can be no exception.
1986            if (ackedStanza == null) {
1987                throw new StreamManagementCounterError(handledCount, serverHandledStanzasCount,
1988                                ackedStanzasCount, ackedStanzas);
1989            }
1990            ackedStanzas.add(ackedStanza);
1991        }
1992
1993        boolean atLeastOneStanzaAcknowledgedListener = false;
1994        if (!stanzaAcknowledgedListeners.isEmpty()) {
1995            // If stanzaAcknowledgedListeners is not empty, the we have at least one
1996            atLeastOneStanzaAcknowledgedListener = true;
1997        }
1998        else {
1999            // Otherwise we look for a matching id in the stanza *id* acknowledged listeners
2000            for (Stanza ackedStanza : ackedStanzas) {
2001                String id = ackedStanza.getStanzaId();
2002                if (id != null && stanzaIdAcknowledgedListeners.containsKey(id)) {
2003                    atLeastOneStanzaAcknowledgedListener = true;
2004                    break;
2005                }
2006            }
2007        }
2008
2009        // Only spawn a new thread if there is a chance that some listener is invoked
2010        if (atLeastOneStanzaAcknowledgedListener) {
2011            asyncGo(new Runnable() {
2012                @Override
2013                public void run() {
2014                    for (Stanza ackedStanza : ackedStanzas) {
2015                        for (StanzaListener listener : stanzaAcknowledgedListeners) {
2016                            try {
2017                                listener.processStanza(ackedStanza);
2018                            }
2019                            catch (InterruptedException | NotConnectedException | NotLoggedInException e) {
2020                                LOGGER.log(Level.FINER, "Received exception", e);
2021                            }
2022                        }
2023                        String id = ackedStanza.getStanzaId();
2024                        if (StringUtils.isNullOrEmpty(id)) {
2025                            continue;
2026                        }
2027                        StanzaListener listener = stanzaIdAcknowledgedListeners.remove(id);
2028                        if (listener != null) {
2029                            try {
2030                                listener.processStanza(ackedStanza);
2031                            }
2032                            catch (InterruptedException | NotConnectedException | NotLoggedInException e) {
2033                                LOGGER.log(Level.FINER, "Received exception", e);
2034                            }
2035                        }
2036                    }
2037                }
2038            });
2039        }
2040
2041        serverHandledStanzasCount = handledCount;
2042    }
2043
2044    /**
2045     * Set the default bundle and defer callback used for new connections.
2046     *
2047     * @param defaultBundleAndDeferCallback
2048     * @see BundleAndDeferCallback
2049     * @since 4.1
2050     */
2051    public static void setDefaultBundleAndDeferCallback(BundleAndDeferCallback defaultBundleAndDeferCallback) {
2052        XMPPTCPConnection.defaultBundleAndDeferCallback = defaultBundleAndDeferCallback;
2053    }
2054
2055    /**
2056     * Set the bundle and defer callback used for this connection.
2057     * <p>
2058     * You can use <code>null</code> as argument to reset the callback. Outgoing stanzas will then
2059     * no longer get deferred.
2060     * </p>
2061     *
2062     * @param bundleAndDeferCallback the callback or <code>null</code>.
2063     * @see BundleAndDeferCallback
2064     * @since 4.1
2065     */
2066    public void setBundleandDeferCallback(BundleAndDeferCallback bundleAndDeferCallback) {
2067        this.bundleAndDeferCallback = bundleAndDeferCallback;
2068    }
2069
2070}