001/**
002 *
003 * Copyright 2003-2007 Jive Software.
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.jivesoftware.smack.tcp;
018
019import org.jivesoftware.smack.AbstractConnectionListener;
020import org.jivesoftware.smack.AbstractXMPPConnection;
021import org.jivesoftware.smack.ConnectionConfiguration;
022import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode;
023import org.jivesoftware.smack.ConnectionCreationListener;
024import org.jivesoftware.smack.StanzaListener;
025import org.jivesoftware.smack.SmackConfiguration;
026import org.jivesoftware.smack.SmackException;
027import org.jivesoftware.smack.SmackException.AlreadyConnectedException;
028import org.jivesoftware.smack.SmackException.AlreadyLoggedInException;
029import org.jivesoftware.smack.SmackException.NoResponseException;
030import org.jivesoftware.smack.SmackException.NotConnectedException;
031import org.jivesoftware.smack.SmackException.ConnectionException;
032import org.jivesoftware.smack.SmackException.SecurityRequiredByClientException;
033import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException;
034import org.jivesoftware.smack.SmackException.SecurityRequiredException;
035import org.jivesoftware.smack.SynchronizationPoint;
036import org.jivesoftware.smack.XMPPException.StreamErrorException;
037import org.jivesoftware.smack.XMPPConnection;
038import org.jivesoftware.smack.XMPPException;
039import org.jivesoftware.smack.XMPPException.XMPPErrorException;
040import org.jivesoftware.smack.compress.packet.Compressed;
041import org.jivesoftware.smack.compression.XMPPInputOutputStream;
042import org.jivesoftware.smack.filter.StanzaFilter;
043import org.jivesoftware.smack.compress.packet.Compress;
044import org.jivesoftware.smack.packet.Element;
045import org.jivesoftware.smack.packet.IQ;
046import org.jivesoftware.smack.packet.Message;
047import org.jivesoftware.smack.packet.StreamOpen;
048import org.jivesoftware.smack.packet.Stanza;
049import org.jivesoftware.smack.packet.Presence;
050import org.jivesoftware.smack.packet.StartTls;
051import org.jivesoftware.smack.sasl.packet.SaslStreamElements;
052import org.jivesoftware.smack.sasl.packet.SaslStreamElements.Challenge;
053import org.jivesoftware.smack.sasl.packet.SaslStreamElements.SASLFailure;
054import org.jivesoftware.smack.sasl.packet.SaslStreamElements.Success;
055import org.jivesoftware.smack.sm.SMUtils;
056import org.jivesoftware.smack.sm.StreamManagementException;
057import org.jivesoftware.smack.sm.StreamManagementException.StreamIdDoesNotMatchException;
058import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementCounterError;
059import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementNotEnabledException;
060import org.jivesoftware.smack.sm.packet.StreamManagement;
061import org.jivesoftware.smack.sm.packet.StreamManagement.AckAnswer;
062import org.jivesoftware.smack.sm.packet.StreamManagement.AckRequest;
063import org.jivesoftware.smack.sm.packet.StreamManagement.Enable;
064import org.jivesoftware.smack.sm.packet.StreamManagement.Enabled;
065import org.jivesoftware.smack.sm.packet.StreamManagement.Failed;
066import org.jivesoftware.smack.sm.packet.StreamManagement.Resume;
067import org.jivesoftware.smack.sm.packet.StreamManagement.Resumed;
068import org.jivesoftware.smack.sm.packet.StreamManagement.StreamManagementFeature;
069import org.jivesoftware.smack.sm.predicates.Predicate;
070import org.jivesoftware.smack.sm.provider.ParseStreamManagement;
071import org.jivesoftware.smack.packet.PlainStreamElement;
072import org.jivesoftware.smack.packet.XMPPError;
073import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown;
074import org.jivesoftware.smack.util.Async;
075import org.jivesoftware.smack.util.PacketParserUtils;
076import org.jivesoftware.smack.util.StringUtils;
077import org.jivesoftware.smack.util.TLSUtils;
078import org.jivesoftware.smack.util.dns.HostAddress;
079import org.jxmpp.util.XmppStringUtils;
080import org.xmlpull.v1.XmlPullParser;
081import org.xmlpull.v1.XmlPullParserException;
082
083import javax.net.SocketFactory;
084import javax.net.ssl.HostnameVerifier;
085import javax.net.ssl.KeyManager;
086import javax.net.ssl.KeyManagerFactory;
087import javax.net.ssl.SSLContext;
088import javax.net.ssl.SSLSocket;
089import javax.security.auth.callback.Callback;
090import javax.security.auth.callback.PasswordCallback;
091
092import java.io.BufferedReader;
093import java.io.ByteArrayInputStream;
094import java.io.FileInputStream;
095import java.io.IOException;
096import java.io.InputStream;
097import java.io.InputStreamReader;
098import java.io.OutputStream;
099import java.io.OutputStreamWriter;
100import java.io.Writer;
101import java.lang.reflect.Constructor;
102import java.net.InetAddress;
103import java.net.InetSocketAddress;
104import java.net.Socket;
105import java.net.UnknownHostException;
106import java.security.KeyManagementException;
107import java.security.KeyStore;
108import java.security.KeyStoreException;
109import java.security.NoSuchAlgorithmException;
110import java.security.NoSuchProviderException;
111import java.security.Provider;
112import java.security.Security;
113import java.security.UnrecoverableKeyException;
114import java.security.cert.CertificateException;
115import java.util.ArrayList;
116import java.util.Arrays;
117import java.util.Collection;
118import java.util.Iterator;
119import java.util.LinkedHashSet;
120import java.util.LinkedList;
121import java.util.List;
122import java.util.Map;
123import java.util.Set;
124import java.util.concurrent.ArrayBlockingQueue;
125import java.util.concurrent.BlockingQueue;
126import java.util.concurrent.ConcurrentHashMap;
127import java.util.concurrent.ConcurrentLinkedQueue;
128import java.util.concurrent.TimeUnit;
129import java.util.concurrent.atomic.AtomicBoolean;
130import java.util.logging.Level;
131import java.util.logging.Logger;
132
133/**
134 * Creates a socket connection to an XMPP server. This is the default connection
135 * to an XMPP server and is specified in the XMPP Core (RFC 6120).
136 * 
137 * @see XMPPConnection
138 * @author Matt Tucker
139 */
140public class XMPPTCPConnection extends AbstractXMPPConnection {
141
142    private static final int QUEUE_SIZE = 500;
143    private static final Logger LOGGER = Logger.getLogger(XMPPTCPConnection.class.getName());
144
145    /**
146     * The socket which is used for this connection.
147     */
148    private Socket socket;
149
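    /**
     * Set to true when the connection was shut down instantly while Stream Management resumption
     * is still possible; reset to false after a successful login.
     */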
153    private boolean disconnectedButResumeable = false;
154
155    /**
156     * Flag to indicate if the socket was closed intentionally by Smack.
157     * <p>
158     * This boolean flag is used concurrently, therefore it is marked volatile.
159     * </p>
160     */
161    private volatile boolean socketClosed = false;
162
163    private boolean usingTLS = false;
164
165    /**
166     * Protected access level because of unit test purposes
167     */
168    protected PacketWriter packetWriter;
169
170    /**
171     * Protected access level because of unit test purposes
172     */
173    protected PacketReader packetReader;
174
175    private final SynchronizationPoint<Exception> initalOpenStreamSend = new SynchronizationPoint<Exception>(this);
176
177    /**
178     * 
179     */
180    private final SynchronizationPoint<XMPPException> maybeCompressFeaturesReceived = new SynchronizationPoint<XMPPException>(
181                    this);
182
183    /**
184     * 
185     */
186    private final SynchronizationPoint<XMPPException> compressSyncPoint = new SynchronizationPoint<XMPPException>(
187                    this);
188
    /**
     * The default bundle and defer callback, used for new connections.
     * @see #bundleAndDeferCallback
     */
193    private static BundleAndDeferCallback defaultBundleAndDeferCallback;
194
195    /**
196     * The used bundle and defer callback.
197     * <p>
198     * Although this field may be set concurrently, the 'volatile' keyword was deliberately not added, in order to avoid
199     * having a 'volatile' read within the writer threads loop.
200     * </p>
201     */
202    private BundleAndDeferCallback bundleAndDeferCallback = defaultBundleAndDeferCallback;
203
204    private static boolean useSmDefault = false;
205
206    private static boolean useSmResumptionDefault = true;
207
208    /**
209     * The stream ID of the stream that is currently resumable, ie. the stream we hold the state
210     * for in {@link #clientHandledStanzasCount}, {@link #serverHandledStanzasCount} and
211     * {@link #unacknowledgedStanzas}.
212     */
213    private String smSessionId;
214
215    private final SynchronizationPoint<XMPPException> smResumedSyncPoint = new SynchronizationPoint<XMPPException>(
216                    this);
217
218    private final SynchronizationPoint<XMPPException> smEnabledSyncPoint = new SynchronizationPoint<XMPPException>(
219                    this);
220
221    /**
222     * The client's preferred maximum resumption time in seconds.
223     */
224    private int smClientMaxResumptionTime = -1;
225
226    /**
227     * The server's preferred maximum resumption time in seconds.
228     */
229    private int smServerMaxResumptimTime = -1;
230
231    /**
232     * Indicates whether Stream Management (XEP-198) should be used if it's supported by the server.
233     */
234    private boolean useSm = useSmDefault;
235    private boolean useSmResumption = useSmResumptionDefault;
236
    /**
     * The counter that the server sends the client about its current height. For example, if the server sends
     * {@code <a h='42'/>}, then this will be set to 42 (while also handling the {@link #unacknowledgedStanzas} queue).
     */
241    private long serverHandledStanzasCount = 0;
242
243    /**
244     * The counter for stanzas handled ("received") by the client.
245     * <p>
246     * Note that we don't need to synchronize this counter. Although JLS 17.7 states that reads and writes to longs are
247     * not atomic, it guarantees that there are at most 2 separate writes, one to each 32-bit half. And since
248     * {@link SMUtils#incrementHeight(long)} masks the lower 32 bit, we only operate on one half of the long and
249     * therefore have no concurrency problem because the read/write operations on one half are guaranteed to be atomic.
250     * </p>
251     */
252    private long clientHandledStanzasCount = 0;
253
254    private BlockingQueue<Stanza> unacknowledgedStanzas;
255
256    /**
257     * Set to true if Stream Management was at least once enabled for this connection.
258     */
259    private boolean smWasEnabledAtLeastOnce = false;
260
    /**
     * These listeners are invoked for every stanza that got acknowledged.
     * <p>
     * We use a {@link ConcurrentLinkedQueue} here in order to allow the listeners to remove
     * themselves after they have been invoked.
     * </p>
     */
268    private final Collection<StanzaListener> stanzaAcknowledgedListeners = new ConcurrentLinkedQueue<StanzaListener>();
269
    /**
     * These listeners are invoked for an acknowledged stanza that has the given stanza ID. They will
     * only be invoked once and are automatically removed after that.
     */
274    private final Map<String, StanzaListener> stanzaIdAcknowledgedListeners = new ConcurrentHashMap<String, StanzaListener>();
275
    /**
     * Predicates that determine if a stream management ack should be requested from the server.
     * <p>
     * We use a linked hash set here, so that the order in which the predicates are added matches the
     * order in which they are invoked to determine whether an ack request should be sent.
     * </p>
     */
283    private final Set<StanzaFilter> requestAckPredicates = new LinkedHashSet<StanzaFilter>();
284
285    private final XMPPTCPConnectionConfiguration config;
286
287    /**
288     * Creates a new XMPP connection over TCP (optionally using proxies).
289     * <p>
290     * Note that XMPPTCPConnection constructors do not establish a connection to the server
291     * and you must call {@link #connect()}.
292     * </p>
293     *
294     * @param config the connection configuration.
295     */
296    public XMPPTCPConnection(XMPPTCPConnectionConfiguration config) {
297        super(config);
298        this.config = config;
299        addConnectionListener(new AbstractConnectionListener() {
300            @Override
301            public void connectionClosedOnError(Exception e) {
302                if (e instanceof XMPPException.StreamErrorException) {
303                    dropSmState();
304                }
305            }
306        });
307    }
308
309    /**
310     * Creates a new XMPP connection over TCP.
311     * <p>
312     * Note that {@code jid} must be the bare JID, e.g. "user@example.org". More fine-grained control over the
313     * connection settings is available using the {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)}
314     * constructor.
315     * </p>
316     * 
317     * @param jid the bare JID used by the client.
318     * @param password the password or authentication token.
319     */
320    public XMPPTCPConnection(CharSequence jid, String password) {
321        this(XmppStringUtils.parseLocalpart(jid.toString()), password, XmppStringUtils.parseDomain(jid.toString()));
322    }
323
324    /**
325     * Creates a new XMPP connection over TCP.
326     * <p>
327     * This is the simplest constructor for connecting to an XMPP server. Alternatively,
328     * you can get fine-grained control over connection settings using the
329     * {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)} constructor.
330     * </p>
331     * @param username
332     * @param password
333     * @param serviceName
334     */
335    public XMPPTCPConnection(CharSequence username, String password, String serviceName) {
336        this(XMPPTCPConnectionConfiguration.builder().setUsernameAndPassword(username, password).setServiceName(
337                                        serviceName).build());
338    }
339
340    @Override
341    protected void throwNotConnectedExceptionIfAppropriate() throws NotConnectedException {
342        if (packetWriter == null) {
343            throw new NotConnectedException();
344        }
345        packetWriter.throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
346    }
347
348    @Override
349    protected void throwAlreadyConnectedExceptionIfAppropriate() throws AlreadyConnectedException {
350        if (isConnected() && !disconnectedButResumeable) {
351            throw new AlreadyConnectedException();
352        }
353    }
354
355    @Override
356    protected void throwAlreadyLoggedInExceptionIfAppropriate() throws AlreadyLoggedInException {
357        if (isAuthenticated() && !disconnectedButResumeable) {
358            throw new AlreadyLoggedInException();
359        }
360    }
361
362    @Override
363    protected void afterSuccessfulLogin(final boolean resumed) throws NotConnectedException {
364        // Reset the flag in case it was set
365        disconnectedButResumeable = false;
366        super.afterSuccessfulLogin(resumed);
367    }
368
369    @Override
370    protected synchronized void loginNonAnonymously(String username, String password, String resource) throws XMPPException, SmackException, IOException {
371        if (saslAuthentication.hasNonAnonymousAuthentication()) {
372            // Authenticate using SASL
373            if (password != null) {
374                saslAuthentication.authenticate(username, password, resource);
375            }
376            else {
377                saslAuthentication.authenticate(resource, config.getCallbackHandler());
378            }
379        } else {
380            throw new SmackException("No non-anonymous SASL authentication mechanism available");
381        }
382
383        // If compression is enabled then request the server to use stream compression. XEP-170
384        // recommends to perform stream compression before resource binding.
385        if (config.isCompressionEnabled()) {
386            useCompression();
387        }
388
389        if (isSmResumptionPossible()) {
390            smResumedSyncPoint.sendAndWaitForResponse(new Resume(clientHandledStanzasCount, smSessionId));
391            if (smResumedSyncPoint.wasSuccessful()) {
392                // We successfully resumed the stream, be done here
393                afterSuccessfulLogin(true);
394                return;
395            }
396            // SM resumption failed, what Smack does here is to report success of
397            // lastFeaturesReceived in case of sm resumption was answered with 'failed' so that
398            // normal resource binding can be tried.
399            LOGGER.fine("Stream resumption failed, continuing with normal stream establishment process");
400        }
401
402        List<Stanza> previouslyUnackedStanzas = new LinkedList<Stanza>();
403        if (unacknowledgedStanzas != null) {
404            // There was a previous connection with SM enabled but that was either not resumable or
405            // failed to resume. Make sure that we (re-)send the unacknowledged stanzas.
406            unacknowledgedStanzas.drainTo(previouslyUnackedStanzas);
407            // Reset unacknowledged stanzas to 'null' to signal that we never send 'enable' in this
408            // XMPP session (There maybe was an enabled in a previous XMPP session of this
409            // connection instance though). This is used in writePackets to decide if stanzas should
410            // be added to the unacknowledged stanzas queue, because they have to be added right
411            // after the 'enable' stream element has been sent.
412            dropSmState();
413        }
414
415        // Now bind the resource. It is important to do this *after* we dropped an eventually
416        // existing Stream Management state. As otherwise <bind/> and <session/> may end up in
417        // unacknowledgedStanzas and become duplicated on reconnect. See SMACK-706.
418        bindResourceAndEstablishSession(resource);
419
420        if (isSmAvailable() && useSm) {
421            // Remove what is maybe left from previously stream managed sessions
422            serverHandledStanzasCount = 0;
423            // XEP-198 3. Enabling Stream Management. If the server response to 'Enable' is 'Failed'
424            // then this is a non recoverable error and we therefore throw an exception.
425            smEnabledSyncPoint.sendAndWaitForResponseOrThrow(new Enable(useSmResumption, smClientMaxResumptionTime));
426            synchronized (requestAckPredicates) {
427                if (requestAckPredicates.isEmpty()) {
428                    // Assure that we have at lest one predicate set up that so that we request acks
429                    // for the server and eventually flush some stanzas from the unacknowledged
430                    // stanza queue
431                    requestAckPredicates.add(Predicate.forMessagesOrAfter5Stanzas());
432                }
433            }
434        }
435        // (Re-)send the stanzas *after* we tried to enable SM
436        for (Stanza stanza : previouslyUnackedStanzas) {
437            sendStanzaInternal(stanza);
438        }
439
440        afterSuccessfulLogin(false);
441    }
442
443    @Override
444    public synchronized void loginAnonymously() throws XMPPException, SmackException, IOException {
445        // Wait with SASL auth until the SASL mechanisms have been received
446        saslFeatureReceived.checkIfSuccessOrWaitOrThrow();
447
448        if (saslAuthentication.hasAnonymousAuthentication()) {
449            saslAuthentication.authenticateAnonymously();
450        }
451        else {
452            throw new SmackException("No anonymous SASL authentication mechanism available");
453        }
454
455        // If compression is enabled then request the server to use stream compression
456        if (config.isCompressionEnabled()) {
457            useCompression();
458        }
459
460        bindResourceAndEstablishSession(null);
461
462        afterSuccessfulLogin(false);
463    }
464
465    @Override
466    public boolean isSecureConnection() {
467        return usingTLS;
468    }
469
470    public boolean isSocketClosed() {
471        return socketClosed;
472    }
473
474    /**
475     * Shuts the current connection down. After this method returns, the connection must be ready
476     * for re-use by connect.
477     */
478    @Override
479    protected void shutdown() {
480        if (isSmEnabled()) {
481            try {
482                // Try to send a last SM Acknowledgement. Most servers won't find this information helpful, as the SM
483                // state is dropped after a clean disconnect anyways. OTOH it doesn't hurt much either.
484                sendSmAcknowledgementInternal();
485            } catch (NotConnectedException e) {
486                LOGGER.log(Level.FINE, "Can not send final SM ack as connection is not connected", e);
487            }
488        }
489        shutdown(false);
490    }
491
492    /**
493     * Performs an unclean disconnect and shutdown of the connection. Does not send a closing stream stanza.
494     */
495    public synchronized void instantShutdown() {
496        shutdown(true);
497    }
498
499    private void shutdown(boolean instant) {
500        if (disconnectedButResumeable) {
501            return;
502        }
503        if (packetReader != null) {
504                packetReader.shutdown();
505        }
506        if (packetWriter != null) {
507                packetWriter.shutdown(instant);
508        }
509
510        // Set socketClosed to true. This will cause the PacketReader
511        // and PacketWriter to ignore any Exceptions that are thrown
512        // because of a read/write from/to a closed stream.
513        // It is *important* that this is done before socket.close()!
514        socketClosed = true;
515        try {
516                socket.close();
517        } catch (Exception e) {
518                LOGGER.log(Level.WARNING, "shutdown", e);
519        }
520
521        setWasAuthenticated();
522        // If we are able to resume the stream, then don't set
523        // connected/authenticated/usingTLS to false since we like behave like we are still
524        // connected (e.g. sendStanza should not throw a NotConnectedException).
525        if (isSmResumptionPossible() && instant) {
526            disconnectedButResumeable = true;
527        } else {
528            disconnectedButResumeable = false;
529            // Reset the stream management session id to null, since if the stream is cleanly closed, i.e. sending a closing
530            // stream tag, there is no longer a stream to resume.
531            smSessionId = null;
532        }
533        authenticated = false;
534        connected = false;
535        usingTLS = false;
536        reader = null;
537        writer = null;
538
539        maybeCompressFeaturesReceived.init();
540        compressSyncPoint.init();
541        smResumedSyncPoint.init();
542        smEnabledSyncPoint.init();
543        initalOpenStreamSend.init();
544    }
545
546    @Override
547    public void send(PlainStreamElement element) throws NotConnectedException {
548        packetWriter.sendStreamElement(element);
549    }
550
551    @Override
552    protected void sendStanzaInternal(Stanza packet) throws NotConnectedException {
553        packetWriter.sendStreamElement(packet);
554        if (isSmEnabled()) {
555            for (StanzaFilter requestAckPredicate : requestAckPredicates) {
556                if (requestAckPredicate.accept(packet)) {
557                    requestSmAcknowledgementInternal();
558                    break;
559                }
560            }
561        }
562    }
563
564    private void connectUsingConfiguration() throws IOException, ConnectionException {
565        List<HostAddress> failedAddresses = populateHostAddresses();
566        SocketFactory socketFactory = config.getSocketFactory();
567        if (socketFactory == null) {
568            socketFactory = SocketFactory.getDefault();
569        }
570        for (HostAddress hostAddress : hostAddresses) {
571            String host = hostAddress.getFQDN();
572            int port = hostAddress.getPort();
573            socket = socketFactory.createSocket();
574            try {
575                Iterator<InetAddress> inetAddresses = Arrays.asList(InetAddress.getAllByName(host)).iterator();
576                if (!inetAddresses.hasNext()) {
577                    // This should not happen
578                    LOGGER.warning("InetAddress.getAllByName() returned empty result array.");
579                    throw new UnknownHostException(host);
580                }
581                innerloop: while (inetAddresses.hasNext()) {
582                    final InetAddress inetAddress = inetAddresses.next();
583                    final String inetAddressAndPort = inetAddress + " at port " + port;
584                    LOGGER.finer("Trying to establish TCP connection to " + inetAddressAndPort);
585                    try {
586                        socket.connect(new InetSocketAddress(inetAddress, port), config.getConnectTimeout());
587                    } catch (Exception e) {
588                        if (inetAddresses.hasNext()) {
589                            continue innerloop;
590                        } else {
591                            throw e;
592                        }
593                    }
594                    LOGGER.finer("Established TCP connection to " + inetAddressAndPort);
595                    // We found a host to connect to, return here
596                    this.host = host;
597                    this.port = port;
598                    return;
599                }
600            }
601            catch (Exception e) {
602                hostAddress.setException(e);
603                failedAddresses.add(hostAddress);
604            }
605        }
606        // There are no more host addresses to try
607        // throw an exception and report all tried
608        // HostAddresses in the exception
609        throw ConnectionException.from(failedAddresses);
610    }
611
612    /**
613     * Initializes the connection by creating a stanza(/packet) reader and writer and opening a
614     * XMPP stream to the server.
615     *
616     * @throws XMPPException if establishing a connection to the server fails.
617     * @throws SmackException if the server failes to respond back or if there is anther error.
618     * @throws IOException 
619     */
620    private void initConnection() throws IOException {
621        boolean isFirstInitialization = packetReader == null || packetWriter == null;
622        compressionHandler = null;
623
624        // Set the reader and writer instance variables
625        initReaderAndWriter();
626
627        if (isFirstInitialization) {
628            packetWriter = new PacketWriter();
629            packetReader = new PacketReader();
630
631            // If debugging is enabled, we should start the thread that will listen for
632            // all packets and then log them.
633            if (config.isDebuggerEnabled()) {
634                addAsyncStanzaListener(debugger.getReaderListener(), null);
635                if (debugger.getWriterListener() != null) {
636                    addPacketSendingListener(debugger.getWriterListener(), null);
637                }
638            }
639        }
640        // Start the packet writer. This will open an XMPP stream to the server
641        packetWriter.init();
642        // Start the packet reader. The startup() method will block until we
643        // get an opening stream packet back from server
644        packetReader.init();
645
646        if (isFirstInitialization) {
647            // Notify listeners that a new connection has been established
648            for (ConnectionCreationListener listener : getConnectionCreationListeners()) {
649                listener.connectionCreated(this);
650            }
651        }
652    }
653
654    private void initReaderAndWriter() throws IOException {
655        InputStream is = socket.getInputStream();
656        OutputStream os = socket.getOutputStream();
657        if (compressionHandler != null) {
658            is = compressionHandler.getInputStream(is);
659            os = compressionHandler.getOutputStream(os);
660        }
661        // OutputStreamWriter is already buffered, no need to wrap it into a BufferedWriter
662        writer = new OutputStreamWriter(os, "UTF-8");
663        reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
664
665        // If debugging is enabled, we open a window and write out all network traffic.
666        initDebugger();
667    }
668
669    /**
670     * The server has indicated that TLS negotiation can start. We now need to secure the
671     * existing plain connection and perform a handshake. This method won't return until the
672     * connection has finished the handshake or an error occurred while securing the connection.
673     * @throws IOException 
674     * @throws CertificateException 
675     * @throws NoSuchAlgorithmException 
676     * @throws NoSuchProviderException 
677     * @throws KeyStoreException 
678     * @throws UnrecoverableKeyException 
679     * @throws KeyManagementException 
680     * @throws SmackException 
681     * @throws Exception if an exception occurs.
682     */
683    private void proceedTLSReceived() throws NoSuchAlgorithmException, CertificateException, IOException, KeyStoreException, NoSuchProviderException, UnrecoverableKeyException, KeyManagementException, SmackException {
684        SSLContext context = this.config.getCustomSSLContext();
685        KeyStore ks = null;
686        KeyManager[] kms = null;
687        PasswordCallback pcb = null;
688
689        if(config.getCallbackHandler() == null) {
690           ks = null;
691        } else if (context == null) {
692            if(config.getKeystoreType().equals("NONE")) {
693                ks = null;
694                pcb = null;
695            }
696            else if(config.getKeystoreType().equals("PKCS11")) {
697                try {
698                    Constructor<?> c = Class.forName("sun.security.pkcs11.SunPKCS11").getConstructor(InputStream.class);
699                    String pkcs11Config = "name = SmartCard\nlibrary = "+config.getPKCS11Library();
700                    ByteArrayInputStream config = new ByteArrayInputStream(pkcs11Config.getBytes());
701                    Provider p = (Provider)c.newInstance(config);
702                    Security.addProvider(p);
703                    ks = KeyStore.getInstance("PKCS11",p);
704                    pcb = new PasswordCallback("PKCS11 Password: ",false);
705                    this.config.getCallbackHandler().handle(new Callback[]{pcb});
706                    ks.load(null,pcb.getPassword());
707                }
708                catch (Exception e) {
709                    ks = null;
710                    pcb = null;
711                }
712            }
713            else if(config.getKeystoreType().equals("Apple")) {
714                ks = KeyStore.getInstance("KeychainStore","Apple");
715                ks.load(null,null);
716                //pcb = new PasswordCallback("Apple Keychain",false);
717                //pcb.setPassword(null);
718            }
719            else {
720                ks = KeyStore.getInstance(config.getKeystoreType());
721                try {
722                    pcb = new PasswordCallback("Keystore Password: ",false);
723                    config.getCallbackHandler().handle(new Callback[]{pcb});
724                    ks.load(new FileInputStream(config.getKeystorePath()), pcb.getPassword());
725                }
726                catch(Exception e) {
727                    ks = null;
728                    pcb = null;
729                }
730            }
731            KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
732            try {
733                if(pcb == null) {
734                    kmf.init(ks,null);
735                } else {
736                    kmf.init(ks,pcb.getPassword());
737                    pcb.clearPassword();
738                }
739                kms = kmf.getKeyManagers();
740            } catch (NullPointerException npe) {
741                kms = null;
742            }
743        }
744
745        // If the user didn't specify a SSLContext, use the default one
746        if (context == null) {
747            context = SSLContext.getInstance("TLS");
748            context.init(kms, null, new java.security.SecureRandom());
749        }
750        Socket plain = socket;
751        // Secure the plain connection
752        socket = context.getSocketFactory().createSocket(plain,
753                host, plain.getPort(), true);
754
755        final SSLSocket sslSocket = (SSLSocket) socket;
756        // Immediately set the enabled SSL protocols and ciphers. See SMACK-712 why this is
757        // important (at least on certain platforms) and it seems to be a good idea anyways to
758        // prevent an accidental implicit handshake.
759        TLSUtils.setEnabledProtocolsAndCiphers(sslSocket, config.getEnabledSSLProtocols(), config.getEnabledSSLCiphers());
760
761        // Initialize the reader and writer with the new secured version
762        initReaderAndWriter();
763
764        // Proceed to do the handshake
765        sslSocket.startHandshake();
766
767        final HostnameVerifier verifier = getConfiguration().getHostnameVerifier();
768        if (verifier == null) {
769                throw new IllegalStateException("No HostnameVerifier set. Use connectionConfiguration.setHostnameVerifier() to configure.");
770        } else if (!verifier.verify(getServiceName(), sslSocket.getSession())) {
771            throw new CertificateException("Hostname verification of certificate failed. Certificate does not authenticate " + getServiceName());
772        }
773
774        // Set that TLS was successful
775        usingTLS = true;
776    }
777
778    /**
779     * Returns the compression handler that can be used for one compression methods offered by the server.
780     * 
781     * @return a instance of XMPPInputOutputStream or null if no suitable instance was found
782     * 
783     */
784    private XMPPInputOutputStream maybeGetCompressionHandler() {
785        Compress.Feature compression = getFeature(Compress.Feature.ELEMENT, Compress.NAMESPACE);
786        if (compression == null) {
787            // Server does not support compression
788            return null;
789        }
790        for (XMPPInputOutputStream handler : SmackConfiguration.getCompresionHandlers()) {
791                String method = handler.getCompressionMethod();
792                if (compression.getMethods().contains(method))
793                    return handler;
794        }
795        return null;
796    }
797
798    @Override
799    public boolean isUsingCompression() {
800        return compressionHandler != null && compressSyncPoint.wasSuccessful();
801    }
802
803    /**
804     * <p>
805     * Starts using stream compression that will compress network traffic. Traffic can be
806     * reduced up to 90%. Therefore, stream compression is ideal when using a slow speed network
807     * connection. However, the server and the client will need to use more CPU time in order to
808     * un/compress network data so under high load the server performance might be affected.
809     * </p>
810     * <p>
811     * Stream compression has to have been previously offered by the server. Currently only the
812     * zlib method is supported by the client. Stream compression negotiation has to be done
813     * before authentication took place.
814     * </p>
815     *
816     * @throws NotConnectedException 
817     * @throws XMPPException 
818     * @throws NoResponseException 
819     */
820    private void useCompression() throws NotConnectedException, NoResponseException, XMPPException {
821        maybeCompressFeaturesReceived.checkIfSuccessOrWait();
822        // If stream compression was offered by the server and we want to use
823        // compression then send compression request to the server
824        if ((compressionHandler = maybeGetCompressionHandler()) != null) {
825            compressSyncPoint.sendAndWaitForResponseOrThrow(new Compress(compressionHandler.getCompressionMethod()));
826        } else {
827            LOGGER.warning("Could not enable compression because no matching handler/method pair was found");
828        }
829    }
830
831    /**
832     * Establishes a connection to the XMPP server and performs an automatic login
833     * only if the previous connection state was logged (authenticated). It basically
834     * creates and maintains a socket connection to the server.<p>
835     * <p/>
836     * Listeners will be preserved from a previous connection if the reconnection
837     * occurs after an abrupt termination.
838     *
839     * @throws XMPPException if an error occurs while trying to establish the connection.
840     * @throws SmackException 
841     * @throws IOException 
842     */
843    @Override
844    protected void connectInternal() throws SmackException, IOException, XMPPException {
845        // Establishes the TCP connection to the server and does setup the reader and writer. Throws an exception if
846        // there is an error establishing the connection
847        connectUsingConfiguration();
848
849        // We connected successfully to the servers TCP port
850        socketClosed = false;
851        initConnection();
852
853        // Wait with SASL auth until the SASL mechanisms have been received
854        saslFeatureReceived.checkIfSuccessOrWaitOrThrow();
855
856        // Make note of the fact that we're now connected.
857        connected = true;
858        callConnectionConnectedListener();
859
860        // Automatically makes the login if the user was previously connected successfully
861        // to the server and the connection was terminated abruptly
862        if (wasAuthenticated) {
863            login();
864            notifyReconnection();
865        }
866    }
867
868    /**
869     * Sends out a notification that there was an error with the connection
870     * and closes the connection. Also prints the stack trace of the given exception
871     *
872     * @param e the exception that causes the connection close event.
873     */
874    private synchronized void notifyConnectionError(Exception e) {
875        // Listeners were already notified of the exception, return right here.
876        if ((packetReader == null || packetReader.done) &&
877                (packetWriter == null || packetWriter.done())) return;
878
879        // Closes the connection temporary. A reconnection is possible
880        instantShutdown();
881
882        // Notify connection listeners of the error.
883        callConnectionClosedOnErrorListener(e);
884    }
885
886    /**
887     * For unit testing purposes
888     *
889     * @param writer
890     */
891    protected void setWriter(Writer writer) {
892        this.writer = writer;
893    }
894
895    @Override
896    protected void afterFeaturesReceived() throws SecurityRequiredException, NotConnectedException {
897        StartTls startTlsFeature = getFeature(StartTls.ELEMENT, StartTls.NAMESPACE);
898        if (startTlsFeature != null) {
899            if (startTlsFeature.required() && config.getSecurityMode() == SecurityMode.disabled) {
900                notifyConnectionError(new SecurityRequiredByServerException());
901                return;
902            }
903
904            if (config.getSecurityMode() != ConnectionConfiguration.SecurityMode.disabled) {
905                send(new StartTls());
906            }
907        }
908        // If TLS is required but the server doesn't offer it, disconnect
909        // from the server and throw an error. First check if we've already negotiated TLS
910        // and are secure, however (features get parsed a second time after TLS is established).
911        if (!isSecureConnection() && startTlsFeature == null
912                        && getConfiguration().getSecurityMode() == SecurityMode.required) {
913            throw new SecurityRequiredByClientException();
914        }
915
916        if (getSASLAuthentication().authenticationSuccessful()) {
917            // If we have received features after the SASL has been successfully completed, then we
918            // have also *maybe* received, as it is an optional feature, the compression feature
919            // from the server.
920            maybeCompressFeaturesReceived.reportSuccess();
921        }
922    }
923
924    /**
925     * Resets the parser using the latest connection's reader. Reseting the parser is necessary
926     * when the plain connection has been secured or when a new opening stream element is going
927     * to be sent by the server.
928     *
929     * @throws SmackException if the parser could not be reset.
930     */
931    void openStream() throws SmackException {
932        // If possible, provide the receiving entity of the stream open tag, i.e. the server, as much information as
933        // possible. The 'to' attribute is *always* available. The 'from' attribute if set by the user and no external
934        // mechanism is used to determine the local entity (user). And the 'id' attribute is available after the first
935        // response from the server (see e.g. RFC 6120 ยง 9.1.1 Step 2.)
936        CharSequence to = getServiceName();
937        CharSequence from = null;
938        CharSequence localpart = config.getUsername();
939        if (localpart != null) {
940            from = XmppStringUtils.completeJidFrom(localpart, to);
941        }
942        String id = getStreamId();
943        send(new StreamOpen(to, from, id));
944        try {
945            packetReader.parser = PacketParserUtils.newXmppParser(reader);
946        }
947        catch (XmlPullParserException e) {
948            throw new SmackException(e);
949        }
950    }
951
    /**
     * Reads the top level elements of the incoming XML stream and dispatches them: stanzas are
     * parsed and processed, stream level elements (stream open, features, TLS, SASL, compression
     * and Stream Management elements) are handled directly in {@link #parsePackets()}.
     */
    protected class PacketReader {

        /**
         * The parser the reader pulls its events from. It is replaced by
         * <code>openStream()</code> of the enclosing connection every time a new stream is opened.
         */
        XmlPullParser parser;

        /**
         * Set to true by {@link #shutdown()} in order to end the parse loop.
         */
        private volatile boolean done;

        /**
         * Initializes the reader in order to be used. The reader is initialized during the
         * first connection and when reconnecting due to an abruptly disconnection.
         */
        void init() {
            done = false;

            // Run parsePackets() asynchronously under a name that contains the connection counter
            Async.go(new Runnable() {
                public void run() {
                    parsePackets();
                }
            }, "Smack Packet Reader (" + getConnectionCounter() + ")");
         }

        /**
         * Shuts the stanza(/packet) reader down. This method simply sets the 'done' flag to true.
         */
        void shutdown() {
            done = true;
        }

        /**
         * Parse top-level packets in order to process them further.
         * <p>
         * Waits until the initial stream open element has been sent and then loops over the parser
         * events until the 'done' flag is set. Any exception ends the loop; it is reported via
         * <code>notifyConnectionError(Exception)</code> unless the reader is done or the socket is
         * closed.
         * </p>
         */
        private void parsePackets() {
            try {
                // The parser is only usable after the writer has sent the initial stream open
                initalOpenStreamSend.checkIfSuccessOrWait();
                int eventType = parser.getEventType();
                while (!done) {
                    switch (eventType) {
                    case XmlPullParser.START_TAG:
                        final String name = parser.getName();
                        switch (name) {
                        case Message.ELEMENT:
                        case IQ.IQ_ELEMENT:
                        case Presence.ELEMENT:
                            try {
                                parseAndProcessStanza(parser);
                            } finally {
                                // Count the stanza as handled even if parsing/processing failed
                                clientHandledStanzasCount = SMUtils.incrementHeight(clientHandledStanzasCount);
                            }
                            break;
                        case "stream":
                            // We found an opening stream.
                            if ("jabber:client".equals(parser.getNamespace(null))) {
                                streamId = parser.getAttributeValue("", "id");
                                String reportedServiceName = parser.getAttributeValue("", "from");
                                // NOTE(review): only evaluated when assertions are enabled; would throw a
                                // NullPointerException if the 'from' attribute is missing - confirm
                                assert(reportedServiceName.equals(config.getServiceName()));
                            }
                            break;
                        case "error":
                            throw new StreamErrorException(PacketParserUtils.parseStreamError(parser));
                        case "features":
                            parseFeatures(parser);
                            break;
                        case "proceed":
                            try {
                                // Secure the connection by negotiating TLS
                                proceedTLSReceived();
                                // Send a new opening stream to the server
                                openStream();
                            }
                            catch (Exception e) {
                                // We report any failure regarding TLS in the second stage of XMPP
                                // connection establishment, namely the SASL authentication
                                saslFeatureReceived.reportFailure(new SmackException(e));
                                throw e;
                            }
                            break;
                        case "failure":
                            // 'failure' is used by several protocols, the namespace tells them apart
                            String namespace = parser.getNamespace(null);
                            switch (namespace) {
                            case "urn:ietf:params:xml:ns:xmpp-tls":
                                // TLS negotiation has failed. The server will close the connection
                                // TODO Parse failure stanza
                                throw new XMPPErrorException("TLS negotiation has failed", null);
                            case "http://jabber.org/protocol/compress":
                                // Stream compression has been denied. This is a recoverable
                                // situation. It is still possible to authenticate and
                                // use the connection but using an uncompressed connection
                                // TODO Parse failure stanza
                                compressSyncPoint.reportFailure(new XMPPErrorException(
                                                "Could not establish compression", null));
                                break;
                            case SaslStreamElements.NAMESPACE:
                                // SASL authentication has failed. The server may close the connection
                                // depending on the number of retries
                                final SASLFailure failure = PacketParserUtils.parseSASLFailure(parser);
                                getSASLAuthentication().authenticationFailed(failure);
                                break;
                            }
                            break;
                        case Challenge.ELEMENT:
                            // The server is challenging the SASL authentication made by the client
                            String challengeData = parser.nextText();
                            getSASLAuthentication().challengeReceived(challengeData);
                            break;
                        case Success.ELEMENT:
                            Success success = new Success(parser.nextText());
                            // We now need to bind a resource for the connection
                            // Open a new stream and wait for the response
                            openStream();
                            // The SASL authentication with the server was successful. The next step
                            // will be to bind the resource
                            getSASLAuthentication().authenticated(success);
                            break;
                        case Compressed.ELEMENT:
                            // Server confirmed that it's possible to use stream compression. Start
                            // stream compression
                            // Initialize the reader and writer with the new compressed version
                            initReaderAndWriter();
                            // Send a new opening stream to the server
                            openStream();
                            // Notify that compression is being used
                            compressSyncPoint.reportSuccess();
                            break;
                        case Enabled.ELEMENT:
                            Enabled enabled = ParseStreamManagement.enabled(parser);
                            if (enabled.isResumeSet()) {
                                smSessionId = enabled.getId();
                                // A resumable stream without a session id can not be resumed later
                                if (StringUtils.isNullOrEmpty(smSessionId)) {
                                    XMPPErrorException xmppException = new XMPPErrorException(
                                                    "Stream Management 'enabled' element with resume attribute but without session id received",
                                                    new XMPPError(
                                                                    XMPPError.Condition.bad_request));
                                    smEnabledSyncPoint.reportFailure(xmppException);
                                    throw xmppException;
                                }
                                smServerMaxResumptimTime = enabled.getMaxResumptionTime();
                            } else {
                                // Mark this a non-resumable stream by setting smSessionId to null
                                smSessionId = null;
                            }
                            // The handled stanzas counter starts at zero when SM gets enabled
                            clientHandledStanzasCount = 0;
                            smWasEnabledAtLeastOnce = true;
                            smEnabledSyncPoint.reportSuccess();
                            LOGGER.fine("Stream Management (XEP-198): succesfully enabled");
                            break;
                        case Failed.ELEMENT:
                            Failed failed = ParseStreamManagement.failed(parser);
                            XMPPError xmppError = new XMPPError(failed.getXMPPErrorCondition());
                            XMPPException xmppException = new XMPPErrorException("Stream Management failed", xmppError);
                            // If only XEP-198 would specify different failure elements for the SM
                            // enable and SM resume failure case. But this is not the case, so we
                            // need to determine if this is a 'Failed' response for either 'Enable'
                            // or 'Resume'.
                            if (smResumedSyncPoint.requestSent()) {
                                smResumedSyncPoint.reportFailure(xmppException);
                            }
                            else {
                                if (!smEnabledSyncPoint.requestSent()) {
                                    throw new IllegalStateException("Failed element received but SM was not previously enabled");
                                }
                                smEnabledSyncPoint.reportFailure(xmppException);
                                // Report success for last lastFeaturesReceived so that in case a
                                // failed resumption, we can continue with normal resource binding.
                                // See text of XEP-198 5. below Example 11.
                                lastFeaturesReceived.reportSuccess();
                            }
                            break;
                        case Resumed.ELEMENT:
                            Resumed resumed = ParseStreamManagement.resumed(parser);
                            // NOTE(review): smSessionId is set to null for non-resumable streams (see the
                            // 'enabled' case); presumably 'resumed' only arrives when a session id exists,
                            // otherwise this throws a NullPointerException - verify
                            if (!smSessionId.equals(resumed.getPrevId())) {
                                throw new StreamIdDoesNotMatchException(smSessionId, resumed.getPrevId());
                            }
                            // Mark SM as enabled and resumption as successful.
                            smResumedSyncPoint.reportSuccess();
                            smEnabledSyncPoint.reportSuccess();
                            // First, drop the stanzas already handled by the server
                            processHandledCount(resumed.getHandledCount());
                            // Then re-send what is left in the unacknowledged queue
                            List<Stanza> stanzasToResend = new ArrayList<>(unacknowledgedStanzas.size());
                            unacknowledgedStanzas.drainTo(stanzasToResend);
                            for (Stanza stanza : stanzasToResend) {
                                sendStanzaInternal(stanza);
                            }
                            // If there where stanzas resent, then request a SM ack for them.
                            // Writer's sendStreamElement() won't do it automatically based on
                            // predicates.
                            if (!stanzasToResend.isEmpty()) {
                                requestSmAcknowledgementInternal();
                            }
                            LOGGER.fine("Stream Management (XEP-198): Stream resumed");
                            break;
                        case AckAnswer.ELEMENT:
                            AckAnswer ackAnswer = ParseStreamManagement.ackAnswer(parser);
                            processHandledCount(ackAnswer.getHandledCount());
                            break;
                        case AckRequest.ELEMENT:
                            ParseStreamManagement.ackRequest(parser);
                            // Only answer ack requests if SM was successfully enabled
                            if (smEnabledSyncPoint.wasSuccessful()) {
                                sendSmAcknowledgementInternal();
                            } else {
                                LOGGER.warning("SM Ack Request received while SM is not enabled");
                            }
                            break;
                         default:
                             LOGGER.warning("Unknown top level stream element: " + name);
                             break;
                        }
                        break;
                    case XmlPullParser.END_TAG:
                        // A closing stream element ends the session
                        if (parser.getName().equals("stream")) {
                            // Disconnect the connection
                            disconnect();
                        }
                        break;
                    case XmlPullParser.END_DOCUMENT:
                        // END_DOCUMENT only happens in an error case, as otherwise we would see a
                        // closing stream element before.
                        throw new SmackException(
                                        "Parser got END_DOCUMENT event. This could happen e.g. if the server closed the connection without sending a closing stream element");
                    }
                    eventType = parser.next();
                }
            }
            catch (Exception e) {
                // The exception can be ignored if the the connection is 'done'
                // or if the it was caused because the socket got closed
                if (!(done || isSocketClosed())) {
                    // Close the connection and notify connection listeners of the
                    // error.
                    notifyConnectionError(e);
                }
            }
        }
    }
1187
1188    protected class PacketWriter {
1189        public static final int QUEUE_SIZE = XMPPTCPConnection.QUEUE_SIZE;
1190
1191        private final ArrayBlockingQueueWithShutdown<Element> queue = new ArrayBlockingQueueWithShutdown<Element>(
1192                        QUEUE_SIZE, true);
1193
1194        /**
1195         * Needs to be protected for unit testing purposes.
1196         */
1197        protected SynchronizationPoint<NoResponseException> shutdownDone = new SynchronizationPoint<NoResponseException>(
1198                        XMPPTCPConnection.this);
1199
1200        /**
1201         * If set, the stanza(/packet) writer is shut down
1202         */
1203        protected volatile Long shutdownTimestamp = null;
1204
1205        private volatile boolean instantShutdown;
1206
1207        /**
1208         * True if some preconditions are given to start the bundle and defer mechanism.
1209         * <p>
1210         * This will likely get set to true right after the start of the writer thread, because
         * {@link #nextStreamElement()} will check if {@link #queue} is empty, which is probably the case, and then set
1212         * this field to true.
1213         * </p>
1214         */
1215        private boolean shouldBundleAndDefer;
1216
1217        /** 
1218        * Initializes the writer in order to be used. It is called at the first connection and also 
1219        * is invoked if the connection is disconnected by an error.
1220        */ 
1221        void init() {
1222            shutdownDone.init();
1223            shutdownTimestamp = null;
1224
1225            if (unacknowledgedStanzas != null) {
1226                // It's possible that there are new stanzas in the writer queue that
1227                // came in while we were disconnected but resumable, drain those into
1228                // the unacknowledged queue so that they get resent now
1229                drainWriterQueueToUnacknowledgedStanzas();
1230            }
1231
1232            queue.start();
1233            Async.go(new Runnable() {
1234                @Override
1235                public void run() {
1236                    writePackets();
1237                }
1238            }, "Smack Packet Writer (" + getConnectionCounter() + ")");
1239        }
1240
        /**
         * Tells if the writer has been shut down, which is the case once a shutdown timestamp is set.
         *
         * @return true if <code>shutdownTimestamp</code> is not null.
         */
        private boolean done() {
            return shutdownTimestamp != null;
        }
1244
1245        protected void throwNotConnectedExceptionIfDoneAndResumptionNotPossible() throws NotConnectedException {
1246            if (done() && !isSmResumptionPossible()) {
1247                // Don't throw a NotConnectedException is there is an resumable stream available
1248                throw new NotConnectedException();
1249            }
1250        }
1251
1252        /**
1253         * Sends the specified element to the server.
1254         *
1255         * @param element the element to send.
1256         * @throws NotConnectedException 
1257         */
1258        protected void sendStreamElement(Element element) throws NotConnectedException {
1259            throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
1260
1261            boolean enqueued = false;
1262            while (!enqueued) {
1263                try {
1264                    queue.put(element);
1265                    enqueued = true;
1266                }
1267                catch (InterruptedException e) {
1268                    throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
1269                    // If the method above did not throw, then the sending thread was interrupted
1270                    // TODO in a later version of Smack the InterruptedException should be thrown to
1271                    // allow users to interrupt a sending thread that is currently blocking because
1272                    // the queue is full.
1273                    LOGGER.log(Level.WARNING, "Sending thread was interrupted", e);
1274                }
1275            }
1276        }
1277
1278        /**
1279         * Shuts down the stanza(/packet) writer. Once this method has been called, no further
1280         * packets will be written to the server.
1281         */
1282        void shutdown(boolean instant) {
1283            instantShutdown = instant;
1284            shutdownTimestamp = System.currentTimeMillis();
1285            queue.shutdown();
1286            try {
1287                shutdownDone.checkIfSuccessOrWait();
1288            }
1289            catch (NoResponseException e) {
1290                LOGGER.log(Level.WARNING, "shutdownDone was not marked as successful by the writer thread", e);
1291            }
1292        }
1293
1294        /**
1295         * Maybe return the next available element from the queue for writing. If the queue is shut down <b>or</b> a
1296         * spurious interrupt occurs, <code>null</code> is returned. So it is important to check the 'done' condition in
1297         * that case.
1298         *
1299         * @return the next element for writing or null.
1300         */
1301        private Element nextStreamElement() {
1302            // It is important the we check if the queue is empty before removing an element from it
1303            if (queue.isEmpty()) {
1304                shouldBundleAndDefer = true;
1305            }
1306            Element packet = null;
1307            try {
1308                packet = queue.take();
1309            }
1310            catch (InterruptedException e) {
1311                if (!queue.isShutdown()) {
1312                    // Users shouldn't try to interrupt the packet writer thread
1313                    LOGGER.log(Level.WARNING, "Packet writer thread was interrupted. Don't do that. Use disconnect() instead.", e);
1314                }
1315            }
1316            return packet;
1317        }
1318
        /**
         * Main routine of the writer: opens the stream, writes queued stream elements until <code>done()</code>
         * reports true and then performs the shutdown handling.
         * <p>
         * On a regular shutdown (<code>instantShutdown</code> is false) the remaining queue content and the closing
         * stream tag are written. On an instant shutdown with Stream Management enabled, the remaining queue content
         * is moved to the unacknowledged stanzas queue instead. In every case success is reported on
         * <code>shutdownDone</code> when this method exits.
         * </p>
         * <p>
         * Any exception escaping the write loop is forwarded to <code>notifyConnectionError()</code>, unless the
         * connection is already 'done' or the socket is closed, in which case it is only logged at FINE level.
         * </p>
         */
        private void writePackets() {
            try {
                openStream();
                initalOpenStreamSend.reportSuccess();
                // Write out packets from the queue.
                while (!done()) {
                    Element element = nextStreamElement();
                    if (element == null) {
                        // The take was interrupted (shutdown or foreign interrupt): re-evaluate done()
                        continue;
                    }

                    // Get a local version of the bundle and defer callback, in case it's unset
                    // between the null check and the method invocation
                    final BundleAndDeferCallback localBundleAndDeferCallback = bundleAndDeferCallback;
                    // If the preconditions are given (e.g. bundleAndDefer callback is set, queue is
                    // empty), then we could wait a bit for further stanzas attempting to decrease
                    // our energy consumption
                    if (localBundleAndDeferCallback != null && isAuthenticated() && shouldBundleAndDefer) {
                        // Reset shouldBundleAndDefer to false, nextStreamElement() will set it to true once the
                        // queue is empty again.
                        shouldBundleAndDefer = false;
                        // This flag doubles as the monitor we wait on below. It is handed to the callback wrapped
                        // in a BundleAndDefer, presumably so that the deferral can be ended early from outside.
                        final AtomicBoolean bundlingAndDeferringStopped = new AtomicBoolean();
                        final int bundleAndDeferMillis = localBundleAndDeferCallback.getBundleAndDeferMillis(new BundleAndDefer(
                                        bundlingAndDeferringStopped));
                        if (bundleAndDeferMillis > 0) {
                            long remainingWait = bundleAndDeferMillis;
                            final long waitStart = System.currentTimeMillis();
                            synchronized (bundlingAndDeferringStopped) {
                                // Loop to cope with spurious wakeups: keep waiting until either the flag got set
                                // or the full deferral time has elapsed. An InterruptedException thrown by wait()
                                // is not handled here and ends up in the outer catch block.
                                while (!bundlingAndDeferringStopped.get() && remainingWait > 0) {
                                    bundlingAndDeferringStopped.wait(remainingWait);
                                    remainingWait = bundleAndDeferMillis
                                                    - (System.currentTimeMillis() - waitStart);
                                }
                            }
                        }
                    }

                    // 'packet' stays null for stream elements that are not stanzas
                    Stanza packet = null;
                    if (element instanceof Stanza) {
                        packet = (Stanza) element;
                    }
                    else if (element instanceof Enable) {
                        // The client needs to add messages to the unacknowledged stanzas queue
                        // right after it sent 'enabled'. Stanza will be added once
                        // unacknowledgedStanzas is not null.
                        unacknowledgedStanzas = new ArrayBlockingQueue<>(QUEUE_SIZE);
                    }
                    // Check if the stream element should be put to the unacknowledgedStanza
                    // queue. Note that we can not do the put() in sendStanzaInternal() and the
                    // packet order is not stable at this point (sendStanzaInternal() can be
                    // called concurrently).
                    if (unacknowledgedStanzas != null && packet != null) {
                        // If the unacknowledgedStanza queue is nearly full, request a new ack
                        // from the server in order to drain it
                        // NOTE(review): this compares an int with a double, so the request is only sent if
                        // 0.8 * QUEUE_SIZE is a whole number and the size hits it exactly. QUEUE_SIZE is
                        // declared outside of this section - confirm that this holds for its value.
                        if (unacknowledgedStanzas.size() == 0.8 * XMPPTCPConnection.QUEUE_SIZE) {
                            writer.write(AckRequest.INSTANCE.toXML().toString());
                            writer.flush();
                        }
                        try {
                            // It is important that we put the stanza in the unacknowledged stanza
                            // queue before we put it on the wire
                            unacknowledgedStanzas.put(packet);
                        }
                        catch (InterruptedException e) {
                            // Surfaces as an unchecked exception, which is handled by the outer catch block
                            throw new IllegalStateException(e);
                        }
                    }
                    writer.write(element.toXML().toString());
                    // Only flush once there is nothing more queued, so consecutive elements get bundled
                    if (queue.isEmpty()) {
                        writer.flush();
                    }
                    if (packet != null) {
                        firePacketSendingListeners(packet);
                    }
                }
                if (!instantShutdown) {
                    // Flush out the rest of the queue.
                    // Note that the elements written here bypass the unacknowledged stanzas bookkeeping and the
                    // sending listeners of the main loop above.
                    try {
                        while (!queue.isEmpty()) {
                            Element packet = queue.remove();
                            writer.write(packet.toXML().toString());
                        }
                        writer.flush();
                    }
                    catch (Exception e) {
                        LOGGER.log(Level.WARNING,
                                        "Exception flushing queue during shutdown, ignore and continue",
                                        e);
                    }

                    // Close the stream.
                    try {
                        writer.write("</stream:stream>");
                        writer.flush();
                    }
                    catch (Exception e) {
                        LOGGER.log(Level.WARNING, "Exception writing closing stream element", e);
                    }
                    // Delete the queue contents (hopefully nothing is left).
                    queue.clear();
                // NOTE(review): instantShutdown was already found to be true when this branch is reached, the
                // repeated check only matters if the field can change concurrently - confirm.
                } else if (instantShutdown && isSmEnabled()) {
                    // This was an instantShutdown and SM is enabled, drain all remaining stanzas
                    // into the unacknowledgedStanzas queue
                    drainWriterQueueToUnacknowledgedStanzas();
                }

                try {
                    writer.close();
                }
                catch (Exception e) {
                    // Do nothing: a failure to close the writer is deliberately ignored at this point
                }

            }
            catch (Exception e) {
                // The exception can be ignored if the connection is 'done'
                // or if it was caused because the socket got closed
                if (!(done() || isSocketClosed())) {
                    notifyConnectionError(e);
                } else {
                    LOGGER.log(Level.FINE, "Ignoring Exception in writePackets()", e);
                }
            } finally {
                // Reported on every exit path, including the exceptional ones
                LOGGER.fine("Reporting shutdownDone success in writer thread");
                shutdownDone.reportSuccess();
            }
        }
1446
1447        private void drainWriterQueueToUnacknowledgedStanzas() {
1448            List<Element> elements = new ArrayList<Element>(queue.size());
1449            queue.drainTo(elements);
1450            for (Element element : elements) {
1451                if (element instanceof Stanza) {
1452                    unacknowledgedStanzas.add((Stanza) element);
1453                }
1454            }
1455        }
1456    }
1457
    /**
     * Set if Stream Management should be used by default for new connections.
     * <p>
     * This only changes the static default value of this class.
     * </p>
     *
     * @param useSmDefault true to use Stream Management for new connections.
     */
    public static void setUseStreamManagementDefault(boolean useSmDefault) {
        XMPPTCPConnection.useSmDefault = useSmDefault;
    }
1466
    /**
     * Set if Stream Management resumption should be used by default for new connections.
     * <p>
     * Misspelled variant ("Resumptiod") of {@link #setUseStreamManagementResumptionDefault(boolean)}. It forwards
     * its argument unchanged to the correctly named method.
     * </p>
     *
     * @param useSmResumptionDefault true to use Stream Management resumption for new connections.
     * @deprecated use {@link #setUseStreamManagementResumptionDefault(boolean)} instead.
     */
    @Deprecated
    public static void setUseStreamManagementResumptiodDefault(boolean useSmResumptionDefault) {
        setUseStreamManagementResumptionDefault(useSmResumptionDefault);
    }
1477
1478    /**
1479     * Set if Stream Management resumption should be used by default for new connections.
1480     *
1481     * @param useSmResumptionDefault true to use Stream Management resumption for new connections.
1482     */
1483    public static void setUseStreamManagementResumptionDefault(boolean useSmResumptionDefault) {
1484        if (useSmResumptionDefault) {
1485            // Also enable SM is resumption is enabled
1486            setUseStreamManagementDefault(useSmResumptionDefault);
1487        }
1488        XMPPTCPConnection.useSmResumptionDefault = useSmResumptionDefault;
1489    }
1490
    /**
     * Set if Stream Management should be used if supported by the server.
     * <p>
     * This only stores the preference for this connection instance.
     * </p>
     *
     * @param useSm true to use Stream Management.
     */
    public void setUseStreamManagement(boolean useSm) {
        this.useSm = useSm;
    }
1499
1500    /**
1501     * Set if Stream Management resumption should be used if supported by the server.
1502     *
1503     * @param useSmResumption true to use Stream Management resumption.
1504     */
1505    public void setUseStreamManagementResumption(boolean useSmResumption) {
1506        if (useSmResumption) {
1507            // Also enable SM is resumption is enabled
1508            setUseStreamManagement(useSmResumption);
1509        }
1510        this.useSmResumption = useSmResumption;
1511    }
1512
    /**
     * Set the preferred resumption time in seconds.
     * <p>
     * Values that are not greater than zero are treated as "no client side limit" by
     * {@link #getMaxSmResumptionTime()}.
     * </p>
     *
     * @param resumptionTime the preferred resumption time in seconds
     */
    public void setPreferredResumptionTime(int resumptionTime) {
        smClientMaxResumptionTime = resumptionTime;
    }
1520
1521    /**
1522     * Add a predicate for Stream Management acknowledgment requests.
1523     * <p>
1524     * Those predicates are used to determine when a Stream Management acknowledgement request is send to the server.
1525     * Some pre-defined predicates are found in the <code>org.jivesoftware.smack.sm.predicates</code> package.
1526     * </p>
1527     * <p>
1528     * If not predicate is configured, the {@link Predicate#forMessagesOrAfter5Stanzas()} will be used.
1529     * </p>
1530     * 
1531     * @param predicate the predicate to add.
1532     * @return if the predicate was not already active.
1533     */
1534    public boolean addRequestAckPredicate(StanzaFilter predicate) {
1535        synchronized (requestAckPredicates) {
1536            return requestAckPredicates.add(predicate);
1537        }
1538    }
1539
1540    /**
1541     * Remove the given predicate for Stream Management acknowledgment request.
1542     * @param predicate the predicate to remove.
1543     * @return true if the predicate was removed.
1544     */
1545    public boolean removeRequestAckPredicate(StanzaFilter predicate) {
1546        synchronized (requestAckPredicates) {
1547            return requestAckPredicates.remove(predicate);
1548        }
1549    }
1550
    /**
     * Remove all predicates for Stream Management acknowledgment requests.
     * <p>
     * The predicates are cleared while holding the monitor of the predicate collection, like in
     * {@link #addRequestAckPredicate(StanzaFilter)} and {@link #removeRequestAckPredicate(StanzaFilter)}.
     * </p>
     */
    public void removeAllRequestAckPredicates() {
        synchronized (requestAckPredicates) {
            requestAckPredicates.clear();
        }
    }
1559
1560    /**
1561     * Send an unconditional Stream Management acknowledgement request to the server.
1562     *
1563     * @throws StreamManagementNotEnabledException if Stream Mangement is not enabled.
1564     * @throws NotConnectedException if the connection is not connected.
1565     */
1566    public void requestSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException {
1567        if (!isSmEnabled()) {
1568            throw new StreamManagementException.StreamManagementNotEnabledException();
1569        }
1570        requestSmAcknowledgementInternal();
1571    }
1572
    /**
     * Hand the shared {@link AckRequest#INSTANCE} over to the packet writer for sending. In contrast to
     * {@link #requestSmAcknowledgement()} this method does not check if Stream Management is enabled.
     *
     * @throws NotConnectedException if the connection is not connected.
     */
    private void requestSmAcknowledgementInternal() throws NotConnectedException {
        packetWriter.sendStreamElement(AckRequest.INSTANCE);
    }
1576
1577    /**
1578     * Send a unconditional Stream Management acknowledgment to the server.
1579     * <p>
1580     * See <a href="http://xmpp.org/extensions/xep-0198.html#acking">XEP-198: Stream Management ยง 4. Acks</a>:
1581     * "Either party MAY send an <a/> element at any time (e.g., after it has received a certain number of stanzas,
1582     * or after a certain period of time), even if it has not received an <r/> element from the other party."
1583     * </p>
1584     * 
1585     * @throws StreamManagementNotEnabledException if Stream Management is not enabled.
1586     * @throws NotConnectedException if the connection is not connected.
1587     */
1588    public void sendSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException {
1589        if (!isSmEnabled()) {
1590            throw new StreamManagementException.StreamManagementNotEnabledException();
1591        }
1592        sendSmAcknowledgementInternal();
1593    }
1594
1595    private void sendSmAcknowledgementInternal() throws NotConnectedException {
1596        packetWriter.sendStreamElement(new AckAnswer(clientHandledStanzasCount));
1597    }
1598
    /**
     * Add a Stanza acknowledged listener.
     * <p>
     * Those listeners will be invoked every time a Stanza has been acknowledged by the server. They will not get
     * automatically removed. Consider using {@link #addStanzaIdAcknowledgedListener(String, StanzaListener)} when
     * possible.
     * </p>
     *
     * @param listener the listener to add.
     */
    public void addStanzaAcknowledgedListener(StanzaListener listener) {
        stanzaAcknowledgedListeners.add(listener);
    }
1612
    /**
     * Remove the given Stanza acknowledged listener.
     *
     * @param listener the listener to remove.
     * @return true if the listener was registered and has been removed.
     */
    public boolean removeStanzaAcknowledgedListener(StanzaListener listener) {
        return stanzaAcknowledgedListeners.remove(listener);
    }
1622
    /**
     * Remove all stanza acknowledged listeners.
     * <p>
     * This does not affect the listeners registered for a particular stanza ID, see
     * {@link #removeAllStanzaIdAcknowledgedListeners()} for those.
     * </p>
     */
    public void removeAllStanzaAcknowledgedListeners() {
        stanzaAcknowledgedListeners.clear();
    }
1629
1630    /**
1631     * Add a new Stanza ID acknowledged listener for the given ID.
1632     * <p>
1633     * The listener will be invoked if the stanza with the given ID was acknowledged by the server. It will
1634     * automatically be removed after the listener was run.
1635     * </p>
1636     * 
1637     * @param id the stanza ID.
1638     * @param listener the listener to invoke.
1639     * @return the previous listener for this stanza ID or null.
1640     * @throws StreamManagementNotEnabledException if Stream Management is not enabled.
1641     */
1642    public StanzaListener addStanzaIdAcknowledgedListener(final String id, StanzaListener listener) throws StreamManagementNotEnabledException {
1643        // Prevent users from adding callbacks that will never get removed
1644        if (!smWasEnabledAtLeastOnce) {
1645            throw new StreamManagementException.StreamManagementNotEnabledException();
1646        }
1647        // Remove the listener after max. 12 hours
1648        final int removeAfterSeconds = Math.min(getMaxSmResumptionTime(), 12 * 60 * 60);
1649        schedule(new Runnable() {
1650            @Override
1651            public void run() {
1652                stanzaIdAcknowledgedListeners.remove(id);
1653            }
1654        }, removeAfterSeconds, TimeUnit.SECONDS);
1655        return stanzaIdAcknowledgedListeners.put(id, listener);
1656    }
1657
    /**
     * Remove the Stanza ID acknowledged listener for the given ID.
     *
     * @param id the stanza ID.
     * @return the listener that was registered for the given ID, or <code>null</code> if there was none.
     */
    public StanzaListener removeStanzaIdAcknowledgedListener(String id) {
        return stanzaIdAcknowledgedListeners.remove(id);
    }
1667
    /**
     * Removes all Stanza ID acknowledged listeners.
     * <p>
     * This does not affect the listeners added with {@link #addStanzaAcknowledgedListener(StanzaListener)}.
     * </p>
     */
    public void removeAllStanzaIdAcknowledgedListeners() {
        stanzaIdAcknowledgedListeners.clear();
    }
1674
    /**
     * Returns true if Stream Management is supported by the server.
     * <p>
     * This checks if the Stream Management feature element in the Stream Management namespace is among the known
     * features of this connection.
     * </p>
     *
     * @return true if Stream Management is supported by the server.
     */
    public boolean isSmAvailable() {
        return hasFeature(StreamManagementFeature.ELEMENT, StreamManagement.NAMESPACE);
    }
1683
    /**
     * Returns true if Stream Management was successfully negotiated with the server.
     * <p>
     * The result reflects the state of the synchronization point used for enabling Stream Management.
     * </p>
     *
     * @return true if Stream Management was negotiated.
     */
    public boolean isSmEnabled() {
        return smEnabledSyncPoint.wasSuccessful();
    }
1692
    /**
     * Returns true if the stream was successfully resumed with help of Stream Management.
     * <p>
     * The result reflects the state of the synchronization point used for resuming a stream.
     * </p>
     *
     * @return true if the stream was resumed.
     */
    public boolean streamWasResumed() {
        return smResumedSyncPoint.wasSuccessful();
    }
1701
1702    /**
1703     * Returns true if the connection is disconnected by a Stream resumption via Stream Management is possible.
1704     * 
1705     * @return true if disconnected but resumption possible.
1706     */
1707    public boolean isDisconnectedButSmResumptionPossible() {
1708        return disconnectedButResumeable && isSmResumptionPossible();
1709    }
1710
1711    /**
1712     * Returns true if the stream is resumable.
1713     *
1714     * @return true if the stream is resumable.
1715     */
1716    public boolean isSmResumptionPossible() {
1717        // There is no resumable stream available
1718        if (smSessionId == null)
1719            return false;
1720
1721        final Long shutdownTimestamp = packetWriter.shutdownTimestamp;
1722        // Seems like we are already reconnected, report true
1723        if (shutdownTimestamp == null) {
1724            return true;
1725        }
1726
1727        // See if resumption time is over
1728        long current = System.currentTimeMillis();
1729        long maxResumptionMillies = ((long) getMaxSmResumptionTime()) * 1000;
1730        if (current > shutdownTimestamp + maxResumptionMillies) {
1731            // Stream resumption is *not* possible if the current timestamp is greater then the greatest timestamp where
1732            // resumption is possible
1733            return false;
1734        } else {
1735            return true;
1736        }
1737    }
1738
    /**
     * Drop the stream management state. Sets {@link #smSessionId} and
     * {@link #unacknowledgedStanzas} to <code>null</code>.
     * <p>
     * Without a session ID, {@link #isSmResumptionPossible()} reports false afterwards.
     * </p>
     */
    private void dropSmState() {
        // clientHandledCount and serverHandledCount will be reset on <enable/> and <enabled/>
        // respectively. No need to reset them here.
        smSessionId = null;
        unacknowledgedStanzas = null;
    }
1749
1750    /**
1751     * Get the maximum resumption time in seconds after which a managed stream can be resumed.
1752     * <p>
1753     * This method will return {@link Integer#MAX_VALUE} if neither the client nor the server specify a maximum
1754     * resumption time. Be aware of integer overflows when using this value, e.g. do not add arbitrary values to it
1755     * without checking for overflows before.
1756     * </p>
1757     *
1758     * @return the maximum resumption time in seconds or {@link Integer#MAX_VALUE} if none set.
1759     */
1760    public int getMaxSmResumptionTime() {
1761        int clientResumptionTime = smClientMaxResumptionTime > 0 ? smClientMaxResumptionTime : Integer.MAX_VALUE;
1762        int serverResumptionTime = smServerMaxResumptimTime > 0 ? smServerMaxResumptimTime : Integer.MAX_VALUE;
1763        return Math.min(clientResumptionTime, serverResumptionTime);
1764    }
1765
1766    private void processHandledCount(long handledCount) throws StreamManagementCounterError {
1767        long ackedStanzasCount = SMUtils.calculateDelta(handledCount, serverHandledStanzasCount);
1768        final List<Stanza> ackedStanzas = new ArrayList<Stanza>(
1769                        ackedStanzasCount <= Integer.MAX_VALUE ? (int) ackedStanzasCount
1770                                        : Integer.MAX_VALUE);
1771        for (long i = 0; i < ackedStanzasCount; i++) {
1772            Stanza ackedStanza = unacknowledgedStanzas.poll();
1773            // If the server ack'ed a stanza, then it must be in the
1774            // unacknowledged stanza queue. There can be no exception.
1775            if (ackedStanza == null) {
1776                throw new StreamManagementCounterError(handledCount, serverHandledStanzasCount,
1777                                ackedStanzasCount, ackedStanzas);
1778            }
1779            ackedStanzas.add(ackedStanza);
1780        }
1781
1782        boolean atLeastOneStanzaAcknowledgedListener = false;
1783        if (!stanzaAcknowledgedListeners.isEmpty()) {
1784            // If stanzaAcknowledgedListeners is not empty, the we have at least one
1785            atLeastOneStanzaAcknowledgedListener = true;
1786        }
1787        else {
1788            // Otherwise we look for a matching id in the stanza *id* acknowledged listeners
1789            for (Stanza ackedStanza : ackedStanzas) {
1790                String id = ackedStanza.getStanzaId();
1791                if (id != null && stanzaIdAcknowledgedListeners.containsKey(id)) {
1792                    atLeastOneStanzaAcknowledgedListener = true;
1793                    break;
1794                }
1795            }
1796        }
1797
1798        // Only spawn a new thread if there is a chance that some listener is invoked
1799        if (atLeastOneStanzaAcknowledgedListener) {
1800            asyncGo(new Runnable() {
1801                @Override
1802                public void run() {
1803                    for (Stanza ackedStanza : ackedStanzas) {
1804                        for (StanzaListener listener : stanzaAcknowledgedListeners) {
1805                            try {
1806                                listener.processPacket(ackedStanza);
1807                            }
1808                            catch (NotConnectedException e) {
1809                                LOGGER.log(Level.FINER, "Received not connected exception", e);
1810                            }
1811                        }
1812                        String id = ackedStanza.getStanzaId();
1813                        if (StringUtils.isNullOrEmpty(id)) {
1814                            continue;
1815                        }
1816                        StanzaListener listener = stanzaIdAcknowledgedListeners.remove(id);
1817                        if (listener != null) {
1818                            try {
1819                                listener.processPacket(ackedStanza);
1820                            }
1821                            catch (NotConnectedException e) {
1822                                LOGGER.log(Level.FINER, "Received not connected exception", e);
1823                            }
1824                        }
1825                    }
1826                }
1827            });
1828        }
1829
1830        serverHandledStanzasCount = handledCount;
1831    }
1832
    /**
     * Set the default bundle and defer callback used for new connections.
     * <p>
     * This only changes the static default value of this class.
     * </p>
     *
     * @param defaultBundleAndDeferCallback the callback to use as default.
     * @see BundleAndDeferCallback
     * @since 4.1
     */
    public static void setDefaultBundleAndDeferCallback(BundleAndDeferCallback defaultBundleAndDeferCallback) {
        XMPPTCPConnection.defaultBundleAndDeferCallback = defaultBundleAndDeferCallback;
    }
1843
    /**
     * Set the bundle and defer callback used for this connection.
     * <p>
     * You can use <code>null</code> as argument to reset the callback. Outgoing stanzas will then
     * no longer get deferred.
     * </p>
     * <p>
     * Note that the method name spells "and" in lower case, unlike the name of the callback type.
     * </p>
     *
     * @param bundleAndDeferCallback the callback or <code>null</code>.
     * @see BundleAndDeferCallback
     * @since 4.1
     */
    public void setBundleandDeferCallback(BundleAndDeferCallback bundleAndDeferCallback) {
        this.bundleAndDeferCallback = bundleAndDeferCallback;
    }
1858
1859}