001/**
002 *
003 * Copyright 2003-2007 Jive Software.
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.jivesoftware.smack.tcp;
018
019import java.io.BufferedReader;
020import java.io.ByteArrayInputStream;
021import java.io.FileInputStream;
022import java.io.IOException;
023import java.io.InputStream;
024import java.io.InputStreamReader;
025import java.io.OutputStream;
026import java.io.OutputStreamWriter;
027import java.io.Writer;
028import java.lang.reflect.Constructor;
029import java.net.InetAddress;
030import java.net.InetSocketAddress;
031import java.net.Socket;
032import java.security.KeyManagementException;
033import java.security.KeyStore;
034import java.security.KeyStoreException;
035import java.security.NoSuchAlgorithmException;
036import java.security.NoSuchProviderException;
037import java.security.Provider;
038import java.security.SecureRandom;
039import java.security.Security;
040import java.security.UnrecoverableKeyException;
041import java.security.cert.CertificateException;
042import java.util.ArrayList;
043import java.util.Collection;
044import java.util.Iterator;
045import java.util.LinkedHashSet;
046import java.util.LinkedList;
047import java.util.List;
048import java.util.Map;
049import java.util.Set;
050import java.util.concurrent.ArrayBlockingQueue;
051import java.util.concurrent.BlockingQueue;
052import java.util.concurrent.ConcurrentHashMap;
053import java.util.concurrent.ConcurrentLinkedQueue;
054import java.util.concurrent.Semaphore;
055import java.util.concurrent.TimeUnit;
056import java.util.concurrent.atomic.AtomicBoolean;
057import java.util.logging.Level;
058import java.util.logging.Logger;
059
060import javax.net.SocketFactory;
061import javax.net.ssl.HostnameVerifier;
062import javax.net.ssl.KeyManager;
063import javax.net.ssl.KeyManagerFactory;
064import javax.net.ssl.SSLContext;
065import javax.net.ssl.SSLSession;
066import javax.net.ssl.SSLSocket;
067import javax.net.ssl.TrustManager;
068import javax.net.ssl.X509TrustManager;
069import javax.security.auth.callback.Callback;
070import javax.security.auth.callback.CallbackHandler;
071import javax.security.auth.callback.PasswordCallback;
072
073import org.jivesoftware.smack.AbstractConnectionListener;
074import org.jivesoftware.smack.AbstractXMPPConnection;
075import org.jivesoftware.smack.ConnectionConfiguration;
076import org.jivesoftware.smack.ConnectionConfiguration.DnssecMode;
077import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode;
078import org.jivesoftware.smack.SmackConfiguration;
079import org.jivesoftware.smack.SmackException;
080import org.jivesoftware.smack.SmackException.AlreadyConnectedException;
081import org.jivesoftware.smack.SmackException.AlreadyLoggedInException;
082import org.jivesoftware.smack.SmackException.ConnectionException;
083import org.jivesoftware.smack.SmackException.NoResponseException;
084import org.jivesoftware.smack.SmackException.NotConnectedException;
085import org.jivesoftware.smack.SmackException.NotLoggedInException;
086import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException;
087import org.jivesoftware.smack.SmackException.SmackWrappedException;
088import org.jivesoftware.smack.SmackFuture;
089import org.jivesoftware.smack.StanzaListener;
090import org.jivesoftware.smack.SynchronizationPoint;
091import org.jivesoftware.smack.XMPPConnection;
092import org.jivesoftware.smack.XMPPException;
093import org.jivesoftware.smack.XMPPException.FailedNonzaException;
094import org.jivesoftware.smack.XMPPException.StreamErrorException;
095import org.jivesoftware.smack.compress.packet.Compress;
096import org.jivesoftware.smack.compress.packet.Compressed;
097import org.jivesoftware.smack.compression.XMPPInputOutputStream;
098import org.jivesoftware.smack.filter.StanzaFilter;
099import org.jivesoftware.smack.packet.Element;
100import org.jivesoftware.smack.packet.IQ;
101import org.jivesoftware.smack.packet.Message;
102import org.jivesoftware.smack.packet.Nonza;
103import org.jivesoftware.smack.packet.Presence;
104import org.jivesoftware.smack.packet.Stanza;
105import org.jivesoftware.smack.packet.StartTls;
106import org.jivesoftware.smack.packet.StreamError;
107import org.jivesoftware.smack.packet.StreamOpen;
108import org.jivesoftware.smack.proxy.ProxyInfo;
109import org.jivesoftware.smack.sasl.packet.SaslStreamElements;
110import org.jivesoftware.smack.sasl.packet.SaslStreamElements.Challenge;
111import org.jivesoftware.smack.sasl.packet.SaslStreamElements.SASLFailure;
112import org.jivesoftware.smack.sasl.packet.SaslStreamElements.Success;
113import org.jivesoftware.smack.sm.SMUtils;
114import org.jivesoftware.smack.sm.StreamManagementException;
115import org.jivesoftware.smack.sm.StreamManagementException.StreamIdDoesNotMatchException;
116import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementCounterError;
117import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementNotEnabledException;
118import org.jivesoftware.smack.sm.packet.StreamManagement;
119import org.jivesoftware.smack.sm.packet.StreamManagement.AckAnswer;
120import org.jivesoftware.smack.sm.packet.StreamManagement.AckRequest;
121import org.jivesoftware.smack.sm.packet.StreamManagement.Enable;
122import org.jivesoftware.smack.sm.packet.StreamManagement.Enabled;
123import org.jivesoftware.smack.sm.packet.StreamManagement.Failed;
124import org.jivesoftware.smack.sm.packet.StreamManagement.Resume;
125import org.jivesoftware.smack.sm.packet.StreamManagement.Resumed;
126import org.jivesoftware.smack.sm.packet.StreamManagement.StreamManagementFeature;
127import org.jivesoftware.smack.sm.predicates.Predicate;
128import org.jivesoftware.smack.sm.provider.ParseStreamManagement;
129import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown;
130import org.jivesoftware.smack.util.Async;
131import org.jivesoftware.smack.util.DNSUtil;
132import org.jivesoftware.smack.util.PacketParserUtils;
133import org.jivesoftware.smack.util.StringUtils;
134import org.jivesoftware.smack.util.TLSUtils;
135import org.jivesoftware.smack.util.XmlStringBuilder;
136import org.jivesoftware.smack.util.dns.HostAddress;
137import org.jivesoftware.smack.util.dns.SmackDaneProvider;
138import org.jivesoftware.smack.util.dns.SmackDaneVerifier;
139
140import org.jxmpp.jid.impl.JidCreate;
141import org.jxmpp.jid.parts.Resourcepart;
142import org.jxmpp.stringprep.XmppStringprepException;
143import org.jxmpp.util.XmppStringUtils;
144import org.xmlpull.v1.XmlPullParser;
145import org.xmlpull.v1.XmlPullParserException;
146
147/**
148 * Creates a socket connection to an XMPP server. This is the default connection
149 * to an XMPP server and is specified in the XMPP Core (RFC 6120).
150 *
151 * @see XMPPConnection
152 * @author Matt Tucker
153 */
154public class XMPPTCPConnection extends AbstractXMPPConnection {
155
156    private static final int QUEUE_SIZE = 500;
157    private static final Logger LOGGER = Logger.getLogger(XMPPTCPConnection.class.getName());
158
159    /**
160     * The socket which is used for this connection.
161     */
162    private Socket socket;
163
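    /**
     * Set to true when the connection was shut down instantly while Stream Management resumption was still
     * possible. It is reset to false after the next successful login.
     */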
167    private boolean disconnectedButResumeable = false;
168
169    private SSLSocket secureSocket;
170
171    private final Semaphore readerWriterSemaphore = new Semaphore(2);
172
173    /**
174     * Protected access level because of unit test purposes
175     */
176    protected final PacketWriter packetWriter = new PacketWriter();
177
178    /**
179     * Protected access level because of unit test purposes
180     */
181    protected final PacketReader packetReader = new PacketReader();
182
183    private final SynchronizationPoint<Exception> initialOpenStreamSend = new SynchronizationPoint<>(
184                    this, "initial open stream element send to server");
185
    /**
     * A synchronization point that is waited on for the stream features the server sends after authentication.
     */
189    private final SynchronizationPoint<XMPPException> maybeCompressFeaturesReceived = new SynchronizationPoint<XMPPException>(
190                    this, "stream compression feature");
191
    /**
     * The synchronization point for stream compression. It is re-initialized on every shutdown.
     */
195    private final SynchronizationPoint<SmackException> compressSyncPoint = new SynchronizationPoint<>(
196                    this, "stream compression");
197
198    /**
199     * A synchronization point which is successful if this connection has received the closing
200     * stream element from the remote end-point, i.e. the server.
201     */
202    private final SynchronizationPoint<Exception> closingStreamReceived = new SynchronizationPoint<>(
203                    this, "stream closing element received");
204
    /**
     * The default bundle and defer callback, used for new connections.
     * @see #bundleAndDeferCallback
     */
209    private static BundleAndDeferCallback defaultBundleAndDeferCallback;
210
211    /**
212     * The used bundle and defer callback.
213     * <p>
214     * Although this field may be set concurrently, the 'volatile' keyword was deliberately not added, in order to avoid
215     * having a 'volatile' read within the writer threads loop.
216     * </p>
217     */
218    private BundleAndDeferCallback bundleAndDeferCallback = defaultBundleAndDeferCallback;
219
220    private static boolean useSmDefault = true;
221
222    private static boolean useSmResumptionDefault = true;
223
    /**
     * The stream ID of the stream that is currently resumable, i.e. the stream we hold the state
     * for in {@link #clientHandledStanzasCount}, {@link #serverHandledStanzasCount} and
     * {@link #unacknowledgedStanzas}.
     */
229    private String smSessionId;
230
231    private final SynchronizationPoint<FailedNonzaException> smResumedSyncPoint = new SynchronizationPoint<>(
232                    this, "stream resumed element");
233
234    private final SynchronizationPoint<SmackException> smEnabledSyncPoint = new SynchronizationPoint<>(
235                    this, "stream enabled element");
236
237    /**
238     * The client's preferred maximum resumption time in seconds.
239     */
240    private int smClientMaxResumptionTime = -1;
241
242    /**
243     * The server's preferred maximum resumption time in seconds.
244     */
245    private int smServerMaxResumptionTime = -1;
246
247    /**
248     * Indicates whether Stream Management (XEP-198) should be used if it's supported by the server.
249     */
250    private boolean useSm = useSmDefault;
251    private boolean useSmResumption = useSmResumptionDefault;
252
    /**
     * The counter that the server sends the client about its current height. For example, if the server sends
     * {@code <a h='42'/>}, then this will be set to 42 (while also handling the {@link #unacknowledgedStanzas} queue).
     */
257    private long serverHandledStanzasCount = 0;
258
259    /**
260     * The counter for stanzas handled ("received") by the client.
261     * <p>
262     * Note that we don't need to synchronize this counter. Although JLS 17.7 states that reads and writes to longs are
263     * not atomic, it guarantees that there are at most 2 separate writes, one to each 32-bit half. And since
264     * {@link SMUtils#incrementHeight(long)} masks the lower 32 bit, we only operate on one half of the long and
265     * therefore have no concurrency problem because the read/write operations on one half are guaranteed to be atomic.
266     * </p>
267     */
268    private long clientHandledStanzasCount = 0;
269
270    private BlockingQueue<Stanza> unacknowledgedStanzas;
271
272    /**
273     * Set to true if Stream Management was at least once enabled for this connection.
274     */
275    private boolean smWasEnabledAtLeastOnce = false;
276
    /**
     * These listeners are invoked for every stanza that got acknowledged.
     * <p>
     * We use a {@link ConcurrentLinkedQueue} here in order to allow the listeners to remove
     * themselves after they have been invoked.
     * </p>
     */
284    private final Collection<StanzaListener> stanzaAcknowledgedListeners = new ConcurrentLinkedQueue<>();
285
    /**
     * These listeners are invoked for an acknowledged stanza that has the given stanza ID. They will
     * only be invoked once and are automatically removed after that.
     */
290    private final Map<String, StanzaListener> stanzaIdAcknowledgedListeners = new ConcurrentHashMap<>();
291
    /**
     * Predicates that determine if a stream management ack should be requested from the server.
     * <p>
     * We use a linked hash set here, so that the order in which the predicates are added matches the
     * order in which they are invoked to determine whether an ack request should be sent or not.
     * </p>
     */
299    private final Set<StanzaFilter> requestAckPredicates = new LinkedHashSet<>();
300
301    @SuppressWarnings("HidingField")
302    private final XMPPTCPConnectionConfiguration config;
303
304    /**
305     * Creates a new XMPP connection over TCP (optionally using proxies).
306     * <p>
307     * Note that XMPPTCPConnection constructors do not establish a connection to the server
308     * and you must call {@link #connect()}.
309     * </p>
310     *
311     * @param config the connection configuration.
312     */
313    public XMPPTCPConnection(XMPPTCPConnectionConfiguration config) {
314        super(config);
315        this.config = config;
316        addConnectionListener(new AbstractConnectionListener() {
317            @Override
318            public void connectionClosedOnError(Exception e) {
319                if (e instanceof XMPPException.StreamErrorException || e instanceof StreamManagementException) {
320                    dropSmState();
321                }
322            }
323        });
324    }
325
326    /**
327     * Creates a new XMPP connection over TCP.
328     * <p>
329     * Note that {@code jid} must be the bare JID, e.g. "user@example.org". More fine-grained control over the
330     * connection settings is available using the {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)}
331     * constructor.
332     * </p>
333     *
334     * @param jid the bare JID used by the client.
335     * @param password the password or authentication token.
336     * @throws XmppStringprepException
337     */
338    public XMPPTCPConnection(CharSequence jid, String password) throws XmppStringprepException {
339        this(XmppStringUtils.parseLocalpart(jid.toString()), password, XmppStringUtils.parseDomain(jid.toString()));
340    }
341
342    /**
343     * Creates a new XMPP connection over TCP.
344     * <p>
345     * This is the simplest constructor for connecting to an XMPP server. Alternatively,
346     * you can get fine-grained control over connection settings using the
347     * {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)} constructor.
348     * </p>
349     * @param username
350     * @param password
351     * @param serviceName
352     * @throws XmppStringprepException
353     */
354    public XMPPTCPConnection(CharSequence username, String password, String serviceName) throws XmppStringprepException {
355        this(XMPPTCPConnectionConfiguration.builder().setUsernameAndPassword(username, password).setXmppDomain(
356                                        JidCreate.domainBareFrom(serviceName)).build());
357    }
358
359    @Override
360    protected void throwNotConnectedExceptionIfAppropriate() throws NotConnectedException {
361        if (packetWriter == null) {
362            throw new NotConnectedException();
363        }
364        packetWriter.throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
365    }
366
367    @Override
368    protected void throwAlreadyConnectedExceptionIfAppropriate() throws AlreadyConnectedException {
369        if (isConnected() && !disconnectedButResumeable) {
370            throw new AlreadyConnectedException();
371        }
372    }
373
374    @Override
375    protected void throwAlreadyLoggedInExceptionIfAppropriate() throws AlreadyLoggedInException {
376        if (isAuthenticated() && !disconnectedButResumeable) {
377            throw new AlreadyLoggedInException();
378        }
379    }
380
381    @Override
382    protected void afterSuccessfulLogin(final boolean resumed) throws NotConnectedException, InterruptedException {
383        // Reset the flag in case it was set
384        disconnectedButResumeable = false;
385        super.afterSuccessfulLogin(resumed);
386    }
387
388    @Override
389    protected synchronized void loginInternal(String username, String password, Resourcepart resource) throws XMPPException,
390                    SmackException, IOException, InterruptedException {
391        // Authenticate using SASL
392        SSLSession sslSession = secureSocket != null ? secureSocket.getSession() : null;
393        saslAuthentication.authenticate(username, password, config.getAuthzid(), sslSession);
394
395        // Wait for stream features after the authentication.
396        // TODO: The name of this synchronization point "maybeCompressFeaturesReceived" is not perfect. It should be
397        // renamed to "streamFeaturesAfterAuthenticationReceived".
398        maybeCompressFeaturesReceived.checkIfSuccessOrWait();
399
400        // If compression is enabled then request the server to use stream compression. XEP-170
401        // recommends to perform stream compression before resource binding.
402        maybeEnableCompression();
403
404        if (isSmResumptionPossible()) {
405            smResumedSyncPoint.sendAndWaitForResponse(new Resume(clientHandledStanzasCount, smSessionId));
406            if (smResumedSyncPoint.wasSuccessful()) {
407                // We successfully resumed the stream, be done here
408                afterSuccessfulLogin(true);
409                return;
410            }
411            // SM resumption failed, what Smack does here is to report success of
412            // lastFeaturesReceived in case of sm resumption was answered with 'failed' so that
413            // normal resource binding can be tried.
414            LOGGER.fine("Stream resumption failed, continuing with normal stream establishment process");
415        }
416
417        List<Stanza> previouslyUnackedStanzas = new LinkedList<Stanza>();
418        if (unacknowledgedStanzas != null) {
419            // There was a previous connection with SM enabled but that was either not resumable or
420            // failed to resume. Make sure that we (re-)send the unacknowledged stanzas.
421            unacknowledgedStanzas.drainTo(previouslyUnackedStanzas);
422            // Reset unacknowledged stanzas to 'null' to signal that we never send 'enable' in this
423            // XMPP session (There maybe was an enabled in a previous XMPP session of this
424            // connection instance though). This is used in writePackets to decide if stanzas should
425            // be added to the unacknowledged stanzas queue, because they have to be added right
426            // after the 'enable' stream element has been sent.
427            dropSmState();
428        }
429
430        // Now bind the resource. It is important to do this *after* we dropped an eventually
431        // existing Stream Management state. As otherwise <bind/> and <session/> may end up in
432        // unacknowledgedStanzas and become duplicated on reconnect. See SMACK-706.
433        bindResourceAndEstablishSession(resource);
434
435        if (isSmAvailable() && useSm) {
436            // Remove what is maybe left from previously stream managed sessions
437            serverHandledStanzasCount = 0;
438            // XEP-198 3. Enabling Stream Management. If the server response to 'Enable' is 'Failed'
439            // then this is a non recoverable error and we therefore throw an exception.
440            smEnabledSyncPoint.sendAndWaitForResponseOrThrow(new Enable(useSmResumption, smClientMaxResumptionTime));
441            synchronized (requestAckPredicates) {
442                if (requestAckPredicates.isEmpty()) {
443                    // Assure that we have at lest one predicate set up that so that we request acks
444                    // for the server and eventually flush some stanzas from the unacknowledged
445                    // stanza queue
446                    requestAckPredicates.add(Predicate.forMessagesOrAfter5Stanzas());
447                }
448            }
449        }
450        // (Re-)send the stanzas *after* we tried to enable SM
451        for (Stanza stanza : previouslyUnackedStanzas) {
452            sendStanzaInternal(stanza);
453        }
454
455        afterSuccessfulLogin(false);
456    }
457
458    @Override
459    public boolean isSecureConnection() {
460        return secureSocket != null;
461    }
462
463    /**
464     * Shuts the current connection down. After this method returns, the connection must be ready
465     * for re-use by connect.
466     */
467    @Override
468    protected void shutdown() {
469        if (isSmEnabled()) {
470            try {
471                // Try to send a last SM Acknowledgement. Most servers won't find this information helpful, as the SM
472                // state is dropped after a clean disconnect anyways. OTOH it doesn't hurt much either.
473                sendSmAcknowledgementInternal();
474            } catch (InterruptedException | NotConnectedException e) {
475                LOGGER.log(Level.FINE, "Can not send final SM ack as connection is not connected", e);
476            }
477        }
478        shutdown(false);
479    }
480
481    /**
482     * Performs an unclean disconnect and shutdown of the connection. Does not send a closing stream stanza.
483     */
484    public synchronized void instantShutdown() {
485        shutdown(true);
486    }
487
488    private void shutdown(boolean instant) {
489        if (disconnectedButResumeable) {
490            return;
491        }
492
493        // First shutdown the writer, this will result in a closing stream element getting send to
494        // the server
495        LOGGER.finer("PacketWriter shutdown()");
496        packetWriter.shutdown(instant);
497        LOGGER.finer("PacketWriter has been shut down");
498
499        if (!instant) {
500            try {
501                // After we send the closing stream element, check if there was already a
502                // closing stream element sent by the server or wait with a timeout for a
503                // closing stream element to be received from the server.
504                @SuppressWarnings("unused")
505                Exception res = closingStreamReceived.checkIfSuccessOrWait();
506            } catch (InterruptedException | NoResponseException e) {
507                LOGGER.log(Level.INFO, "Exception while waiting for closing stream element from the server " + this, e);
508            }
509        }
510
511        LOGGER.finer("PacketReader shutdown()");
512        packetReader.shutdown();
513        LOGGER.finer("PacketReader has been shut down");
514
515        try {
516                socket.close();
517        } catch (Exception e) {
518                LOGGER.log(Level.WARNING, "shutdown", e);
519        }
520
521        setWasAuthenticated();
522        // If we are able to resume the stream, then don't set
523        // connected/authenticated/usingTLS to false since we like behave like we are still
524        // connected (e.g. sendStanza should not throw a NotConnectedException).
525        if (isSmResumptionPossible() && instant) {
526            disconnectedButResumeable = true;
527        } else {
528            disconnectedButResumeable = false;
529            // Reset the stream management session id to null, since if the stream is cleanly closed, i.e. sending a closing
530            // stream tag, there is no longer a stream to resume.
531            smSessionId = null;
532        }
533        authenticated = false;
534        connected = false;
535        secureSocket = null;
536        reader = null;
537        writer = null;
538
539        maybeCompressFeaturesReceived.init();
540        compressSyncPoint.init();
541        smResumedSyncPoint.init();
542        smEnabledSyncPoint.init();
543        initialOpenStreamSend.init();
544    }
545
546    @Override
547    public void sendNonza(Nonza element) throws NotConnectedException, InterruptedException {
548        packetWriter.sendStreamElement(element);
549    }
550
551    @Override
552    protected void sendStanzaInternal(Stanza packet) throws NotConnectedException, InterruptedException {
553        packetWriter.sendStreamElement(packet);
554        if (isSmEnabled()) {
555            for (StanzaFilter requestAckPredicate : requestAckPredicates) {
556                if (requestAckPredicate.accept(packet)) {
557                    requestSmAcknowledgementInternal();
558                    break;
559                }
560            }
561        }
562    }
563
564    private void connectUsingConfiguration() throws ConnectionException, IOException, InterruptedException {
565        List<HostAddress> failedAddresses = populateHostAddresses();
566        SocketFactory socketFactory = config.getSocketFactory();
567        ProxyInfo proxyInfo = config.getProxyInfo();
568        int timeout = config.getConnectTimeout();
569        if (socketFactory == null) {
570            socketFactory = SocketFactory.getDefault();
571        }
572        for (HostAddress hostAddress : hostAddresses) {
573            Iterator<InetAddress> inetAddresses;
574            String host = hostAddress.getHost();
575            int port = hostAddress.getPort();
576            if (proxyInfo == null) {
577                inetAddresses = hostAddress.getInetAddresses().iterator();
578                assert (inetAddresses.hasNext());
579
580                innerloop: while (inetAddresses.hasNext()) {
581                    // Create a *new* Socket before every connection attempt, i.e. connect() call, since Sockets are not
582                    // re-usable after a failed connection attempt. See also SMACK-724.
583                    SmackFuture.SocketFuture socketFuture = new SmackFuture.SocketFuture(socketFactory);
584
585                    final InetAddress inetAddress = inetAddresses.next();
586                    final InetSocketAddress inetSocketAddress = new InetSocketAddress(inetAddress, port);
587                    LOGGER.finer("Trying to establish TCP connection to " + inetSocketAddress);
588                    socketFuture.connectAsync(inetSocketAddress, timeout);
589
590                    try {
591                        socket = socketFuture.getOrThrow();
592                    } catch (IOException e) {
593                        hostAddress.setException(inetAddress, e);
594                        if (inetAddresses.hasNext()) {
595                            continue innerloop;
596                        } else {
597                            break innerloop;
598                        }
599                    }
600                    LOGGER.finer("Established TCP connection to " + inetSocketAddress);
601                    // We found a host to connect to, return here
602                    this.host = host;
603                    this.port = port;
604                    return;
605                }
606                failedAddresses.add(hostAddress);
607            } else {
608                socket = socketFactory.createSocket();
609                StringUtils.requireNotNullOrEmpty(host, "Host of HostAddress " + hostAddress + " must not be null when using a Proxy");
610                final String hostAndPort = host + " at port " + port;
611                LOGGER.finer("Trying to establish TCP connection via Proxy to " + hostAndPort);
612                try {
613                    proxyInfo.getProxySocketConnection().connect(socket, host, port, timeout);
614                } catch (IOException e) {
615                    hostAddress.setException(e);
616                    continue;
617                }
618                LOGGER.finer("Established TCP connection to " + hostAndPort);
619                // We found a host to connect to, return here
620                this.host = host;
621                this.port = port;
622                return;
623            }
624        }
625        // There are no more host addresses to try
626        // throw an exception and report all tried
627        // HostAddresses in the exception
628        throw ConnectionException.from(failedAddresses);
629    }
630
631    /**
632     * Initializes the connection by creating a stanza reader and writer and opening a
633     * XMPP stream to the server.
634     *
635     * @throws XMPPException if establishing a connection to the server fails.
636     * @throws SmackException if the server fails to respond back or if there is anther error.
637     * @throws IOException
638     * @throws InterruptedException
639     */
640    private void initConnection() throws IOException, InterruptedException {
641        compressionHandler = null;
642
643        // Set the reader and writer instance variables
644        initReaderAndWriter();
645
646        int availableReaderWriterSemaphorePermits = readerWriterSemaphore.availablePermits();
647        if (availableReaderWriterSemaphorePermits < 2) {
648            Object[] logObjects = new Object[] {
649                            this,
650                            availableReaderWriterSemaphorePermits,
651            };
652            LOGGER.log(Level.FINE, "Not every reader/writer threads where terminated on connection re-initializtion of {0}. Available permits {1}", logObjects);
653        }
654        readerWriterSemaphore.acquire(2);
655        // Start the writer thread. This will open an XMPP stream to the server
656        packetWriter.init();
657        // Start the reader thread. The startup() method will block until we
658        // get an opening stream packet back from server
659        packetReader.init();
660    }
661
662    private void initReaderAndWriter() throws IOException {
663        InputStream is = socket.getInputStream();
664        OutputStream os = socket.getOutputStream();
665        if (compressionHandler != null) {
666            is = compressionHandler.getInputStream(is);
667            os = compressionHandler.getOutputStream(os);
668        }
669        // OutputStreamWriter is already buffered, no need to wrap it into a BufferedWriter
670        writer = new OutputStreamWriter(os, "UTF-8");
671        reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
672
673        // If debugging is enabled, we open a window and write out all network traffic.
674        initDebugger();
675    }
676
677    /**
678     * The server has indicated that TLS negotiation can start. We now need to secure the
679     * existing plain connection and perform a handshake. This method won't return until the
680     * connection has finished the handshake or an error occurred while securing the connection.
681     * @throws IOException
682     * @throws CertificateException
683     * @throws NoSuchAlgorithmException
684     * @throws NoSuchProviderException
685     * @throws KeyStoreException
686     * @throws UnrecoverableKeyException
687     * @throws KeyManagementException
688     * @throws SmackException
689     * @throws Exception if an exception occurs.
690     */
691    @SuppressWarnings("LiteralClassName")
692    private void proceedTLSReceived() throws NoSuchAlgorithmException, CertificateException, IOException, KeyStoreException, NoSuchProviderException, UnrecoverableKeyException, KeyManagementException, SmackException {
693        SmackDaneVerifier daneVerifier = null;
694
695        if (config.getDnssecMode() == DnssecMode.needsDnssecAndDane) {
696            SmackDaneProvider daneProvider = DNSUtil.getDaneProvider();
697            if (daneProvider == null) {
698                throw new UnsupportedOperationException("DANE enabled but no SmackDaneProvider configured");
699            }
700            daneVerifier = daneProvider.newInstance();
701            if (daneVerifier == null) {
702                throw new IllegalStateException("DANE requested but DANE provider did not return a DANE verifier");
703            }
704        }
705
706        SSLContext context = this.config.getCustomSSLContext();
707        KeyStore ks = null;
708        PasswordCallback pcb = null;
709
710        if (context == null) {
711            final String keyStoreType = config.getKeystoreType();
712            final CallbackHandler callbackHandler = config.getCallbackHandler();
713            final String keystorePath = config.getKeystorePath();
714            if ("PKCS11".equals(keyStoreType)) {
715                try {
716                    Constructor<?> c = Class.forName("sun.security.pkcs11.SunPKCS11").getConstructor(InputStream.class);
717                    String pkcs11Config = "name = SmartCard\nlibrary = " + config.getPKCS11Library();
718                    ByteArrayInputStream config = new ByteArrayInputStream(pkcs11Config.getBytes(StringUtils.UTF8));
719                    Provider p = (Provider) c.newInstance(config);
720                    Security.addProvider(p);
721                    ks = KeyStore.getInstance("PKCS11",p);
722                    pcb = new PasswordCallback("PKCS11 Password: ",false);
723                    callbackHandler.handle(new Callback[] {pcb});
724                    ks.load(null,pcb.getPassword());
725                }
726                catch (Exception e) {
727                    LOGGER.log(Level.WARNING, "Exception", e);
728                    ks = null;
729                }
730            }
731            else if ("Apple".equals(keyStoreType)) {
732                ks = KeyStore.getInstance("KeychainStore","Apple");
733                ks.load(null,null);
734                // pcb = new PasswordCallback("Apple Keychain",false);
735                // pcb.setPassword(null);
736            }
737            else if (keyStoreType != null) {
738                ks = KeyStore.getInstance(keyStoreType);
739                if (callbackHandler != null && StringUtils.isNotEmpty(keystorePath)) {
740                    try {
741                        pcb = new PasswordCallback("Keystore Password: ", false);
742                        callbackHandler.handle(new Callback[] { pcb });
743                        ks.load(new FileInputStream(keystorePath), pcb.getPassword());
744                    }
745                    catch (Exception e) {
746                        LOGGER.log(Level.WARNING, "Exception", e);
747                        ks = null;
748                    }
749                } else {
750                    ks.load(null, null);
751                }
752            }
753
754            KeyManager[] kms = null;
755
756            if (ks != null) {
757                String keyManagerFactoryAlgorithm = KeyManagerFactory.getDefaultAlgorithm();
758                KeyManagerFactory kmf = null;
759                try {
760                    kmf = KeyManagerFactory.getInstance(keyManagerFactoryAlgorithm);
761                }
762                catch (NoSuchAlgorithmException e) {
763                    LOGGER.log(Level.FINE, "Could get the default KeyManagerFactory for the '"
764                                    + keyManagerFactoryAlgorithm + "' algorithm", e);
765                }
766                if (kmf != null) {
767                    try {
768                        if (pcb == null) {
769                            kmf.init(ks, null);
770                        }
771                        else {
772                            kmf.init(ks, pcb.getPassword());
773                            pcb.clearPassword();
774                        }
775                        kms = kmf.getKeyManagers();
776                    }
777                    catch (NullPointerException npe) {
778                        LOGGER.log(Level.WARNING, "NullPointerException", npe);
779                    }
780                }
781            }
782
783            // If the user didn't specify a SSLContext, use the default one
784            context = SSLContext.getInstance("TLS");
785
786            final SecureRandom secureRandom = new java.security.SecureRandom();
787            X509TrustManager customTrustManager = config.getCustomX509TrustManager();
788
789            if (daneVerifier != null) {
790                // User requested DANE verification.
791                daneVerifier.init(context, kms, customTrustManager, secureRandom);
792            } else {
793                TrustManager[] customTrustManagers = null;
794                if (customTrustManager != null) {
795                    customTrustManagers = new TrustManager[] { customTrustManager };
796                }
797                context.init(kms, customTrustManagers, secureRandom);
798            }
799        }
800
801        Socket plain = socket;
802        // Secure the plain connection
803        socket = context.getSocketFactory().createSocket(plain,
804                config.getXMPPServiceDomain().toString(), plain.getPort(), true);
805
806        final SSLSocket sslSocket = (SSLSocket) socket;
807        // Immediately set the enabled SSL protocols and ciphers. See SMACK-712 why this is
808        // important (at least on certain platforms) and it seems to be a good idea anyways to
809        // prevent an accidental implicit handshake.
810        TLSUtils.setEnabledProtocolsAndCiphers(sslSocket, config.getEnabledSSLProtocols(), config.getEnabledSSLCiphers());
811
812        // Initialize the reader and writer with the new secured version
813        initReaderAndWriter();
814
815        // Proceed to do the handshake
816        sslSocket.startHandshake();
817
818        if (daneVerifier != null) {
819            daneVerifier.finish(sslSocket);
820        }
821
822        final HostnameVerifier verifier = getConfiguration().getHostnameVerifier();
823        if (verifier == null) {
824                throw new IllegalStateException("No HostnameVerifier set. Use connectionConfiguration.setHostnameVerifier() to configure.");
825        } else if (!verifier.verify(getXMPPServiceDomain().toString(), sslSocket.getSession())) {
826            throw new CertificateException("Hostname verification of certificate failed. Certificate does not authenticate " + getXMPPServiceDomain());
827        }
828
829        // Set that TLS was successful
830        secureSocket = sslSocket;
831    }
832
833    /**
834     * Returns the compression handler that can be used for one compression methods offered by the server.
835     *
836     * @return a instance of XMPPInputOutputStream or null if no suitable instance was found
837     *
838     */
839    private static XMPPInputOutputStream maybeGetCompressionHandler(Compress.Feature compression) {
840        for (XMPPInputOutputStream handler : SmackConfiguration.getCompressionHandlers()) {
841                String method = handler.getCompressionMethod();
842                if (compression.getMethods().contains(method))
843                    return handler;
844        }
845        return null;
846    }
847
848    @Override
849    public boolean isUsingCompression() {
850        return compressionHandler != null && compressSyncPoint.wasSuccessful();
851    }
852
853    /**
854     * <p>
855     * Starts using stream compression that will compress network traffic. Traffic can be
856     * reduced up to 90%. Therefore, stream compression is ideal when using a slow speed network
857     * connection. However, the server and the client will need to use more CPU time in order to
858     * un/compress network data so under high load the server performance might be affected.
859     * </p>
860     * <p>
861     * Stream compression has to have been previously offered by the server. Currently only the
862     * zlib method is supported by the client. Stream compression negotiation has to be done
863     * before authentication took place.
864     * </p>
865     *
866     * @throws NotConnectedException
867     * @throws SmackException
868     * @throws NoResponseException
869     * @throws InterruptedException
870     */
871    private void maybeEnableCompression() throws SmackException, InterruptedException {
872        if (!config.isCompressionEnabled()) {
873            return;
874        }
875
876        Compress.Feature compression = getFeature(Compress.Feature.ELEMENT, Compress.NAMESPACE);
877        if (compression == null) {
878            // Server does not support compression
879            return;
880        }
881        // If stream compression was offered by the server and we want to use
882        // compression then send compression request to the server
883        if ((compressionHandler = maybeGetCompressionHandler(compression)) != null) {
884            compressSyncPoint.sendAndWaitForResponseOrThrow(new Compress(compressionHandler.getCompressionMethod()));
885        } else {
886            LOGGER.warning("Could not enable compression because no matching handler/method pair was found");
887        }
888    }
889
890    /**
891     * Establishes a connection to the XMPP server. It basically
892     * creates and maintains a socket connection to the server.
893     * <p>
894     * Listeners will be preserved from a previous connection if the reconnection
895     * occurs after an abrupt termination.
896     * </p>
897     *
898     * @throws XMPPException if an error occurs while trying to establish the connection.
899     * @throws SmackException
900     * @throws IOException
901     * @throws InterruptedException
902     */
903    @Override
904    protected void connectInternal() throws SmackException, IOException, XMPPException, InterruptedException {
905        closingStreamReceived.init();
906        // Establishes the TCP connection to the server and does setup the reader and writer. Throws an exception if
907        // there is an error establishing the connection
908        connectUsingConfiguration();
909
910        // We connected successfully to the servers TCP port
911        initConnection();
912
913        // TLS handled will be successful either if TLS was established, or if it was not mandatory.
914        tlsHandled.checkIfSuccessOrWaitOrThrow();
915
916        // Wait with SASL auth until the SASL mechanisms have been received
917        saslFeatureReceived.checkIfSuccessOrWaitOrThrow();
918    }
919
920    /**
921     * Sends out a notification that there was an error with the connection
922     * and closes the connection. Also prints the stack trace of the given exception
923     *
924     * @param e the exception that causes the connection close event.
925     */
926    private synchronized void notifyConnectionError(Exception e) {
927        // Listeners were already notified of the exception, return right here.
928        if (packetReader.done && packetWriter.done()) return;
929
930        SmackWrappedException smackWrappedException = new SmackWrappedException(e);
931        tlsHandled.reportGenericFailure(smackWrappedException);
932        saslFeatureReceived.reportGenericFailure(smackWrappedException);
933        maybeCompressFeaturesReceived.reportGenericFailure(smackWrappedException);
934        lastFeaturesReceived.reportGenericFailure(smackWrappedException);
935
936        // Closes the connection temporary. A reconnection is possible
937        // Note that a connection listener of XMPPTCPConnection will drop the SM state in
938        // case the Exception is a StreamErrorException.
939        instantShutdown();
940
941        // Notify connection listeners of the error.
942        callConnectionClosedOnErrorListener(e);
943    }
944
945    /**
946     * For unit testing purposes
947     *
948     * @param writer
949     */
950    protected void setWriter(Writer writer) {
951        this.writer = writer;
952    }
953
954    @Override
955    protected void afterFeaturesReceived() throws NotConnectedException, InterruptedException {
956        StartTls startTlsFeature = getFeature(StartTls.ELEMENT, StartTls.NAMESPACE);
957        if (startTlsFeature != null) {
958            if (startTlsFeature.required() && config.getSecurityMode() == SecurityMode.disabled) {
959                SmackException smackException = new SecurityRequiredByServerException();
960                tlsHandled.reportFailure(smackException);
961                notifyConnectionError(smackException);
962                return;
963            }
964
965            if (config.getSecurityMode() != ConnectionConfiguration.SecurityMode.disabled) {
966                sendNonza(new StartTls());
967            } else {
968                tlsHandled.reportSuccess();
969            }
970        } else {
971            tlsHandled.reportSuccess();
972        }
973
974        if (getSASLAuthentication().authenticationSuccessful()) {
975            // If we have received features after the SASL has been successfully completed, then we
976            // have also *maybe* received, as it is an optional feature, the compression feature
977            // from the server.
978            maybeCompressFeaturesReceived.reportSuccess();
979        }
980    }
981
982    /**
983     * Resets the parser using the latest connection's reader. Resetting the parser is necessary
984     * when the plain connection has been secured or when a new opening stream element is going
985     * to be sent by the server.
986     *
987     * @throws SmackException if the parser could not be reset.
988     * @throws InterruptedException
989     */
990    void openStream() throws SmackException, InterruptedException {
991        // If possible, provide the receiving entity of the stream open tag, i.e. the server, as much information as
992        // possible. The 'to' attribute is *always* available. The 'from' attribute if set by the user and no external
993        // mechanism is used to determine the local entity (user). And the 'id' attribute is available after the first
994        // response from the server (see e.g. RFC 6120 ยง 9.1.1 Step 2.)
995        CharSequence to = getXMPPServiceDomain();
996        CharSequence from = null;
997        CharSequence localpart = config.getUsername();
998        if (localpart != null) {
999            from = XmppStringUtils.completeJidFrom(localpart, to);
1000        }
1001        String id = getStreamId();
1002        sendNonza(new StreamOpen(to, from, id));
1003        try {
1004            packetReader.parser = PacketParserUtils.newXmppParser(reader);
1005        }
1006        catch (XmlPullParserException e) {
1007            throw new SmackException(e);
1008        }
1009    }
1010
    /**
     * Reads the top level stream elements sent by the server from the parser and dispatches them: stanzas are
     * handed to {@code parseAndProcessStanza()}, while stream, TLS, SASL, compression and Stream Management
     * (XEP-198) elements are handled directly in {@link #parsePackets()}.
     */
    protected class PacketReader {

        /**
         * The parser the elements are read from. It is replaced by the enclosing connection's openStream() every
         * time a new stream is opened.
         */
        XmlPullParser parser;

        /**
         * Set to true by {@link #shutdown()}, which causes the parse loop to terminate.
         */
        private volatile boolean done;

        /**
         * Initializes the reader in order to be used. The reader is initialized during the
         * first connection and when reconnecting due to an abrupt disconnection.
         * <p>
         * Resets the 'done' flag and hands a runnable named "Smack Reader (n)" to {@code Async.go()}, which runs
         * {@link #parsePackets()} and releases one permit of the reader/writer semaphore once parsing ended.
         * </p>
         */
        void init() {
            done = false;

            Async.go(new Runnable() {
                @Override
                public void run() {
                    try {
                        parsePackets();
                    } finally {
                        // Always give the permit back, no matter how parsePackets() terminated.
                        XMPPTCPConnection.this.readerWriterSemaphore.release();
                    }
                }
            }, "Smack Reader (" + getConnectionCounter() + ")");
         }

        /**
         * Shuts the stanza reader down. This method simply sets the 'done' flag to true.
         */
        void shutdown() {
            done = true;
        }

        /**
         * Parse top-level packets in order to process them further.
         * <p>
         * Loops over the parser events until the reader is done, the closing stream element was received after we
         * initiated the shutdown, or an exception occurs. Exceptions are reported to the closing stream sync point
         * and, unless the reader is done or the writer queue is shut down, passed to notifyConnectionError().
         * </p>
         */
        private void parsePackets() {
            try {
                // Do not touch the parser before the initial stream open was sent.
                // NOTE(review): presumably because the parser is only created in openStream() - confirm.
                initialOpenStreamSend.checkIfSuccessOrWait();
                int eventType = parser.getEventType();
                while (!done) {
                    switch (eventType) {
                    case XmlPullParser.START_TAG:
                        final String name = parser.getName();
                        switch (name) {
                        case Message.ELEMENT:
                        case IQ.IQ_ELEMENT:
                        case Presence.ELEMENT:
                            try {
                                parseAndProcessStanza(parser);
                            } finally {
                                // Count the stanza as handled even if parsing or processing it threw.
                                clientHandledStanzasCount = SMUtils.incrementHeight(clientHandledStanzasCount);
                            }
                            break;
                        case "stream":
                            // We found an opening stream.
                            if ("jabber:client".equals(parser.getNamespace(null))) {
                                streamId = parser.getAttributeValue("", "id");
                                String reportedServerDomain = parser.getAttributeValue("", "from");
                                // Only checked if assertions are enabled.
                                assert (config.getXMPPServiceDomain().equals(reportedServerDomain));
                            }
                            break;
                        case "error":
                            StreamError streamError = PacketParserUtils.parseStreamError(parser);
                            saslFeatureReceived.reportFailure(new StreamErrorException(streamError));
                            // Mark the tlsHandled sync point as success, we will use the saslFeatureReceived sync
                            // point to report the error, which is checked immediately after tlsHandled in
                            // connectInternal().
                            tlsHandled.reportSuccess();
                            throw new StreamErrorException(streamError);
                        case "features":
                            parseFeatures(parser);
                            break;
                        case "proceed":
                            try {
                                // Secure the connection by negotiating TLS
                                proceedTLSReceived();
                                // Send a new opening stream to the server
                                openStream();
                            }
                            catch (Exception e) {
                                // Wake up threads waiting on tlsHandled, then let the outer handler deal with
                                // the original exception.
                                SmackException smackException = new SmackException(e);
                                tlsHandled.reportFailure(smackException);
                                throw e;
                            }
                            break;
                        case "failure":
                            // The 'failure' element name is shared by TLS, compression and SASL, the namespace
                            // tells them apart.
                            String namespace = parser.getNamespace(null);
                            switch (namespace) {
                            case "urn:ietf:params:xml:ns:xmpp-tls":
                                // TLS negotiation has failed. The server will close the connection
                                // TODO Parse failure stanza
                                throw new SmackException("TLS negotiation has failed");
                            case "http://jabber.org/protocol/compress":
                                // Stream compression has been denied. This is a recoverable
                                // situation. It is still possible to authenticate and
                                // use the connection but using an uncompressed connection
                                // TODO Parse failure stanza
                                compressSyncPoint.reportFailure(new SmackException(
                                                "Could not establish compression"));
                                break;
                            case SaslStreamElements.NAMESPACE:
                                // SASL authentication has failed. The server may close the connection
                                // depending on the number of retries
                                final SASLFailure failure = PacketParserUtils.parseSASLFailure(parser);
                                getSASLAuthentication().authenticationFailed(failure);
                                break;
                            }
                            break;
                        case Challenge.ELEMENT:
                            // The server is challenging the SASL authentication made by the client
                            String challengeData = parser.nextText();
                            getSASLAuthentication().challengeReceived(challengeData);
                            break;
                        case Success.ELEMENT:
                            Success success = new Success(parser.nextText());
                            // We now need to bind a resource for the connection
                            // Open a new stream and wait for the response
                            openStream();
                            // The SASL authentication with the server was successful. The next step
                            // will be to bind the resource
                            getSASLAuthentication().authenticated(success);
                            break;
                        case Compressed.ELEMENT:
                            // Server confirmed that it's possible to use stream compression. Start
                            // stream compression
                            // Initialize the reader and writer with the new compressed version
                            initReaderAndWriter();
                            // Send a new opening stream to the server
                            openStream();
                            // Notify that compression is being used
                            compressSyncPoint.reportSuccess();
                            break;
                        case Enabled.ELEMENT:
                            Enabled enabled = ParseStreamManagement.enabled(parser);
                            if (enabled.isResumeSet()) {
                                // A resumable stream requires a session id.
                                smSessionId = enabled.getId();
                                if (StringUtils.isNullOrEmpty(smSessionId)) {
                                    SmackException xmppException = new SmackException("Stream Management 'enabled' element with resume attribute but without session id received");
                                    smEnabledSyncPoint.reportFailure(xmppException);
                                    throw xmppException;
                                }
                                smServerMaxResumptionTime = enabled.getMaxResumptionTime();
                            } else {
                                // Mark this a non-resumable stream by setting smSessionId to null
                                smSessionId = null;
                            }
                            // Start counting the handled stanzas from zero for the newly enabled SM session.
                            clientHandledStanzasCount = 0;
                            smWasEnabledAtLeastOnce = true;
                            smEnabledSyncPoint.reportSuccess();
                            LOGGER.fine("Stream Management (XEP-198): successfully enabled");
                            break;
                        case Failed.ELEMENT:
                            Failed failed = ParseStreamManagement.failed(parser);
                            FailedNonzaException xmppException = new FailedNonzaException(failed, failed.getStanzaErrorCondition());
                            // If only XEP-198 would specify different failure elements for the SM
                            // enable and SM resume failure case. But this is not the case, so we
                            // need to determine if this is a 'Failed' response for either 'Enable'
                            // or 'Resume'.
                            if (smResumedSyncPoint.requestSent()) {
                                smResumedSyncPoint.reportFailure(xmppException);
                            }
                            else {
                                if (!smEnabledSyncPoint.requestSent()) {
                                    throw new IllegalStateException("Failed element received but SM was not previously enabled");
                                }
                                smEnabledSyncPoint.reportFailure(new SmackException(xmppException));
                                // Report success for lastFeaturesReceived so that in case of a
                                // failed resumption, we can continue with normal resource binding.
                                // See text of XEP-198 5. below Example 11.
                                lastFeaturesReceived.reportSuccess();
                            }
                            break;
                        case Resumed.ELEMENT:
                            Resumed resumed = ParseStreamManagement.resumed(parser);
                            // The server must resume the session we asked for.
                            if (!smSessionId.equals(resumed.getPrevId())) {
                                throw new StreamIdDoesNotMatchException(smSessionId, resumed.getPrevId());
                            }
                            // Mark SM as enabled
                            smEnabledSyncPoint.reportSuccess();
                            // First, drop the stanzas already handled by the server
                            processHandledCount(resumed.getHandledCount());
                            // Then re-send what is left in the unacknowledged queue
                            List<Stanza> stanzasToResend = new ArrayList<>(unacknowledgedStanzas.size());
                            unacknowledgedStanzas.drainTo(stanzasToResend);
                            for (Stanza stanza : stanzasToResend) {
                                sendStanzaInternal(stanza);
                            }
                            // If there were stanzas resent, then request a SM ack for them.
                            // Writer's sendStreamElement() won't do it automatically based on
                            // predicates.
                            if (!stanzasToResend.isEmpty()) {
                                requestSmAcknowledgementInternal();
                            }
                            // Mark SM resumption as successful
                            smResumedSyncPoint.reportSuccess();
                            LOGGER.fine("Stream Management (XEP-198): Stream resumed");
                            break;
                        case AckAnswer.ELEMENT:
                            AckAnswer ackAnswer = ParseStreamManagement.ackAnswer(parser);
                            processHandledCount(ackAnswer.getHandledCount());
                            break;
                        case AckRequest.ELEMENT:
                            ParseStreamManagement.ackRequest(parser);
                            // Only answer the ack request if SM was actually enabled for this stream.
                            if (smEnabledSyncPoint.wasSuccessful()) {
                                sendSmAcknowledgementInternal();
                            } else {
                                LOGGER.warning("SM Ack Request received while SM is not enabled");
                            }
                            break;
                         default:
                             LOGGER.warning("Unknown top level stream element: " + name);
                             break;
                        }
                        break;
                    case XmlPullParser.END_TAG:
                        final String endTagName = parser.getName();
                        if ("stream".equals(endTagName)) {
                            if (!parser.getNamespace().equals("http://etherx.jabber.org/streams")) {
                                LOGGER.warning(XMPPTCPConnection.this +  " </stream> but different namespace " + parser.getNamespace());
                                break;
                            }

                            // Check if the queue was already shut down before reporting success on closing stream tag
                            // received. This avoids a race if there is a disconnect(), followed by a connect(), which
                            // did re-start the queue again, causing this reader to assume that the queue is not
                            // shutdown, which results in a call to disconnect().
                            final boolean queueWasShutdown = packetWriter.queue.isShutdown();
                            closingStreamReceived.reportSuccess();

                            if (queueWasShutdown) {
                                // We received a closing stream element *after* we initiated the
                                // termination of the session by sending a closing stream element to
                                // the server first
                                return;
                            } else {
                                // We received a closing stream element from the server without us
                                // sending a closing stream element first. This means that the
                                // server wants to terminate the session, therefore disconnect
                                // the connection
                                LOGGER.info(XMPPTCPConnection.this
                                                + " received closing </stream> element."
                                                + " Server wants to terminate the connection, calling disconnect()");
                                disconnect();
                            }
                        }
                        break;
                    case XmlPullParser.END_DOCUMENT:
                        // END_DOCUMENT only happens in an error case, as otherwise we would see a
                        // closing stream element before.
                        throw new SmackException(
                                        "Parser got END_DOCUMENT event. This could happen e.g. if the server closed the connection without sending a closing stream element");
                    }
                    eventType = parser.next();
                }
            }
            catch (Exception e) {
                // Make sure that nobody waits for a closing stream element that will never arrive.
                closingStreamReceived.reportFailure(e);
                // The exception can be ignored if the connection is 'done'
                // or if it was caused because the socket got closed
                if (!(done || packetWriter.queue.isShutdown())) {
                    // Close the connection and notify connection listeners of the
                    // error.
                    notifyConnectionError(e);
                }
            }
        }
    }
1278
1279    protected class PacketWriter {
1280        public static final int QUEUE_SIZE = XMPPTCPConnection.QUEUE_SIZE;
1281
1282        private final ArrayBlockingQueueWithShutdown<Element> queue = new ArrayBlockingQueueWithShutdown<>(
1283                        QUEUE_SIZE, true);
1284
1285        /**
1286         * Needs to be protected for unit testing purposes.
1287         */
1288        protected SynchronizationPoint<NoResponseException> shutdownDone = new SynchronizationPoint<>(
1289                        XMPPTCPConnection.this, "shutdown completed");
1290
1291        /**
1292         * If set, the stanza writer is shut down
1293         */
1294        protected volatile Long shutdownTimestamp = null;
1295
1296        private volatile boolean instantShutdown;
1297
1298        /**
1299         * True if some preconditions are given to start the bundle and defer mechanism.
1300         * <p>
1301         * This will likely get set to true right after the start of the writer thread, because
1302         * {@link #nextStreamElement()} will check if {@link queue} is empty, which is probably the case, and then set
1303         * this field to true.
1304         * </p>
1305         */
1306        private boolean shouldBundleAndDefer;
1307
1308        /**
1309        * Initializes the writer in order to be used. It is called at the first connection and also
1310        * is invoked if the connection is disconnected by an error.
1311        */
1312        void init() {
1313            shutdownDone.init();
1314            shutdownTimestamp = null;
1315
1316            if (unacknowledgedStanzas != null) {
1317                // It's possible that there are new stanzas in the writer queue that
1318                // came in while we were disconnected but resumable, drain those into
1319                // the unacknowledged queue so that they get resent now
1320                drainWriterQueueToUnacknowledgedStanzas();
1321            }
1322
1323            queue.start();
1324            Async.go(new Runnable() {
1325                @Override
1326                public void run() {
1327                    try {
1328                        writePackets();
1329                    } finally {
1330                        XMPPTCPConnection.this.readerWriterSemaphore.release();
1331                    }
1332                }
1333            }, "Smack Writer (" + getConnectionCounter() + ")");
1334        }
1335
1336        private boolean done() {
1337            return shutdownTimestamp != null;
1338        }
1339
1340        protected void throwNotConnectedExceptionIfDoneAndResumptionNotPossible() throws NotConnectedException {
1341            final boolean done = done();
1342            if (done) {
1343                final boolean smResumptionPossible = isSmResumptionPossible();
1344                // Don't throw a NotConnectedException is there is an resumable stream available
1345                if (!smResumptionPossible) {
1346                    throw new NotConnectedException(XMPPTCPConnection.this, "done=" + done
1347                                    + " smResumptionPossible=" + smResumptionPossible);
1348                }
1349            }
1350        }
1351
1352        /**
1353         * Sends the specified element to the server.
1354         *
1355         * @param element the element to send.
1356         * @throws NotConnectedException
1357         * @throws InterruptedException
1358         */
1359        protected void sendStreamElement(Element element) throws NotConnectedException, InterruptedException {
1360            throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
1361            try {
1362                queue.put(element);
1363            }
1364            catch (InterruptedException e) {
1365                // put() may throw an InterruptedException for two reasons:
1366                // 1. If the queue was shut down
1367                // 2. If the thread was interrupted
1368                // so we have to check which is the case
1369                throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
1370                // If the method above did not throw, then the sending thread was interrupted
1371                throw e;
1372            }
1373        }
1374
1375        /**
1376         * Shuts down the stanza writer. Once this method has been called, no further
1377         * packets will be written to the server.
1378         * @throws InterruptedException
1379         */
1380        void shutdown(boolean instant) {
1381            instantShutdown = instant;
1382            queue.shutdown();
1383            shutdownTimestamp = System.currentTimeMillis();
1384            if (shutdownDone.isNotInInitialState()) {
1385                try {
1386                    shutdownDone.checkIfSuccessOrWait();
1387                } catch (NoResponseException | InterruptedException e) {
1388                    LOGGER.log(Level.WARNING, "shutdownDone was not marked as successful by the writer thread", e);
1389                }
1390            }
1391        }
1392
1393        /**
1394         * Maybe return the next available element from the queue for writing. If the queue is shut down <b>or</b> a
1395         * spurious interrupt occurs, <code>null</code> is returned. So it is important to check the 'done' condition in
1396         * that case.
1397         *
1398         * @return the next element for writing or null.
1399         */
1400        private Element nextStreamElement() {
1401            // It is important the we check if the queue is empty before removing an element from it
1402            if (queue.isEmpty()) {
1403                shouldBundleAndDefer = true;
1404            }
1405            Element packet = null;
1406            try {
1407                packet = queue.take();
1408            }
1409            catch (InterruptedException e) {
1410                if (!queue.isShutdown()) {
1411                    // Users shouldn't try to interrupt the packet writer thread
1412                    LOGGER.log(Level.WARNING, "Writer thread was interrupted. Don't do that. Use disconnect() instead.", e);
1413                }
1414            }
1415            return packet;
1416        }
1417
1418        private void writePackets() {
1419            Exception writerException = null;
1420            try {
1421                openStream();
1422                initialOpenStreamSend.reportSuccess();
1423                // Write out packets from the queue.
1424                while (!done()) {
1425                    Element element = nextStreamElement();
1426                    if (element == null) {
1427                        continue;
1428                    }
1429
1430                    // Get a local version of the bundle and defer callback, in case it's unset
1431                    // between the null check and the method invocation
1432                    final BundleAndDeferCallback localBundleAndDeferCallback = bundleAndDeferCallback;
1433                    // If the preconditions are given (e.g. bundleAndDefer callback is set, queue is
1434                    // empty), then we could wait a bit for further stanzas attempting to decrease
1435                    // our energy consumption
1436                    if (localBundleAndDeferCallback != null && isAuthenticated() && shouldBundleAndDefer) {
1437                        // Reset shouldBundleAndDefer to false, nextStreamElement() will set it to true once the
1438                        // queue is empty again.
1439                        shouldBundleAndDefer = false;
1440                        final AtomicBoolean bundlingAndDeferringStopped = new AtomicBoolean();
1441                        final int bundleAndDeferMillis = localBundleAndDeferCallback.getBundleAndDeferMillis(new BundleAndDefer(
1442                                        bundlingAndDeferringStopped));
1443                        if (bundleAndDeferMillis > 0) {
1444                            long remainingWait = bundleAndDeferMillis;
1445                            final long waitStart = System.currentTimeMillis();
1446                            synchronized (bundlingAndDeferringStopped) {
1447                                while (!bundlingAndDeferringStopped.get() && remainingWait > 0) {
1448                                    bundlingAndDeferringStopped.wait(remainingWait);
1449                                    remainingWait = bundleAndDeferMillis
1450                                                    - (System.currentTimeMillis() - waitStart);
1451                                }
1452                            }
1453                        }
1454                    }
1455
1456                    Stanza packet = null;
1457                    if (element instanceof Stanza) {
1458                        packet = (Stanza) element;
1459                    }
1460                    else if (element instanceof Enable) {
1461                        // The client needs to add messages to the unacknowledged stanzas queue
1462                        // right after it sent 'enabled'. Stanza will be added once
1463                        // unacknowledgedStanzas is not null.
1464                        unacknowledgedStanzas = new ArrayBlockingQueue<>(QUEUE_SIZE);
1465                    }
1466                    maybeAddToUnacknowledgedStanzas(packet);
1467
1468                    CharSequence elementXml = element.toXML(StreamOpen.CLIENT_NAMESPACE);
1469                    if (elementXml instanceof XmlStringBuilder) {
1470                        ((XmlStringBuilder) elementXml).write(writer, StreamOpen.CLIENT_NAMESPACE);
1471                    }
1472                    else {
1473                        writer.write(elementXml.toString());
1474                    }
1475
1476                    if (queue.isEmpty()) {
1477                        writer.flush();
1478                    }
1479                    if (packet != null) {
1480                        firePacketSendingListeners(packet);
1481                    }
1482                }
1483                if (!instantShutdown) {
1484                    // Flush out the rest of the queue.
1485                    try {
1486                        while (!queue.isEmpty()) {
1487                            Element packet = queue.remove();
1488                            if (packet instanceof Stanza) {
1489                                Stanza stanza = (Stanza) packet;
1490                                maybeAddToUnacknowledgedStanzas(stanza);
1491                            }
1492                            writer.write(packet.toXML(null).toString());
1493                        }
1494                        writer.flush();
1495                    }
1496                    catch (Exception e) {
1497                        LOGGER.log(Level.WARNING,
1498                                        "Exception flushing queue during shutdown, ignore and continue",
1499                                        e);
1500                    }
1501
1502                    // Close the stream.
1503                    try {
1504                        writer.write("</stream:stream>");
1505                        writer.flush();
1506                    }
1507                    catch (Exception e) {
1508                        LOGGER.log(Level.WARNING, "Exception writing closing stream element", e);
1509                    }
1510
1511                    // Delete the queue contents (hopefully nothing is left).
1512                    queue.clear();
1513                } else if (instantShutdown && isSmEnabled()) {
1514                    // This was an instantShutdown and SM is enabled, drain all remaining stanzas
1515                    // into the unacknowledgedStanzas queue
1516                    drainWriterQueueToUnacknowledgedStanzas();
1517                }
1518                // Do *not* close the writer here, as it will cause the socket
1519                // to get closed. But we may want to receive further stanzas
1520                // until the closing stream tag is received. The socket will be
1521                // closed in shutdown().
1522            }
1523            catch (Exception e) {
1524                // The exception can be ignored if the the connection is 'done'
1525                // or if the it was caused because the socket got closed
1526                if (!(done() || queue.isShutdown())) {
1527                    writerException = e;
1528                } else {
1529                    LOGGER.log(Level.FINE, "Ignoring Exception in writePackets()", e);
1530                }
1531            } finally {
1532                LOGGER.fine("Reporting shutdownDone success in writer thread");
1533                shutdownDone.reportSuccess();
1534            }
1535            // Delay notifyConnectionError after shutdownDone has been reported in the finally block.
1536            if (writerException != null) {
1537                notifyConnectionError(writerException);
1538            }
1539        }
1540
1541        private void drainWriterQueueToUnacknowledgedStanzas() {
1542            List<Element> elements = new ArrayList<>(queue.size());
1543            queue.drainTo(elements);
1544            for (int i = 0; i < elements.size(); i++) {
1545                Element element = elements.get(i);
1546                // If the unacknowledgedStanza queue is full, then bail out with a warning message. See SMACK-844.
1547                if (unacknowledgedStanzas.remainingCapacity() == 0) {
1548                    StreamManagementException.UnacknowledgedQueueFullException exception = StreamManagementException.UnacknowledgedQueueFullException
1549                            .newWith(i, elements, unacknowledgedStanzas);
1550                    LOGGER.log(Level.WARNING,
1551                            "Some stanzas may be lost as not all could be drained to the unacknowledged stanzas queue", exception);
1552                    return;
1553                }
1554                if (element instanceof Stanza) {
1555                    unacknowledgedStanzas.add((Stanza) element);
1556                }
1557            }
1558        }
1559
1560        private void maybeAddToUnacknowledgedStanzas(Stanza stanza) throws IOException {
1561            // Check if the stream element should be put to the unacknowledgedStanza
1562            // queue. Note that we can not do the put() in sendStanzaInternal() and the
1563            // packet order is not stable at this point (sendStanzaInternal() can be
1564            // called concurrently).
1565            if (unacknowledgedStanzas != null && stanza != null) {
1566                // If the unacknowledgedStanza queue is nearly full, request an new ack
1567                // from the server in order to drain it
1568                if (unacknowledgedStanzas.size() == 0.8 * XMPPTCPConnection.QUEUE_SIZE) {
1569                    writer.write(AckRequest.INSTANCE.toXML(null).toString());
1570                    writer.flush();
1571                }
1572                try {
1573                    // It is important the we put the stanza in the unacknowledged stanza
1574                    // queue before we put it on the wire
1575                    unacknowledgedStanzas.put(stanza);
1576                }
1577                catch (InterruptedException e) {
1578                    throw new IllegalStateException(e);
1579                }
1580            }
1581        }
1582    }
1583
1584    /**
1585     * Set if Stream Management should be used by default for new connections.
1586     *
1587     * @param useSmDefault true to use Stream Management for new connections.
1588     */
1589    public static void setUseStreamManagementDefault(boolean useSmDefault) {
1590        XMPPTCPConnection.useSmDefault = useSmDefault;
1591    }
1592
1593    /**
1594     * Set if Stream Management resumption should be used by default for new connections.
1595     *
1596     * @param useSmResumptionDefault true to use Stream Management resumption for new connections.
1597     * @deprecated use {@link #setUseStreamManagementResumptionDefault(boolean)} instead.
1598     */
1599    @Deprecated
1600    public static void setUseStreamManagementResumptiodDefault(boolean useSmResumptionDefault) {
1601        setUseStreamManagementResumptionDefault(useSmResumptionDefault);
1602    }
1603
1604    /**
1605     * Set if Stream Management resumption should be used by default for new connections.
1606     *
1607     * @param useSmResumptionDefault true to use Stream Management resumption for new connections.
1608     */
1609    public static void setUseStreamManagementResumptionDefault(boolean useSmResumptionDefault) {
1610        if (useSmResumptionDefault) {
1611            // Also enable SM is resumption is enabled
1612            setUseStreamManagementDefault(useSmResumptionDefault);
1613        }
1614        XMPPTCPConnection.useSmResumptionDefault = useSmResumptionDefault;
1615    }
1616
1617    /**
1618     * Set if Stream Management should be used if supported by the server.
1619     *
1620     * @param useSm true to use Stream Management.
1621     */
1622    public void setUseStreamManagement(boolean useSm) {
1623        this.useSm = useSm;
1624    }
1625
1626    /**
1627     * Set if Stream Management resumption should be used if supported by the server.
1628     *
1629     * @param useSmResumption true to use Stream Management resumption.
1630     */
1631    public void setUseStreamManagementResumption(boolean useSmResumption) {
1632        if (useSmResumption) {
1633            // Also enable SM is resumption is enabled
1634            setUseStreamManagement(useSmResumption);
1635        }
1636        this.useSmResumption = useSmResumption;
1637    }
1638
1639    /**
1640     * Set the preferred resumption time in seconds.
1641     * @param resumptionTime the preferred resumption time in seconds
1642     */
1643    public void setPreferredResumptionTime(int resumptionTime) {
1644        smClientMaxResumptionTime = resumptionTime;
1645    }
1646
1647    /**
1648     * Add a predicate for Stream Management acknowledgment requests.
1649     * <p>
1650     * Those predicates are used to determine when a Stream Management acknowledgement request is send to the server.
1651     * Some pre-defined predicates are found in the <code>org.jivesoftware.smack.sm.predicates</code> package.
1652     * </p>
1653     * <p>
1654     * If not predicate is configured, the {@link Predicate#forMessagesOrAfter5Stanzas()} will be used.
1655     * </p>
1656     *
1657     * @param predicate the predicate to add.
1658     * @return if the predicate was not already active.
1659     */
1660    public boolean addRequestAckPredicate(StanzaFilter predicate) {
1661        synchronized (requestAckPredicates) {
1662            return requestAckPredicates.add(predicate);
1663        }
1664    }
1665
1666    /**
1667     * Remove the given predicate for Stream Management acknowledgment request.
1668     * @param predicate the predicate to remove.
1669     * @return true if the predicate was removed.
1670     */
1671    public boolean removeRequestAckPredicate(StanzaFilter predicate) {
1672        synchronized (requestAckPredicates) {
1673            return requestAckPredicates.remove(predicate);
1674        }
1675    }
1676
1677    /**
1678     * Remove all predicates for Stream Management acknowledgment requests.
1679     */
1680    public void removeAllRequestAckPredicates() {
1681        synchronized (requestAckPredicates) {
1682            requestAckPredicates.clear();
1683        }
1684    }
1685
1686    /**
1687     * Send an unconditional Stream Management acknowledgement request to the server.
1688     *
1689     * @throws StreamManagementNotEnabledException if Stream Management is not enabled.
1690     * @throws NotConnectedException if the connection is not connected.
1691     * @throws InterruptedException
1692     */
1693    public void requestSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException {
1694        if (!isSmEnabled()) {
1695            throw new StreamManagementException.StreamManagementNotEnabledException();
1696        }
1697        requestSmAcknowledgementInternal();
1698    }
1699
1700    private void requestSmAcknowledgementInternal() throws NotConnectedException, InterruptedException {
1701        packetWriter.sendStreamElement(AckRequest.INSTANCE);
1702    }
1703
1704    /**
1705     * Send a unconditional Stream Management acknowledgment to the server.
1706     * <p>
1707     * See <a href="http://xmpp.org/extensions/xep-0198.html#acking">XEP-198: Stream Management ยง 4. Acks</a>:
1708     * "Either party MAY send an &lt;a/&gt; element at any time (e.g., after it has received a certain number of stanzas,
1709     * or after a certain period of time), even if it has not received an &lt;r/&gt; element from the other party."
1710     * </p>
1711     *
1712     * @throws StreamManagementNotEnabledException if Stream Management is not enabled.
1713     * @throws NotConnectedException if the connection is not connected.
1714     * @throws InterruptedException
1715     */
1716    public void sendSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException {
1717        if (!isSmEnabled()) {
1718            throw new StreamManagementException.StreamManagementNotEnabledException();
1719        }
1720        sendSmAcknowledgementInternal();
1721    }
1722
1723    private void sendSmAcknowledgementInternal() throws NotConnectedException, InterruptedException {
1724        packetWriter.sendStreamElement(new AckAnswer(clientHandledStanzasCount));
1725    }
1726
1727    /**
1728     * Add a Stanza acknowledged listener.
1729     * <p>
1730     * Those listeners will be invoked every time a Stanza has been acknowledged by the server. The will not get
1731     * automatically removed. Consider using {@link #addStanzaIdAcknowledgedListener(String, StanzaListener)} when
1732     * possible.
1733     * </p>
1734     *
1735     * @param listener the listener to add.
1736     */
1737    public void addStanzaAcknowledgedListener(StanzaListener listener) {
1738        stanzaAcknowledgedListeners.add(listener);
1739    }
1740
1741    /**
1742     * Remove the given Stanza acknowledged listener.
1743     *
1744     * @param listener the listener.
1745     * @return true if the listener was removed.
1746     */
1747    public boolean removeStanzaAcknowledgedListener(StanzaListener listener) {
1748        return stanzaAcknowledgedListeners.remove(listener);
1749    }
1750
1751    /**
1752     * Remove all stanza acknowledged listeners.
1753     */
1754    public void removeAllStanzaAcknowledgedListeners() {
1755        stanzaAcknowledgedListeners.clear();
1756    }
1757
1758    /**
1759     * Add a new Stanza ID acknowledged listener for the given ID.
1760     * <p>
1761     * The listener will be invoked if the stanza with the given ID was acknowledged by the server. It will
1762     * automatically be removed after the listener was run.
1763     * </p>
1764     *
1765     * @param id the stanza ID.
1766     * @param listener the listener to invoke.
1767     * @return the previous listener for this stanza ID or null.
1768     * @throws StreamManagementNotEnabledException if Stream Management is not enabled.
1769     */
1770    @SuppressWarnings("FutureReturnValueIgnored")
1771    public StanzaListener addStanzaIdAcknowledgedListener(final String id, StanzaListener listener) throws StreamManagementNotEnabledException {
1772        // Prevent users from adding callbacks that will never get removed
1773        if (!smWasEnabledAtLeastOnce) {
1774            throw new StreamManagementException.StreamManagementNotEnabledException();
1775        }
1776        // Remove the listener after max. 3 hours
1777        final int removeAfterSeconds = Math.min(getMaxSmResumptionTime(), 3 * 60 * 60);
1778        schedule(new Runnable() {
1779            @Override
1780            public void run() {
1781                stanzaIdAcknowledgedListeners.remove(id);
1782            }
1783        }, removeAfterSeconds, TimeUnit.SECONDS);
1784        return stanzaIdAcknowledgedListeners.put(id, listener);
1785    }
1786
1787    /**
1788     * Remove the Stanza ID acknowledged listener for the given ID.
1789     *
1790     * @param id the stanza ID.
1791     * @return true if the listener was found and removed, false otherwise.
1792     */
1793    public StanzaListener removeStanzaIdAcknowledgedListener(String id) {
1794        return stanzaIdAcknowledgedListeners.remove(id);
1795    }
1796
1797    /**
1798     * Removes all Stanza ID acknowledged listeners.
1799     */
1800    public void removeAllStanzaIdAcknowledgedListeners() {
1801        stanzaIdAcknowledgedListeners.clear();
1802    }
1803
1804    /**
1805     * Returns true if Stream Management is supported by the server.
1806     *
1807     * @return true if Stream Management is supported by the server.
1808     */
1809    public boolean isSmAvailable() {
1810        return hasFeature(StreamManagementFeature.ELEMENT, StreamManagement.NAMESPACE);
1811    }
1812
1813    /**
1814     * Returns true if Stream Management was successfully negotiated with the server.
1815     *
1816     * @return true if Stream Management was negotiated.
1817     */
1818    public boolean isSmEnabled() {
1819        return smEnabledSyncPoint.wasSuccessful();
1820    }
1821
1822    /**
1823     * Returns true if the stream was successfully resumed with help of Stream Management.
1824     *
1825     * @return true if the stream was resumed.
1826     */
1827    public boolean streamWasResumed() {
1828        return smResumedSyncPoint.wasSuccessful();
1829    }
1830
1831    /**
1832     * Returns true if the connection is disconnected by a Stream resumption via Stream Management is possible.
1833     *
1834     * @return true if disconnected but resumption possible.
1835     */
1836    public boolean isDisconnectedButSmResumptionPossible() {
1837        return disconnectedButResumeable && isSmResumptionPossible();
1838    }
1839
1840    /**
1841     * Returns true if the stream is resumable.
1842     *
1843     * @return true if the stream is resumable.
1844     */
1845    public boolean isSmResumptionPossible() {
1846        // There is no resumable stream available
1847        if (smSessionId == null)
1848            return false;
1849
1850        final Long shutdownTimestamp = packetWriter.shutdownTimestamp;
1851        // Seems like we are already reconnected, report true
1852        if (shutdownTimestamp == null) {
1853            return true;
1854        }
1855
1856        // See if resumption time is over
1857        long current = System.currentTimeMillis();
1858        long maxResumptionMillies = ((long) getMaxSmResumptionTime()) * 1000;
1859        if (current > shutdownTimestamp + maxResumptionMillies) {
1860            // Stream resumption is *not* possible if the current timestamp is greater then the greatest timestamp where
1861            // resumption is possible
1862            return false;
1863        } else {
1864            return true;
1865        }
1866    }
1867
1868    /**
1869     * Drop the stream management state. Sets {@link #smSessionId} and
1870     * {@link #unacknowledgedStanzas} to <code>null</code>.
1871     */
1872    private void dropSmState() {
1873        // clientHandledCount and serverHandledCount will be reset on <enable/> and <enabled/>
1874        // respective. No need to reset them here.
1875        smSessionId = null;
1876        unacknowledgedStanzas = null;
1877    }
1878
1879    /**
1880     * Get the maximum resumption time in seconds after which a managed stream can be resumed.
1881     * <p>
1882     * This method will return {@link Integer#MAX_VALUE} if neither the client nor the server specify a maximum
1883     * resumption time. Be aware of integer overflows when using this value, e.g. do not add arbitrary values to it
1884     * without checking for overflows before.
1885     * </p>
1886     *
1887     * @return the maximum resumption time in seconds or {@link Integer#MAX_VALUE} if none set.
1888     */
1889    public int getMaxSmResumptionTime() {
1890        int clientResumptionTime = smClientMaxResumptionTime > 0 ? smClientMaxResumptionTime : Integer.MAX_VALUE;
1891        int serverResumptionTime = smServerMaxResumptionTime > 0 ? smServerMaxResumptionTime : Integer.MAX_VALUE;
1892        return Math.min(clientResumptionTime, serverResumptionTime);
1893    }
1894
1895    private void processHandledCount(long handledCount) throws StreamManagementCounterError {
1896        long ackedStanzasCount = SMUtils.calculateDelta(handledCount, serverHandledStanzasCount);
1897        final List<Stanza> ackedStanzas = new ArrayList<>(
1898                        ackedStanzasCount <= Integer.MAX_VALUE ? (int) ackedStanzasCount
1899                                        : Integer.MAX_VALUE);
1900        for (long i = 0; i < ackedStanzasCount; i++) {
1901            Stanza ackedStanza = unacknowledgedStanzas.poll();
1902            // If the server ack'ed a stanza, then it must be in the
1903            // unacknowledged stanza queue. There can be no exception.
1904            if (ackedStanza == null) {
1905                throw new StreamManagementCounterError(handledCount, serverHandledStanzasCount,
1906                                ackedStanzasCount, ackedStanzas);
1907            }
1908            ackedStanzas.add(ackedStanza);
1909        }
1910
1911        boolean atLeastOneStanzaAcknowledgedListener = false;
1912        if (!stanzaAcknowledgedListeners.isEmpty()) {
1913            // If stanzaAcknowledgedListeners is not empty, the we have at least one
1914            atLeastOneStanzaAcknowledgedListener = true;
1915        }
1916        else {
1917            // Otherwise we look for a matching id in the stanza *id* acknowledged listeners
1918            for (Stanza ackedStanza : ackedStanzas) {
1919                String id = ackedStanza.getStanzaId();
1920                if (id != null && stanzaIdAcknowledgedListeners.containsKey(id)) {
1921                    atLeastOneStanzaAcknowledgedListener = true;
1922                    break;
1923                }
1924            }
1925        }
1926
1927        // Only spawn a new thread if there is a chance that some listener is invoked
1928        if (atLeastOneStanzaAcknowledgedListener) {
1929            asyncGo(new Runnable() {
1930                @Override
1931                public void run() {
1932                    for (Stanza ackedStanza : ackedStanzas) {
1933                        for (StanzaListener listener : stanzaAcknowledgedListeners) {
1934                            try {
1935                                listener.processStanza(ackedStanza);
1936                            }
1937                            catch (InterruptedException | NotConnectedException | NotLoggedInException e) {
1938                                LOGGER.log(Level.FINER, "Received exception", e);
1939                            }
1940                        }
1941                        String id = ackedStanza.getStanzaId();
1942                        if (StringUtils.isNullOrEmpty(id)) {
1943                            continue;
1944                        }
1945                        StanzaListener listener = stanzaIdAcknowledgedListeners.remove(id);
1946                        if (listener != null) {
1947                            try {
1948                                listener.processStanza(ackedStanza);
1949                            }
1950                            catch (InterruptedException | NotConnectedException | NotLoggedInException e) {
1951                                LOGGER.log(Level.FINER, "Received exception", e);
1952                            }
1953                        }
1954                    }
1955                }
1956            });
1957        }
1958
1959        serverHandledStanzasCount = handledCount;
1960    }
1961
1962    /**
1963     * Set the default bundle and defer callback used for new connections.
1964     *
1965     * @param defaultBundleAndDeferCallback
1966     * @see BundleAndDeferCallback
1967     * @since 4.1
1968     */
1969    public static void setDefaultBundleAndDeferCallback(BundleAndDeferCallback defaultBundleAndDeferCallback) {
1970        XMPPTCPConnection.defaultBundleAndDeferCallback = defaultBundleAndDeferCallback;
1971    }
1972
1973    /**
1974     * Set the bundle and defer callback used for this connection.
1975     * <p>
1976     * You can use <code>null</code> as argument to reset the callback. Outgoing stanzas will then
1977     * no longer get deferred.
1978     * </p>
1979     *
1980     * @param bundleAndDeferCallback the callback or <code>null</code>.
1981     * @see BundleAndDeferCallback
1982     * @since 4.1
1983     */
1984    public void setBundleandDeferCallback(BundleAndDeferCallback bundleAndDeferCallback) {
1985        this.bundleAndDeferCallback = bundleAndDeferCallback;
1986    }
1987
1988}