001/**
002 *
003 * Copyright 2003-2007 Jive Software.
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.jivesoftware.smack.tcp;
018
019import java.io.BufferedReader;
020import java.io.ByteArrayInputStream;
021import java.io.FileInputStream;
022import java.io.IOException;
023import java.io.InputStream;
024import java.io.InputStreamReader;
025import java.io.OutputStream;
026import java.io.OutputStreamWriter;
027import java.io.Writer;
028import java.lang.reflect.Constructor;
029import java.net.InetAddress;
030import java.net.InetSocketAddress;
031import java.net.Socket;
032import java.security.KeyManagementException;
033import java.security.KeyStore;
034import java.security.KeyStoreException;
035import java.security.NoSuchAlgorithmException;
036import java.security.NoSuchProviderException;
037import java.security.Provider;
038import java.security.SecureRandom;
039import java.security.Security;
040import java.security.UnrecoverableKeyException;
041import java.security.cert.CertificateException;
042import java.util.ArrayList;
043import java.util.Collection;
044import java.util.Iterator;
045import java.util.LinkedHashSet;
046import java.util.LinkedList;
047import java.util.List;
048import java.util.Map;
049import java.util.Set;
050import java.util.concurrent.ArrayBlockingQueue;
051import java.util.concurrent.BlockingQueue;
052import java.util.concurrent.ConcurrentHashMap;
053import java.util.concurrent.ConcurrentLinkedQueue;
054import java.util.concurrent.TimeUnit;
055import java.util.concurrent.atomic.AtomicBoolean;
056import java.util.logging.Level;
057import java.util.logging.Logger;
058
059import javax.net.SocketFactory;
060import javax.net.ssl.HostnameVerifier;
061import javax.net.ssl.KeyManager;
062import javax.net.ssl.KeyManagerFactory;
063import javax.net.ssl.SSLContext;
064import javax.net.ssl.SSLSession;
065import javax.net.ssl.SSLSocket;
066import javax.net.ssl.TrustManager;
067import javax.net.ssl.X509TrustManager;
068import javax.security.auth.callback.Callback;
069import javax.security.auth.callback.CallbackHandler;
070import javax.security.auth.callback.PasswordCallback;
071
072import org.jivesoftware.smack.AbstractConnectionListener;
073import org.jivesoftware.smack.AbstractXMPPConnection;
074import org.jivesoftware.smack.ConnectionConfiguration;
075import org.jivesoftware.smack.ConnectionConfiguration.DnssecMode;
076import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode;
077import org.jivesoftware.smack.SmackConfiguration;
078import org.jivesoftware.smack.SmackException;
079import org.jivesoftware.smack.SmackException.AlreadyConnectedException;
080import org.jivesoftware.smack.SmackException.AlreadyLoggedInException;
081import org.jivesoftware.smack.SmackException.ConnectionException;
082import org.jivesoftware.smack.SmackException.NoResponseException;
083import org.jivesoftware.smack.SmackException.NotConnectedException;
084import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException;
085import org.jivesoftware.smack.StanzaListener;
086import org.jivesoftware.smack.SynchronizationPoint;
087import org.jivesoftware.smack.XMPPConnection;
088import org.jivesoftware.smack.XMPPException;
089import org.jivesoftware.smack.XMPPException.FailedNonzaException;
090import org.jivesoftware.smack.XMPPException.StreamErrorException;
091import org.jivesoftware.smack.compress.packet.Compress;
092import org.jivesoftware.smack.compress.packet.Compressed;
093import org.jivesoftware.smack.compression.XMPPInputOutputStream;
094import org.jivesoftware.smack.filter.StanzaFilter;
095import org.jivesoftware.smack.packet.Element;
096import org.jivesoftware.smack.packet.IQ;
097import org.jivesoftware.smack.packet.Message;
098import org.jivesoftware.smack.packet.Nonza;
099import org.jivesoftware.smack.packet.Presence;
100import org.jivesoftware.smack.packet.Stanza;
101import org.jivesoftware.smack.packet.StartTls;
102import org.jivesoftware.smack.packet.StreamError;
103import org.jivesoftware.smack.packet.StreamOpen;
104import org.jivesoftware.smack.proxy.ProxyInfo;
105import org.jivesoftware.smack.sasl.packet.SaslStreamElements;
106import org.jivesoftware.smack.sasl.packet.SaslStreamElements.Challenge;
107import org.jivesoftware.smack.sasl.packet.SaslStreamElements.SASLFailure;
108import org.jivesoftware.smack.sasl.packet.SaslStreamElements.Success;
109import org.jivesoftware.smack.sm.SMUtils;
110import org.jivesoftware.smack.sm.StreamManagementException;
111import org.jivesoftware.smack.sm.StreamManagementException.StreamIdDoesNotMatchException;
112import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementCounterError;
113import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementNotEnabledException;
114import org.jivesoftware.smack.sm.packet.StreamManagement;
115import org.jivesoftware.smack.sm.packet.StreamManagement.AckAnswer;
116import org.jivesoftware.smack.sm.packet.StreamManagement.AckRequest;
117import org.jivesoftware.smack.sm.packet.StreamManagement.Enable;
118import org.jivesoftware.smack.sm.packet.StreamManagement.Enabled;
119import org.jivesoftware.smack.sm.packet.StreamManagement.Failed;
120import org.jivesoftware.smack.sm.packet.StreamManagement.Resume;
121import org.jivesoftware.smack.sm.packet.StreamManagement.Resumed;
122import org.jivesoftware.smack.sm.packet.StreamManagement.StreamManagementFeature;
123import org.jivesoftware.smack.sm.predicates.Predicate;
124import org.jivesoftware.smack.sm.provider.ParseStreamManagement;
125import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown;
126import org.jivesoftware.smack.util.Async;
127import org.jivesoftware.smack.util.DNSUtil;
128import org.jivesoftware.smack.util.PacketParserUtils;
129import org.jivesoftware.smack.util.StringUtils;
130import org.jivesoftware.smack.util.TLSUtils;
131import org.jivesoftware.smack.util.XmlStringBuilder;
132import org.jivesoftware.smack.util.dns.HostAddress;
133import org.jivesoftware.smack.util.dns.SmackDaneProvider;
134import org.jivesoftware.smack.util.dns.SmackDaneVerifier;
135
136import org.jxmpp.jid.impl.JidCreate;
137import org.jxmpp.jid.parts.Resourcepart;
138import org.jxmpp.stringprep.XmppStringprepException;
139import org.jxmpp.util.XmppStringUtils;
140import org.xmlpull.v1.XmlPullParser;
141import org.xmlpull.v1.XmlPullParserException;
142
143/**
144 * Creates a socket connection to an XMPP server. This is the default connection
145 * to an XMPP server and is specified in the XMPP Core (RFC 6120).
146 * 
147 * @see XMPPConnection
148 * @author Matt Tucker
149 */
150public class XMPPTCPConnection extends AbstractXMPPConnection {
151
152    private static final int QUEUE_SIZE = 500;
153    private static final Logger LOGGER = Logger.getLogger(XMPPTCPConnection.class.getName());
154
155    /**
156     * The socket which is used for this connection.
157     */
158    private Socket socket;
159
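160    /**
161     * True after an instant shutdown that left the stream resumable; reset after the next successful login.
162     */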
163    private boolean disconnectedButResumeable = false;
164
165    private SSLSocket secureSocket;
166
167    /**
168     * Protected access level because of unit test purposes
169     */
170    protected PacketWriter packetWriter;
171
172    /**
173     * Protected access level because of unit test purposes
174     */
175    protected PacketReader packetReader;
176
177    private final SynchronizationPoint<Exception> initalOpenStreamSend = new SynchronizationPoint<>(
178                    this, "initial open stream element send to server");
179
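180    /**
181     * Synchronization point for the "stream compression feature" step; re-initialized when the connection is shut down.
182     */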
183    private final SynchronizationPoint<XMPPException> maybeCompressFeaturesReceived = new SynchronizationPoint<XMPPException>(
184                    this, "stream compression feature");
185
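186    /**
187     * Synchronization point for the "stream compression" step; re-initialized when the connection is shut down.
188     */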
189    private final SynchronizationPoint<SmackException> compressSyncPoint = new SynchronizationPoint<>(
190                    this, "stream compression");
191
192    /**
193     * A synchronization point which is successful if this connection has received the closing
194     * stream element from the remote end-point, i.e. the server.
195     */
196    private final SynchronizationPoint<Exception> closingStreamReceived = new SynchronizationPoint<>(
197                    this, "stream closing element received");
198
199    /**
200     * The default bundle and defer callback, used for new connections.
201     * @see #bundleAndDeferCallback
202     */
203    private static BundleAndDeferCallback defaultBundleAndDeferCallback;
204
205    /**
206     * The used bundle and defer callback.
207     * <p>
208     * Although this field may be set concurrently, the 'volatile' keyword was deliberately not added, in order to avoid
209     * having a 'volatile' read within the writer threads loop.
210     * </p>
211     */
212    private BundleAndDeferCallback bundleAndDeferCallback = defaultBundleAndDeferCallback;
213
214    private static boolean useSmDefault = true;
215
216    private static boolean useSmResumptionDefault = true;
217
218    /**
219     * The stream ID of the stream that is currently resumable, ie. the stream we hold the state
220     * for in {@link #clientHandledStanzasCount}, {@link #serverHandledStanzasCount} and
221     * {@link #unacknowledgedStanzas}.
222     */
223    private String smSessionId;
224
225    private final SynchronizationPoint<FailedNonzaException> smResumedSyncPoint = new SynchronizationPoint<>(
226                    this, "stream resumed element");
227
228    private final SynchronizationPoint<SmackException> smEnabledSyncPoint = new SynchronizationPoint<>(
229                    this, "stream enabled element");
230
231    /**
232     * The client's preferred maximum resumption time in seconds.
233     */
234    private int smClientMaxResumptionTime = -1;
235
236    /**
237     * The server's preferred maximum resumption time in seconds.
238     */
239    private int smServerMaxResumptimTime = -1;
240
241    /**
242     * Indicates whether Stream Management (XEP-198) should be used if it's supported by the server.
243     */
244    private boolean useSm = useSmDefault;
245    private boolean useSmResumption = useSmResumptionDefault;
246
247    /**
248     * The counter that the server sends the client about its current height. For example, if the server sends
249     * {@code <a h='42'/>}, then this will be set to 42 (while also handling the {@link #unacknowledgedStanzas} queue).
250     */
251    private long serverHandledStanzasCount = 0;
252
253    /**
254     * The counter for stanzas handled ("received") by the client.
255     * <p>
256     * Note that we don't need to synchronize this counter. Although JLS 17.7 states that reads and writes to longs are
257     * not atomic, it guarantees that there are at most 2 separate writes, one to each 32-bit half. And since
258     * {@link SMUtils#incrementHeight(long)} masks the lower 32 bit, we only operate on one half of the long and
259     * therefore have no concurrency problem because the read/write operations on one half are guaranteed to be atomic.
260     * </p>
261     */
262    private long clientHandledStanzasCount = 0;
263
264    private BlockingQueue<Stanza> unacknowledgedStanzas;
265
266    /**
267     * Set to true if Stream Management was at least once enabled for this connection.
268     */
269    private boolean smWasEnabledAtLeastOnce = false;
270
271    /**
272     * These listeners are invoked for every stanza that got acknowledged.
273     * <p>
274     * We use a {@link ConcurrentLinkedQueue} here in order to allow the listeners to remove
275     * themselves after they have been invoked.
276     * </p>
277     */
278    private final Collection<StanzaListener> stanzaAcknowledgedListeners = new ConcurrentLinkedQueue<StanzaListener>();
279
280    /**
281     * These listeners are invoked for an acknowledged stanza that has the given stanza ID. They will
282     * only be invoked once and automatically removed after that.
283     */
284    private final Map<String, StanzaListener> stanzaIdAcknowledgedListeners = new ConcurrentHashMap<String, StanzaListener>();
285
286    /**
287     * Predicates that determine if a stream management ack should be requested from the server.
288     * <p>
289     * We use a linked hash set here, so that the order how the predicates are added matches the
290     * order in which they are invoked in order to determine if an ack request should be sent or not.
291     * </p>
292     */
293    private final Set<StanzaFilter> requestAckPredicates = new LinkedHashSet<StanzaFilter>();
294
295    private final XMPPTCPConnectionConfiguration config;
296
297    /**
298     * Creates a new XMPP connection over TCP (optionally using proxies).
299     * <p>
300     * Note that XMPPTCPConnection constructors do not establish a connection to the server
301     * and you must call {@link #connect()}.
302     * </p>
303     *
304     * @param config the connection configuration.
305     */
306    public XMPPTCPConnection(XMPPTCPConnectionConfiguration config) {
307        super(config);
308        this.config = config;
309        addConnectionListener(new AbstractConnectionListener() {
310            @Override
311            public void connectionClosedOnError(Exception e) {
312                if (e instanceof XMPPException.StreamErrorException) {
313                    dropSmState();
314                }
315            }
316        });
317    }
318
319    /**
320     * Creates a new XMPP connection over TCP.
321     * <p>
322     * Note that {@code jid} must be the bare JID, e.g. "user@example.org". More fine-grained control over the
323     * connection settings is available using the {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)}
324     * constructor.
325     * </p>
326     * 
327     * @param jid the bare JID used by the client.
328     * @param password the password or authentication token.
329     * @throws XmppStringprepException 
330     */
331    public XMPPTCPConnection(CharSequence jid, String password) throws XmppStringprepException {
332        this(XmppStringUtils.parseLocalpart(jid.toString()), password, XmppStringUtils.parseDomain(jid.toString()));
333    }
334
335    /**
336     * Creates a new XMPP connection over TCP.
337     * <p>
338     * This is the simplest constructor for connecting to an XMPP server. Alternatively,
339     * you can get fine-grained control over connection settings using the
340     * {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)} constructor.
341     * </p>
342     * @param username
343     * @param password
344     * @param serviceName
345     * @throws XmppStringprepException 
346     */
347    public XMPPTCPConnection(CharSequence username, String password, String serviceName) throws XmppStringprepException {
348        this(XMPPTCPConnectionConfiguration.builder().setUsernameAndPassword(username, password).setXmppDomain(
349                                        JidCreate.domainBareFrom(serviceName)).build());
350    }
351
352    @Override
353    protected void throwNotConnectedExceptionIfAppropriate() throws NotConnectedException {
354        if (packetWriter == null) {
355            throw new NotConnectedException();
356        }
357        packetWriter.throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
358    }
359
360    @Override
361    protected void throwAlreadyConnectedExceptionIfAppropriate() throws AlreadyConnectedException {
362        if (isConnected() && !disconnectedButResumeable) {
363            throw new AlreadyConnectedException();
364        }
365    }
366
367    @Override
368    protected void throwAlreadyLoggedInExceptionIfAppropriate() throws AlreadyLoggedInException {
369        if (isAuthenticated() && !disconnectedButResumeable) {
370            throw new AlreadyLoggedInException();
371        }
372    }
373
374    @Override
375    protected void afterSuccessfulLogin(final boolean resumed) throws NotConnectedException, InterruptedException {
376        // Reset the flag in case it was set
377        disconnectedButResumeable = false;
378        super.afterSuccessfulLogin(resumed);
379    }
380
381    @Override
382    protected synchronized void loginInternal(String username, String password, Resourcepart resource) throws XMPPException,
383                    SmackException, IOException, InterruptedException {
384        // Authenticate using SASL
385        SSLSession sslSession = secureSocket != null ? secureSocket.getSession() : null;
386        saslAuthentication.authenticate(username, password, config.getAuthzid(), sslSession);
387
388        // If compression is enabled then request the server to use stream compression. XEP-170
389        // recommends to perform stream compression before resource binding.
390        maybeEnableCompression();
391
392        if (isSmResumptionPossible()) {
393            smResumedSyncPoint.sendAndWaitForResponse(new Resume(clientHandledStanzasCount, smSessionId));
394            if (smResumedSyncPoint.wasSuccessful()) {
395                // We successfully resumed the stream, be done here
396                afterSuccessfulLogin(true);
397                return;
398            }
399            // SM resumption failed, what Smack does here is to report success of
400            // lastFeaturesReceived in case of sm resumption was answered with 'failed' so that
401            // normal resource binding can be tried.
402            LOGGER.fine("Stream resumption failed, continuing with normal stream establishment process");
403        }
404
405        List<Stanza> previouslyUnackedStanzas = new LinkedList<Stanza>();
406        if (unacknowledgedStanzas != null) {
407            // There was a previous connection with SM enabled but that was either not resumable or
408            // failed to resume. Make sure that we (re-)send the unacknowledged stanzas.
409            unacknowledgedStanzas.drainTo(previouslyUnackedStanzas);
410            // Reset unacknowledged stanzas to 'null' to signal that we never send 'enable' in this
411            // XMPP session (There maybe was an enabled in a previous XMPP session of this
412            // connection instance though). This is used in writePackets to decide if stanzas should
413            // be added to the unacknowledged stanzas queue, because they have to be added right
414            // after the 'enable' stream element has been sent.
415            dropSmState();
416        }
417
418        // Now bind the resource. It is important to do this *after* we dropped an eventually
419        // existing Stream Management state. As otherwise <bind/> and <session/> may end up in
420        // unacknowledgedStanzas and become duplicated on reconnect. See SMACK-706.
421        bindResourceAndEstablishSession(resource);
422
423        if (isSmAvailable() && useSm) {
424            // Remove what is maybe left from previously stream managed sessions
425            serverHandledStanzasCount = 0;
426            // XEP-198 3. Enabling Stream Management. If the server response to 'Enable' is 'Failed'
427            // then this is a non recoverable error and we therefore throw an exception.
428            smEnabledSyncPoint.sendAndWaitForResponseOrThrow(new Enable(useSmResumption, smClientMaxResumptionTime));
429            synchronized (requestAckPredicates) {
430                if (requestAckPredicates.isEmpty()) {
431                    // Assure that we have at lest one predicate set up that so that we request acks
432                    // for the server and eventually flush some stanzas from the unacknowledged
433                    // stanza queue
434                    requestAckPredicates.add(Predicate.forMessagesOrAfter5Stanzas());
435                }
436            }
437        }
438        // (Re-)send the stanzas *after* we tried to enable SM
439        for (Stanza stanza : previouslyUnackedStanzas) {
440            sendStanzaInternal(stanza);
441        }
442
443        afterSuccessfulLogin(false);
444    }
445
446    @Override
447    public boolean isSecureConnection() {
448        return secureSocket != null;
449    }
450
451    /**
452     * Shuts the current connection down. After this method returns, the connection must be ready
453     * for re-use by connect.
454     */
455    @Override
456    protected void shutdown() {
457        if (isSmEnabled()) {
458            try {
459                // Try to send a last SM Acknowledgement. Most servers won't find this information helpful, as the SM
460                // state is dropped after a clean disconnect anyways. OTOH it doesn't hurt much either.
461                sendSmAcknowledgementInternal();
462            } catch (InterruptedException | NotConnectedException e) {
463                LOGGER.log(Level.FINE, "Can not send final SM ack as connection is not connected", e);
464            }
465        }
466        shutdown(false);
467    }
468
469    /**
470     * Performs an unclean disconnect and shutdown of the connection. Does not send a closing stream stanza.
471     */
472    public synchronized void instantShutdown() {
473        shutdown(true);
474    }
475
476    private void shutdown(boolean instant) {
477        if (disconnectedButResumeable) {
478            return;
479        }
480
481        // First shutdown the writer, this will result in a closing stream element getting send to
482        // the server
483        if (packetWriter != null) {
484            LOGGER.finer("PacketWriter shutdown()");
485            packetWriter.shutdown(instant);
486        }
487        LOGGER.finer("PacketWriter has been shut down");
488
489        if (!instant) {
490            try {
491                // After we send the closing stream element, check if there was already a
492                // closing stream element sent by the server or wait with a timeout for a
493                // closing stream element to be received from the server.
494                @SuppressWarnings("unused")
495                Exception res = closingStreamReceived.checkIfSuccessOrWait();
496            } catch (InterruptedException | NoResponseException e) {
497                LOGGER.log(Level.INFO, "Exception while waiting for closing stream element from the server " + this, e);
498            }
499        }
500
501        if (packetReader != null) {
502            LOGGER.finer("PacketReader shutdown()");
503                packetReader.shutdown();
504        }
505        LOGGER.finer("PacketReader has been shut down");
506
507        try {
508                socket.close();
509        } catch (Exception e) {
510                LOGGER.log(Level.WARNING, "shutdown", e);
511        }
512
513        setWasAuthenticated();
514        // If we are able to resume the stream, then don't set
515        // connected/authenticated/usingTLS to false since we like behave like we are still
516        // connected (e.g. sendStanza should not throw a NotConnectedException).
517        if (isSmResumptionPossible() && instant) {
518            disconnectedButResumeable = true;
519        } else {
520            disconnectedButResumeable = false;
521            // Reset the stream management session id to null, since if the stream is cleanly closed, i.e. sending a closing
522            // stream tag, there is no longer a stream to resume.
523            smSessionId = null;
524        }
525        authenticated = false;
526        connected = false;
527        secureSocket = null;
528        reader = null;
529        writer = null;
530
531        maybeCompressFeaturesReceived.init();
532        compressSyncPoint.init();
533        smResumedSyncPoint.init();
534        smEnabledSyncPoint.init();
535        initalOpenStreamSend.init();
536    }
537
538    @Override
539    public void sendNonza(Nonza element) throws NotConnectedException, InterruptedException {
540        packetWriter.sendStreamElement(element);
541    }
542
543    @Override
544    protected void sendStanzaInternal(Stanza packet) throws NotConnectedException, InterruptedException {
545        packetWriter.sendStreamElement(packet);
546        if (isSmEnabled()) {
547            for (StanzaFilter requestAckPredicate : requestAckPredicates) {
548                if (requestAckPredicate.accept(packet)) {
549                    requestSmAcknowledgementInternal();
550                    break;
551                }
552            }
553        }
554    }
555
556    private void connectUsingConfiguration() throws ConnectionException, IOException {
557        List<HostAddress> failedAddresses = populateHostAddresses();
558        SocketFactory socketFactory = config.getSocketFactory();
559        ProxyInfo proxyInfo = config.getProxyInfo();
560        int timeout = config.getConnectTimeout();
561        if (socketFactory == null) {
562            socketFactory = SocketFactory.getDefault();
563        }
564        for (HostAddress hostAddress : hostAddresses) {
565            Iterator<InetAddress> inetAddresses = null;
566            String host = hostAddress.getHost();
567            int port = hostAddress.getPort();
568            if (proxyInfo == null) {
569                inetAddresses = hostAddress.getInetAddresses().iterator();
570                assert (inetAddresses.hasNext());
571
572                innerloop: while (inetAddresses.hasNext()) {
573                    // Create a *new* Socket before every connection attempt, i.e. connect() call, since Sockets are not
574                    // re-usable after a failed connection attempt. See also SMACK-724.
575                    socket = socketFactory.createSocket();
576
577                    final InetAddress inetAddress = inetAddresses.next();
578                    final String inetAddressAndPort = inetAddress + " at port " + port;
579                    LOGGER.finer("Trying to establish TCP connection to " + inetAddressAndPort);
580                    try {
581                        socket.connect(new InetSocketAddress(inetAddress, port), timeout);
582                    } catch (Exception e) {
583                        hostAddress.setException(inetAddress, e);
584                        if (inetAddresses.hasNext()) {
585                            continue innerloop;
586                        } else {
587                            break innerloop;
588                        }
589                    }
590                    LOGGER.finer("Established TCP connection to " + inetAddressAndPort);
591                    // We found a host to connect to, return here
592                    this.host = host;
593                    this.port = port;
594                    return;
595                }
596                failedAddresses.add(hostAddress);
597            } else {
598                socket = socketFactory.createSocket();
599                StringUtils.requireNotNullOrEmpty(host, "Host of HostAddress " + hostAddress + " must not be null when using a Proxy");
600                final String hostAndPort = host + " at port " + port;
601                LOGGER.finer("Trying to establish TCP connection via Proxy to " + hostAndPort);
602                try {
603                    proxyInfo.getProxySocketConnection().connect(socket, host, port, timeout);
604                } catch (IOException e) {
605                    hostAddress.setException(e);
606                    continue;
607                }
608                LOGGER.finer("Established TCP connection to " + hostAndPort);
609                // We found a host to connect to, return here
610                this.host = host;
611                this.port = port;
612                return;
613            }
614        }
615        // There are no more host addresses to try
616        // throw an exception and report all tried
617        // HostAddresses in the exception
618        throw ConnectionException.from(failedAddresses);
619    }
620
621    /**
622     * Initializes the connection by creating a stanza(/packet) reader and writer and opening a
623     * XMPP stream to the server.
624     *
625     * @throws XMPPException if establishing a connection to the server fails.
626     * @throws SmackException if the server failes to respond back or if there is anther error.
627     * @throws IOException 
628     */
629    private void initConnection() throws IOException {
630        boolean isFirstInitialization = packetReader == null || packetWriter == null;
631        compressionHandler = null;
632
633        // Set the reader and writer instance variables
634        initReaderAndWriter();
635
636        if (isFirstInitialization) {
637            packetWriter = new PacketWriter();
638            packetReader = new PacketReader();
639
640            // If debugging is enabled, we should start the thread that will listen for
641            // all packets and then log them.
642            if (config.isDebuggerEnabled()) {
643                addAsyncStanzaListener(debugger.getReaderListener(), null);
644                if (debugger.getWriterListener() != null) {
645                    addPacketSendingListener(debugger.getWriterListener(), null);
646                }
647            }
648        }
649        // Start the packet writer. This will open an XMPP stream to the server
650        packetWriter.init();
651        // Start the packet reader. The startup() method will block until we
652        // get an opening stream packet back from server
653        packetReader.init();
654    }
655
656    private void initReaderAndWriter() throws IOException {
657        InputStream is = socket.getInputStream();
658        OutputStream os = socket.getOutputStream();
659        if (compressionHandler != null) {
660            is = compressionHandler.getInputStream(is);
661            os = compressionHandler.getOutputStream(os);
662        }
663        // OutputStreamWriter is already buffered, no need to wrap it into a BufferedWriter
664        writer = new OutputStreamWriter(os, "UTF-8");
665        reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
666
667        // If debugging is enabled, we open a window and write out all network traffic.
668        initDebugger();
669    }
670
671    /**
672     * The server has indicated that TLS negotiation can start. We now need to secure the
673     * existing plain connection and perform a handshake. This method won't return until the
674     * connection has finished the handshake or an error occurred while securing the connection.
675     * @throws IOException 
676     * @throws CertificateException 
677     * @throws NoSuchAlgorithmException 
678     * @throws NoSuchProviderException 
679     * @throws KeyStoreException 
680     * @throws UnrecoverableKeyException 
681     * @throws KeyManagementException 
682     * @throws SmackException 
683     * @throws Exception if an exception occurs.
684     */
685    private void proceedTLSReceived() throws NoSuchAlgorithmException, CertificateException, IOException, KeyStoreException, NoSuchProviderException, UnrecoverableKeyException, KeyManagementException, SmackException {
686        SSLContext context = this.config.getCustomSSLContext();
687        KeyStore ks = null;
688        KeyManager[] kms = null;
689        PasswordCallback pcb = null;
690        SmackDaneVerifier daneVerifier = null;
691
692        if (config.getDnssecMode() == DnssecMode.needsDnssecAndDane) {
693            SmackDaneProvider daneProvider = DNSUtil.getDaneProvider();
694            if (daneProvider == null) {
695                throw new UnsupportedOperationException("DANE enabled but no SmackDaneProvider configured");
696            }
697            daneVerifier = daneProvider.newInstance();
698            if (daneVerifier == null) {
699                throw new IllegalStateException("DANE requested but DANE provider did not return a DANE verifier");
700            }
701        }
702
703        if (context == null) {
704            final String keyStoreType = config.getKeystoreType();
705            final CallbackHandler callbackHandler = config.getCallbackHandler();
706            final String keystorePath = config.getKeystorePath();
707            if ("PKCS11".equals(keyStoreType)) {
708                try {
709                    Constructor<?> c = Class.forName("sun.security.pkcs11.SunPKCS11").getConstructor(InputStream.class);
710                    String pkcs11Config = "name = SmartCard\nlibrary = " + config.getPKCS11Library();
711                    ByteArrayInputStream config = new ByteArrayInputStream(pkcs11Config.getBytes(StringUtils.UTF8));
712                    Provider p = (Provider) c.newInstance(config);
713                    Security.addProvider(p);
714                    ks = KeyStore.getInstance("PKCS11",p);
715                    pcb = new PasswordCallback("PKCS11 Password: ",false);
716                    callbackHandler.handle(new Callback[] {pcb});
717                    ks.load(null,pcb.getPassword());
718                }
719                catch (Exception e) {
720                    LOGGER.log(Level.WARNING, "Exception", e);
721                    ks = null;
722                }
723            }
724            else if ("Apple".equals(keyStoreType)) {
725                ks = KeyStore.getInstance("KeychainStore","Apple");
726                ks.load(null,null);
727                //pcb = new PasswordCallback("Apple Keychain",false);
728                //pcb.setPassword(null);
729            }
730            else if (keyStoreType != null) {
731                ks = KeyStore.getInstance(keyStoreType);
732                if (callbackHandler != null && StringUtils.isNotEmpty(keystorePath)) {
733                    try {
734                        pcb = new PasswordCallback("Keystore Password: ", false);
735                        callbackHandler.handle(new Callback[] { pcb });
736                        ks.load(new FileInputStream(keystorePath), pcb.getPassword());
737                    }
738                    catch (Exception e) {
739                        LOGGER.log(Level.WARNING, "Exception", e);
740                        ks = null;
741                    }
742                } else {
743                    ks.load(null, null);
744                }
745            }
746
747            if (ks != null) {
748                String keyManagerFactoryAlgorithm = KeyManagerFactory.getDefaultAlgorithm();
749                KeyManagerFactory kmf = KeyManagerFactory.getInstance(keyManagerFactoryAlgorithm);
750                try {
751                    if (pcb == null) {
752                        kmf.init(ks, null);
753                    }
754                    else {
755                        kmf.init(ks, pcb.getPassword());
756                        pcb.clearPassword();
757                    }
758                    kms = kmf.getKeyManagers();
759                }
760                catch (NullPointerException npe) {
761                    LOGGER.log(Level.WARNING, "NullPointerException", npe);
762                }
763            }
764
765            // If the user didn't specify a SSLContext, use the default one
766            context = SSLContext.getInstance("TLS");
767
768            final SecureRandom secureRandom = new java.security.SecureRandom();
769            X509TrustManager customTrustManager = config.getCustomX509TrustManager();
770
771            if (daneVerifier != null) {
772                // User requested DANE verification.
773                daneVerifier.init(context, kms, customTrustManager, secureRandom);
774            } else {
775                TrustManager[] customTrustManagers = null;
776                if (customTrustManager != null) {
777                    customTrustManagers = new TrustManager[] { customTrustManager };
778                }
779                context.init(kms, customTrustManagers, secureRandom);
780            }
781        }
782
783        Socket plain = socket;
784        // Secure the plain connection
785        socket = context.getSocketFactory().createSocket(plain,
786                host, plain.getPort(), true);
787
788        final SSLSocket sslSocket = (SSLSocket) socket;
789        // Immediately set the enabled SSL protocols and ciphers. See SMACK-712 why this is
790        // important (at least on certain platforms) and it seems to be a good idea anyways to
791        // prevent an accidental implicit handshake.
792        TLSUtils.setEnabledProtocolsAndCiphers(sslSocket, config.getEnabledSSLProtocols(), config.getEnabledSSLCiphers());
793
794        // Initialize the reader and writer with the new secured version
795        initReaderAndWriter();
796
797        // Proceed to do the handshake
798        sslSocket.startHandshake();
799
800        if (daneVerifier != null) {
801            daneVerifier.finish(sslSocket);
802        }
803
804        final HostnameVerifier verifier = getConfiguration().getHostnameVerifier();
805        if (verifier == null) {
806                throw new IllegalStateException("No HostnameVerifier set. Use connectionConfiguration.setHostnameVerifier() to configure.");
807        } else if (!verifier.verify(getXMPPServiceDomain().toString(), sslSocket.getSession())) {
808            throw new CertificateException("Hostname verification of certificate failed. Certificate does not authenticate " + getXMPPServiceDomain());
809        }
810
811        // Set that TLS was successful
812        secureSocket = sslSocket;
813    }
814
815    /**
816     * Returns the compression handler that can be used for one compression methods offered by the server.
817     * 
818     * @return a instance of XMPPInputOutputStream or null if no suitable instance was found
819     * 
820     */
821    private static XMPPInputOutputStream maybeGetCompressionHandler(Compress.Feature compression) {
822        for (XMPPInputOutputStream handler : SmackConfiguration.getCompresionHandlers()) {
823                String method = handler.getCompressionMethod();
824                if (compression.getMethods().contains(method))
825                    return handler;
826        }
827        return null;
828    }
829
830    @Override
831    public boolean isUsingCompression() {
832        return compressionHandler != null && compressSyncPoint.wasSuccessful();
833    }
834
835    /**
836     * <p>
837     * Starts using stream compression that will compress network traffic. Traffic can be
838     * reduced up to 90%. Therefore, stream compression is ideal when using a slow speed network
839     * connection. However, the server and the client will need to use more CPU time in order to
840     * un/compress network data so under high load the server performance might be affected.
841     * </p>
842     * <p>
843     * Stream compression has to have been previously offered by the server. Currently only the
844     * zlib method is supported by the client. Stream compression negotiation has to be done
845     * before authentication took place.
846     * </p>
847     *
848     * @throws NotConnectedException 
849     * @throws SmackException
850     * @throws NoResponseException 
851     * @throws InterruptedException 
852     */
853    private void maybeEnableCompression() throws NotConnectedException, NoResponseException, SmackException, InterruptedException {
854        if (!config.isCompressionEnabled()) {
855            return;
856        }
857        maybeCompressFeaturesReceived.checkIfSuccessOrWait();
858        Compress.Feature compression = getFeature(Compress.Feature.ELEMENT, Compress.NAMESPACE);
859        if (compression == null) {
860            // Server does not support compression
861            return;
862        }
863        // If stream compression was offered by the server and we want to use
864        // compression then send compression request to the server
865        if ((compressionHandler = maybeGetCompressionHandler(compression)) != null) {
866            compressSyncPoint.sendAndWaitForResponseOrThrow(new Compress(compressionHandler.getCompressionMethod()));
867        } else {
868            LOGGER.warning("Could not enable compression because no matching handler/method pair was found");
869        }
870    }
871
872    /**
873     * Establishes a connection to the XMPP server. It basically
874     * creates and maintains a socket connection to the server.
875     * <p>
876     * Listeners will be preserved from a previous connection if the reconnection
877     * occurs after an abrupt termination.
878     * </p>
879     *
880     * @throws XMPPException if an error occurs while trying to establish the connection.
881     * @throws SmackException 
882     * @throws IOException 
883     * @throws InterruptedException 
884     */
885    @Override
886    protected void connectInternal() throws SmackException, IOException, XMPPException, InterruptedException {
887        closingStreamReceived.init();
888        // Establishes the TCP connection to the server and does setup the reader and writer. Throws an exception if
889        // there is an error establishing the connection
890        connectUsingConfiguration();
891
892        // We connected successfully to the servers TCP port
893        initConnection();
894    }
895
896    /**
897     * Sends out a notification that there was an error with the connection
898     * and closes the connection. Also prints the stack trace of the given exception
899     *
900     * @param e the exception that causes the connection close event.
901     */
902    private synchronized void notifyConnectionError(Exception e) {
903        // Listeners were already notified of the exception, return right here.
904        if ((packetReader == null || packetReader.done) &&
905                (packetWriter == null || packetWriter.done())) return;
906
907        // Closes the connection temporary. A reconnection is possible
908        // Note that a connection listener of XMPPTCPConnection will drop the SM state in
909        // case the Exception is a StreamErrorException.
910        instantShutdown();
911
912        // Notify connection listeners of the error.
913        callConnectionClosedOnErrorListener(e);
914    }
915
916    /**
917     * For unit testing purposes
918     *
919     * @param writer
920     */
921    protected void setWriter(Writer writer) {
922        this.writer = writer;
923    }
924
925    @Override
926    protected void afterFeaturesReceived() throws NotConnectedException, InterruptedException {
927        StartTls startTlsFeature = getFeature(StartTls.ELEMENT, StartTls.NAMESPACE);
928        if (startTlsFeature != null) {
929            if (startTlsFeature.required() && config.getSecurityMode() == SecurityMode.disabled) {
930                SmackException smackException = new SecurityRequiredByServerException();
931                tlsHandled.reportFailure(smackException);
932                notifyConnectionError(smackException);
933                return;
934            }
935
936            if (config.getSecurityMode() != ConnectionConfiguration.SecurityMode.disabled) {
937                sendNonza(new StartTls());
938            } else {
939                tlsHandled.reportSuccess();
940            }
941        } else {
942            tlsHandled.reportSuccess();
943        }
944
945        if (getSASLAuthentication().authenticationSuccessful()) {
946            // If we have received features after the SASL has been successfully completed, then we
947            // have also *maybe* received, as it is an optional feature, the compression feature
948            // from the server.
949            maybeCompressFeaturesReceived.reportSuccess();
950        }
951    }
952
953    /**
954     * Resets the parser using the latest connection's reader. Reseting the parser is necessary
955     * when the plain connection has been secured or when a new opening stream element is going
956     * to be sent by the server.
957     *
958     * @throws SmackException if the parser could not be reset.
959     * @throws InterruptedException 
960     */
961    void openStream() throws SmackException, InterruptedException {
962        // If possible, provide the receiving entity of the stream open tag, i.e. the server, as much information as
963        // possible. The 'to' attribute is *always* available. The 'from' attribute if set by the user and no external
964        // mechanism is used to determine the local entity (user). And the 'id' attribute is available after the first
965        // response from the server (see e.g. RFC 6120 ยง 9.1.1 Step 2.)
966        CharSequence to = getXMPPServiceDomain();
967        CharSequence from = null;
968        CharSequence localpart = config.getUsername();
969        if (localpart != null) {
970            from = XmppStringUtils.completeJidFrom(localpart, to);
971        }
972        String id = getStreamId();
973        sendNonza(new StreamOpen(to, from, id));
974        try {
975            packetReader.parser = PacketParserUtils.newXmppParser(reader);
976        }
977        catch (XmlPullParserException e) {
978            throw new SmackException(e);
979        }
980    }
981
982    protected class PacketReader {
983
984        XmlPullParser parser;
985
986        private volatile boolean done;
987
988        /**
989         * Initializes the reader in order to be used. The reader is initialized during the
990         * first connection and when reconnecting due to an abruptly disconnection.
991         */
992        void init() {
993            done = false;
994
995            Async.go(new Runnable() {
996                @Override
997                public void run() {
998                    parsePackets();
999                }
1000            }, "Smack Packet Reader (" + getConnectionCounter() + ")");
1001         }
1002
1003        /**
1004         * Shuts the stanza(/packet) reader down. This method simply sets the 'done' flag to true.
1005         */
1006        void shutdown() {
1007            done = true;
1008        }
1009
1010        /**
1011         * Parse top-level packets in order to process them further.
1012         *
1013         * @param thread the thread that is being used by the reader to parse incoming packets.
1014         */
1015        private void parsePackets() {
1016            try {
1017                initalOpenStreamSend.checkIfSuccessOrWait();
1018                int eventType = parser.getEventType();
1019                while (!done) {
1020                    switch (eventType) {
1021                    case XmlPullParser.START_TAG:
1022                        final String name = parser.getName();
1023                        switch (name) {
1024                        case Message.ELEMENT:
1025                        case IQ.IQ_ELEMENT:
1026                        case Presence.ELEMENT:
1027                            try {
1028                                parseAndProcessStanza(parser);
1029                            } finally {
1030                                clientHandledStanzasCount = SMUtils.incrementHeight(clientHandledStanzasCount);
1031                            }
1032                            break;
1033                        case "stream":
1034                            // We found an opening stream.
1035                            if ("jabber:client".equals(parser.getNamespace(null))) {
1036                                streamId = parser.getAttributeValue("", "id");
1037                                String reportedServerDomain = parser.getAttributeValue("", "from");
1038                                assert (config.getXMPPServiceDomain().equals(reportedServerDomain));
1039                            }
1040                            break;
1041                        case "error":
1042                            StreamError streamError = PacketParserUtils.parseStreamError(parser);
1043                            saslFeatureReceived.reportFailure(new StreamErrorException(streamError));
1044                            // Mark the tlsHandled sync point as success, we will use the saslFeatureReceived sync
1045                            // point to report the error, which is checked immediately after tlsHandled in
1046                            // connectInternal().
1047                            tlsHandled.reportSuccess();
1048                            throw new StreamErrorException(streamError);
1049                        case "features":
1050                            parseFeatures(parser);
1051                            break;
1052                        case "proceed":
1053                            try {
1054                                // Secure the connection by negotiating TLS
1055                                proceedTLSReceived();
1056                                // Send a new opening stream to the server
1057                                openStream();
1058                            }
1059                            catch (Exception e) {
1060                                SmackException smackException = new SmackException(e);
1061                                tlsHandled.reportFailure(smackException);
1062                                throw e;
1063                            }
1064                            break;
1065                        case "failure":
1066                            String namespace = parser.getNamespace(null);
1067                            switch (namespace) {
1068                            case "urn:ietf:params:xml:ns:xmpp-tls":
1069                                // TLS negotiation has failed. The server will close the connection
1070                                // TODO Parse failure stanza
1071                                throw new SmackException("TLS negotiation has failed");
1072                            case "http://jabber.org/protocol/compress":
1073                                // Stream compression has been denied. This is a recoverable
1074                                // situation. It is still possible to authenticate and
1075                                // use the connection but using an uncompressed connection
1076                                // TODO Parse failure stanza
1077                                compressSyncPoint.reportFailure(new SmackException(
1078                                                "Could not establish compression"));
1079                                break;
1080                            case SaslStreamElements.NAMESPACE:
1081                                // SASL authentication has failed. The server may close the connection
1082                                // depending on the number of retries
1083                                final SASLFailure failure = PacketParserUtils.parseSASLFailure(parser);
1084                                getSASLAuthentication().authenticationFailed(failure);
1085                                break;
1086                            }
1087                            break;
1088                        case Challenge.ELEMENT:
1089                            // The server is challenging the SASL authentication made by the client
1090                            String challengeData = parser.nextText();
1091                            getSASLAuthentication().challengeReceived(challengeData);
1092                            break;
1093                        case Success.ELEMENT:
1094                            Success success = new Success(parser.nextText());
1095                            // We now need to bind a resource for the connection
1096                            // Open a new stream and wait for the response
1097                            openStream();
1098                            // The SASL authentication with the server was successful. The next step
1099                            // will be to bind the resource
1100                            getSASLAuthentication().authenticated(success);
1101                            break;
1102                        case Compressed.ELEMENT:
1103                            // Server confirmed that it's possible to use stream compression. Start
1104                            // stream compression
1105                            // Initialize the reader and writer with the new compressed version
1106                            initReaderAndWriter();
1107                            // Send a new opening stream to the server
1108                            openStream();
1109                            // Notify that compression is being used
1110                            compressSyncPoint.reportSuccess();
1111                            break;
1112                        case Enabled.ELEMENT:
1113                            Enabled enabled = ParseStreamManagement.enabled(parser);
1114                            if (enabled.isResumeSet()) {
1115                                smSessionId = enabled.getId();
1116                                if (StringUtils.isNullOrEmpty(smSessionId)) {
1117                                    SmackException xmppException = new SmackException("Stream Management 'enabled' element with resume attribute but without session id received");
1118                                    smEnabledSyncPoint.reportFailure(xmppException);
1119                                    throw xmppException;
1120                                }
1121                                smServerMaxResumptimTime = enabled.getMaxResumptionTime();
1122                            } else {
1123                                // Mark this a non-resumable stream by setting smSessionId to null
1124                                smSessionId = null;
1125                            }
1126                            clientHandledStanzasCount = 0;
1127                            smWasEnabledAtLeastOnce = true;
1128                            smEnabledSyncPoint.reportSuccess();
1129                            LOGGER.fine("Stream Management (XEP-198): succesfully enabled");
1130                            break;
1131                        case Failed.ELEMENT:
1132                            Failed failed = ParseStreamManagement.failed(parser);
1133                            FailedNonzaException xmppException = new FailedNonzaException(failed, failed.getXMPPErrorCondition());
1134                            // If only XEP-198 would specify different failure elements for the SM
1135                            // enable and SM resume failure case. But this is not the case, so we
1136                            // need to determine if this is a 'Failed' response for either 'Enable'
1137                            // or 'Resume'.
1138                            if (smResumedSyncPoint.requestSent()) {
1139                                smResumedSyncPoint.reportFailure(xmppException);
1140                            }
1141                            else {
1142                                if (!smEnabledSyncPoint.requestSent()) {
1143                                    throw new IllegalStateException("Failed element received but SM was not previously enabled");
1144                                }
1145                                smEnabledSyncPoint.reportFailure(new SmackException(xmppException));
1146                                // Report success for last lastFeaturesReceived so that in case a
1147                                // failed resumption, we can continue with normal resource binding.
1148                                // See text of XEP-198 5. below Example 11.
1149                                lastFeaturesReceived.reportSuccess();
1150                            }
1151                            break;
1152                        case Resumed.ELEMENT:
1153                            Resumed resumed = ParseStreamManagement.resumed(parser);
1154                            if (!smSessionId.equals(resumed.getPrevId())) {
1155                                throw new StreamIdDoesNotMatchException(smSessionId, resumed.getPrevId());
1156                            }
1157                            // Mark SM as enabled and resumption as successful.
1158                            smResumedSyncPoint.reportSuccess();
1159                            smEnabledSyncPoint.reportSuccess();
1160                            // First, drop the stanzas already handled by the server
1161                            processHandledCount(resumed.getHandledCount());
1162                            // Then re-send what is left in the unacknowledged queue
1163                            List<Stanza> stanzasToResend = new ArrayList<>(unacknowledgedStanzas.size());
1164                            unacknowledgedStanzas.drainTo(stanzasToResend);
1165                            for (Stanza stanza : stanzasToResend) {
1166                                sendStanzaInternal(stanza);
1167                            }
1168                            // If there where stanzas resent, then request a SM ack for them.
1169                            // Writer's sendStreamElement() won't do it automatically based on
1170                            // predicates.
1171                            if (!stanzasToResend.isEmpty()) {
1172                                requestSmAcknowledgementInternal();
1173                            }
1174                            LOGGER.fine("Stream Management (XEP-198): Stream resumed");
1175                            break;
1176                        case AckAnswer.ELEMENT:
1177                            AckAnswer ackAnswer = ParseStreamManagement.ackAnswer(parser);
1178                            processHandledCount(ackAnswer.getHandledCount());
1179                            break;
1180                        case AckRequest.ELEMENT:
1181                            ParseStreamManagement.ackRequest(parser);
1182                            if (smEnabledSyncPoint.wasSuccessful()) {
1183                                sendSmAcknowledgementInternal();
1184                            } else {
1185                                LOGGER.warning("SM Ack Request received while SM is not enabled");
1186                            }
1187                            break;
1188                         default:
1189                             LOGGER.warning("Unknown top level stream element: " + name);
1190                             break;
1191                        }
1192                        break;
1193                    case XmlPullParser.END_TAG:
1194                        if (parser.getName().equals("stream")) {
1195                            if (!parser.getNamespace().equals("http://etherx.jabber.org/streams")) {
1196                                LOGGER.warning(XMPPTCPConnection.this +  " </stream> but different namespace " + parser.getNamespace());
1197                                break;
1198                            }
1199
1200                            // Check if the queue was already shut down before reporting success on closing stream tag
1201                            // received. This avoids a race if there is a disconnect(), followed by a connect(), which
1202                            // did re-start the queue again, causing this writer to assume that the queue is not
1203                            // shutdown, which results in a call to disconnect().
1204                            final boolean queueWasShutdown = packetWriter.queue.isShutdown();
1205                            closingStreamReceived.reportSuccess();
1206
1207                            if (queueWasShutdown) {
1208                                // We received a closing stream element *after* we initiated the
1209                                // termination of the session by sending a closing stream element to
1210                                // the server first
1211                                return;
1212                            } else {
1213                                // We received a closing stream element from the server without us
1214                                // sending a closing stream element first. This means that the
1215                                // server wants to terminate the session, therefore disconnect
1216                                // the connection
1217                                LOGGER.info(XMPPTCPConnection.this
1218                                                + " received closing </stream> element."
1219                                                + " Server wants to terminate the connection, calling disconnect()");
1220                                disconnect();
1221                            }
1222                        }
1223                        break;
1224                    case XmlPullParser.END_DOCUMENT:
1225                        // END_DOCUMENT only happens in an error case, as otherwise we would see a
1226                        // closing stream element before.
1227                        throw new SmackException(
1228                                        "Parser got END_DOCUMENT event. This could happen e.g. if the server closed the connection without sending a closing stream element");
1229                    }
1230                    eventType = parser.next();
1231                }
1232            }
1233            catch (Exception e) {
1234                closingStreamReceived.reportFailure(e);
1235                // The exception can be ignored if the the connection is 'done'
1236                // or if the it was caused because the socket got closed
1237                if (!(done || packetWriter.queue.isShutdown())) {
1238                    // Close the connection and notify connection listeners of the
1239                    // error.
1240                    notifyConnectionError(e);
1241                }
1242            }
1243        }
1244    }
1245
1246    protected class PacketWriter {
1247        public static final int QUEUE_SIZE = XMPPTCPConnection.QUEUE_SIZE;
1248
1249        private final ArrayBlockingQueueWithShutdown<Element> queue = new ArrayBlockingQueueWithShutdown<Element>(
1250                        QUEUE_SIZE, true);
1251
1252        /**
1253         * Needs to be protected for unit testing purposes.
1254         */
1255        protected SynchronizationPoint<NoResponseException> shutdownDone = new SynchronizationPoint<NoResponseException>(
1256                        XMPPTCPConnection.this, "shutdown completed");
1257
1258        /**
1259         * If set, the stanza(/packet) writer is shut down
1260         */
1261        protected volatile Long shutdownTimestamp = null;
1262
1263        private volatile boolean instantShutdown;
1264
1265        /**
1266         * True if some preconditions are given to start the bundle and defer mechanism.
1267         * <p>
1268         * This will likely get set to true right after the start of the writer thread, because
1269         * {@link #nextStreamElement()} will check if {@link queue} is empty, which is probably the case, and then set
1270         * this field to true.
1271         * </p>
1272         */
1273        private boolean shouldBundleAndDefer;
1274
1275        /** 
1276        * Initializes the writer in order to be used. It is called at the first connection and also 
1277        * is invoked if the connection is disconnected by an error.
1278        */ 
1279        void init() {
1280            shutdownDone.init();
1281            shutdownTimestamp = null;
1282
1283            if (unacknowledgedStanzas != null) {
1284                // It's possible that there are new stanzas in the writer queue that
1285                // came in while we were disconnected but resumable, drain those into
1286                // the unacknowledged queue so that they get resent now
1287                drainWriterQueueToUnacknowledgedStanzas();
1288            }
1289
1290            queue.start();
1291            Async.go(new Runnable() {
1292                @Override
1293                public void run() {
1294                    writePackets();
1295                }
1296            }, "Smack Packet Writer (" + getConnectionCounter() + ")");
1297        }
1298
1299        private boolean done() {
1300            return shutdownTimestamp != null;
1301        }
1302
1303        protected void throwNotConnectedExceptionIfDoneAndResumptionNotPossible() throws NotConnectedException {
1304            final boolean done = done();
1305            if (done) {
1306                final boolean smResumptionPossbile = isSmResumptionPossible();
1307                // Don't throw a NotConnectedException is there is an resumable stream available
1308                if (!smResumptionPossbile) {
1309                    throw new NotConnectedException(XMPPTCPConnection.this, "done=" + done
1310                                    + " smResumptionPossible=" + smResumptionPossbile);
1311                }
1312            }
1313        }
1314
1315        /**
1316         * Sends the specified element to the server.
1317         *
1318         * @param element the element to send.
1319         * @throws NotConnectedException 
1320         * @throws InterruptedException 
1321         */
1322        protected void sendStreamElement(Element element) throws NotConnectedException, InterruptedException {
1323            throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
1324            try {
1325                queue.put(element);
1326            }
1327            catch (InterruptedException e) {
1328                // put() may throw an InterruptedException for two reasons:
1329                // 1. If the queue was shut down
1330                // 2. If the thread was interrupted
1331                // so we have to check which is the case
1332                throwNotConnectedExceptionIfDoneAndResumptionNotPossible();
1333                // If the method above did not throw, then the sending thread was interrupted
1334                throw e;
1335            }
1336        }
1337
1338        /**
1339         * Shuts down the stanza(/packet) writer. Once this method has been called, no further
1340         * packets will be written to the server.
1341         * @throws InterruptedException 
1342         */
1343        void shutdown(boolean instant) {
1344            instantShutdown = instant;
1345            queue.shutdown();
1346            shutdownTimestamp = System.currentTimeMillis();
1347            try {
1348                shutdownDone.checkIfSuccessOrWait();
1349            }
1350            catch (NoResponseException | InterruptedException e) {
1351                LOGGER.log(Level.WARNING, "shutdownDone was not marked as successful by the writer thread", e);
1352            }
1353        }
1354
1355        /**
1356         * Maybe return the next available element from the queue for writing. If the queue is shut down <b>or</b> a
1357         * spurious interrupt occurs, <code>null</code> is returned. So it is important to check the 'done' condition in
1358         * that case.
1359         *
1360         * @return the next element for writing or null.
1361         */
1362        private Element nextStreamElement() {
1363            // It is important the we check if the queue is empty before removing an element from it
1364            if (queue.isEmpty()) {
1365                shouldBundleAndDefer = true;
1366            }
1367            Element packet = null;
1368            try {
1369                packet = queue.take();
1370            }
1371            catch (InterruptedException e) {
1372                if (!queue.isShutdown()) {
1373                    // Users shouldn't try to interrupt the packet writer thread
1374                    LOGGER.log(Level.WARNING, "Packet writer thread was interrupted. Don't do that. Use disconnect() instead.", e);
1375                }
1376            }
1377            return packet;
1378        }
1379
        /**
         * Body of the writer thread. Opens the stream and then writes the elements of the queue until the writer is
         * shut down. On a non-instant shutdown, the remaining queue content and the closing stream element are
         * written as well. On an instant shutdown with Stream Management enabled, the remaining queue content is
         * moved to the unacknowledged stanzas queue instead.
         * <p>
         * {@link #shutdownDone} is reported as successful in any case. An exception which occurred while the writer
         * was neither done nor the queue shut down is handed to <code>notifyConnectionError()</code> afterwards.
         * </p>
         */
        private void writePackets() {
            Exception writerException = null;
            try {
                openStream();
                initalOpenStreamSend.reportSuccess();
                // Write out packets from the queue.
                while (!done()) {
                    Element element = nextStreamElement();
                    if (element == null) {
                        // Interrupted or shut down, re-evaluate the 'done' condition
                        continue;
                    }

                    // Get a local version of the bundle and defer callback, in case it's unset
                    // between the null check and the method invocation
                    final BundleAndDeferCallback localBundleAndDeferCallback = bundleAndDeferCallback;
                    // If the preconditions are given (e.g. bundleAndDefer callback is set, queue is
                    // empty), then we could wait a bit for further stanzas attempting to decrease
                    // our energy consumption
                    if (localBundleAndDeferCallback != null && isAuthenticated() && shouldBundleAndDefer) {
                        // Reset shouldBundleAndDefer to false, nextStreamElement() will set it to true once the
                        // queue is empty again.
                        shouldBundleAndDefer = false;
                        final AtomicBoolean bundlingAndDeferringStopped = new AtomicBoolean();
                        final int bundleAndDeferMillis = localBundleAndDeferCallback.getBundleAndDeferMillis(new BundleAndDefer(
                                        bundlingAndDeferringStopped));
                        if (bundleAndDeferMillis > 0) {
                            long remainingWait = bundleAndDeferMillis;
                            final long waitStart = System.currentTimeMillis();
                            synchronized (bundlingAndDeferringStopped) {
                                // Loop, since wait() may return before the requested time has elapsed
                                while (!bundlingAndDeferringStopped.get() && remainingWait > 0) {
                                    bundlingAndDeferringStopped.wait(remainingWait);
                                    remainingWait = bundleAndDeferMillis
                                                    - (System.currentTimeMillis() - waitStart);
                                }
                            }
                        }
                    }

                    // 'packet' stays null for stream elements which are not stanzas
                    Stanza packet = null;
                    if (element instanceof Stanza) {
                        packet = (Stanza) element;
                    }
                    else if (element instanceof Enable) {
                        // The client needs to add messages to the unacknowledged stanzas queue
                        // right after it sent 'enabled'. Stanza will be added once
                        // unacknowledgedStanzas is not null.
                        unacknowledgedStanzas = new ArrayBlockingQueue<>(QUEUE_SIZE);
                    }
                    // No-op if 'packet' is null or if there is no unacknowledged stanzas queue
                    maybeAddToUnacknowledgedStanzas(packet);

                    CharSequence elementXml = element.toXML();
                    if (elementXml instanceof XmlStringBuilder) {
                        ((XmlStringBuilder) elementXml).write(writer);
                    }
                    else {
                        writer.write(elementXml.toString());
                    }

                    // Only flush if there is nothing else waiting to be written
                    if (queue.isEmpty()) {
                        writer.flush();
                    }
                    if (packet != null) {
                        firePacketSendingListeners(packet);
                    }
                }
                if (!instantShutdown) {
                    // Flush out the rest of the queue.
                    try {
                        while (!queue.isEmpty()) {
                            Element packet = queue.remove();
                            if (packet instanceof Stanza) {
                                Stanza stanza = (Stanza) packet;
                                maybeAddToUnacknowledgedStanzas(stanza);
                            }
                            writer.write(packet.toXML().toString());
                        }
                        writer.flush();
                    }
                    catch (Exception e) {
                        LOGGER.log(Level.WARNING,
                                        "Exception flushing queue during shutdown, ignore and continue",
                                        e);
                    }

                    // Close the stream.
                    try {
                        writer.write("</stream:stream>");
                        writer.flush();
                    }
                    catch (Exception e) {
                        LOGGER.log(Level.WARNING, "Exception writing closing stream element", e);
                    }

                    // Delete the queue contents (hopefully nothing is left).
                    queue.clear();
                } else if (instantShutdown && isSmEnabled()) {
                    // This was an instantShutdown and SM is enabled, drain all remaining stanzas
                    // into the unacknowledgedStanzas queue
                    drainWriterQueueToUnacknowledgedStanzas();
                }
                // Do *not* close the writer here, as it will cause the socket
                // to get closed. But we may want to receive further stanzas
                // until the closing stream tag is received. The socket will be
                // closed in shutdown().
            }
            catch (Exception e) {
                // The exception can be ignored if the connection is 'done'
                // or if it was caused because the socket got closed
                if (!(done() || queue.isShutdown())) {
                    writerException = e;
                } else {
                    LOGGER.log(Level.FINE, "Ignoring Exception in writePackets()", e);
                }
            } finally {
                LOGGER.fine("Reporting shutdownDone success in writer thread");
                shutdownDone.reportSuccess();
            }
            // Delay notifyConnectionError after shutdownDone has been reported in the finally block.
            if (writerException != null) {
                notifyConnectionError(writerException);
            }
        }
1502
1503        private void drainWriterQueueToUnacknowledgedStanzas() {
1504            List<Element> elements = new ArrayList<Element>(queue.size());
1505            queue.drainTo(elements);
1506            for (Element element : elements) {
1507                if (element instanceof Stanza) {
1508                    unacknowledgedStanzas.add((Stanza) element);
1509                }
1510            }
1511        }
1512
1513        private void maybeAddToUnacknowledgedStanzas(Stanza stanza) throws IOException {
1514            // Check if the stream element should be put to the unacknowledgedStanza
1515            // queue. Note that we can not do the put() in sendStanzaInternal() and the
1516            // packet order is not stable at this point (sendStanzaInternal() can be
1517            // called concurrently).
1518            if (unacknowledgedStanzas != null && stanza != null) {
1519                // If the unacknowledgedStanza queue is nearly full, request an new ack
1520                // from the server in order to drain it
1521                if (unacknowledgedStanzas.size() == 0.8 * XMPPTCPConnection.QUEUE_SIZE) {
1522                    writer.write(AckRequest.INSTANCE.toXML().toString());
1523                    writer.flush();
1524                }
1525                try {
1526                    // It is important the we put the stanza in the unacknowledged stanza
1527                    // queue before we put it on the wire
1528                    unacknowledgedStanzas.put(stanza);
1529                }
1530                catch (InterruptedException e) {
1531                    throw new IllegalStateException(e);
1532                }
1533            }
1534        }
1535    }
1536
1537    /**
1538     * Set if Stream Management should be used by default for new connections.
1539     * 
1540     * @param useSmDefault true to use Stream Management for new connections.
1541     */
1542    public static void setUseStreamManagementDefault(boolean useSmDefault) {
1543        XMPPTCPConnection.useSmDefault = useSmDefault;
1544    }
1545
1546    /**
1547     * Set if Stream Management resumption should be used by default for new connections.
1548     * 
1549     * @param useSmResumptionDefault true to use Stream Management resumption for new connections.
1550     * @deprecated use {@link #setUseStreamManagementResumptionDefault(boolean)} instead.
1551     */
1552    @Deprecated
1553    public static void setUseStreamManagementResumptiodDefault(boolean useSmResumptionDefault) {
1554        setUseStreamManagementResumptionDefault(useSmResumptionDefault);
1555    }
1556
1557    /**
1558     * Set if Stream Management resumption should be used by default for new connections.
1559     *
1560     * @param useSmResumptionDefault true to use Stream Management resumption for new connections.
1561     */
1562    public static void setUseStreamManagementResumptionDefault(boolean useSmResumptionDefault) {
1563        if (useSmResumptionDefault) {
1564            // Also enable SM is resumption is enabled
1565            setUseStreamManagementDefault(useSmResumptionDefault);
1566        }
1567        XMPPTCPConnection.useSmResumptionDefault = useSmResumptionDefault;
1568    }
1569
1570    /**
1571     * Set if Stream Management should be used if supported by the server.
1572     * 
1573     * @param useSm true to use Stream Management.
1574     */
1575    public void setUseStreamManagement(boolean useSm) {
1576        this.useSm = useSm;
1577    }
1578
1579    /**
1580     * Set if Stream Management resumption should be used if supported by the server.
1581     *
1582     * @param useSmResumption true to use Stream Management resumption.
1583     */
1584    public void setUseStreamManagementResumption(boolean useSmResumption) {
1585        if (useSmResumption) {
1586            // Also enable SM is resumption is enabled
1587            setUseStreamManagement(useSmResumption);
1588        }
1589        this.useSmResumption = useSmResumption;
1590    }
1591
1592    /**
1593     * Set the preferred resumption time in seconds.
1594     * @param resumptionTime the preferred resumption time in seconds
1595     */
1596    public void setPreferredResumptionTime(int resumptionTime) {
1597        smClientMaxResumptionTime = resumptionTime;
1598    }
1599
1600    /**
1601     * Add a predicate for Stream Management acknowledgment requests.
1602     * <p>
1603     * Those predicates are used to determine when a Stream Management acknowledgement request is send to the server.
1604     * Some pre-defined predicates are found in the <code>org.jivesoftware.smack.sm.predicates</code> package.
1605     * </p>
1606     * <p>
1607     * If not predicate is configured, the {@link Predicate#forMessagesOrAfter5Stanzas()} will be used.
1608     * </p>
1609     * 
1610     * @param predicate the predicate to add.
1611     * @return if the predicate was not already active.
1612     */
1613    public boolean addRequestAckPredicate(StanzaFilter predicate) {
1614        synchronized (requestAckPredicates) {
1615            return requestAckPredicates.add(predicate);
1616        }
1617    }
1618
1619    /**
1620     * Remove the given predicate for Stream Management acknowledgment request.
1621     * @param predicate the predicate to remove.
1622     * @return true if the predicate was removed.
1623     */
1624    public boolean removeRequestAckPredicate(StanzaFilter predicate) {
1625        synchronized (requestAckPredicates) {
1626            return requestAckPredicates.remove(predicate);
1627        }
1628    }
1629
1630    /**
1631     * Remove all predicates for Stream Management acknowledgment requests.
1632     */
1633    public void removeAllRequestAckPredicates() {
1634        synchronized (requestAckPredicates) {
1635            requestAckPredicates.clear();
1636        }
1637    }
1638
1639    /**
1640     * Send an unconditional Stream Management acknowledgement request to the server.
1641     *
1642     * @throws StreamManagementNotEnabledException if Stream Mangement is not enabled.
1643     * @throws NotConnectedException if the connection is not connected.
1644     * @throws InterruptedException 
1645     */
1646    public void requestSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException {
1647        if (!isSmEnabled()) {
1648            throw new StreamManagementException.StreamManagementNotEnabledException();
1649        }
1650        requestSmAcknowledgementInternal();
1651    }
1652
    /**
     * Queues an acknowledgment request at the packet writer, without checking if Stream Management is enabled.
     *
     * @throws NotConnectedException if the packet writer refuses the element because the connection is not
     *         connected.
     * @throws InterruptedException if the calling thread was interrupted while queuing the request.
     */
    private void requestSmAcknowledgementInternal() throws NotConnectedException, InterruptedException {
        packetWriter.sendStreamElement(AckRequest.INSTANCE);
    }
1656
1657    /**
1658     * Send a unconditional Stream Management acknowledgment to the server.
1659     * <p>
1660     * See <a href="http://xmpp.org/extensions/xep-0198.html#acking">XEP-198: Stream Management ยง 4. Acks</a>:
1661     * "Either party MAY send an <a/> element at any time (e.g., after it has received a certain number of stanzas,
1662     * or after a certain period of time), even if it has not received an <r/> element from the other party."
1663     * </p>
1664     * 
1665     * @throws StreamManagementNotEnabledException if Stream Management is not enabled.
1666     * @throws NotConnectedException if the connection is not connected.
1667     * @throws InterruptedException 
1668     */
1669    public void sendSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException {
1670        if (!isSmEnabled()) {
1671            throw new StreamManagementException.StreamManagementNotEnabledException();
1672        }
1673        sendSmAcknowledgementInternal();
1674    }
1675
    /**
     * Queues an acknowledgment carrying the current client handled stanzas count at the packet writer, without
     * checking if Stream Management is enabled.
     *
     * @throws NotConnectedException if the packet writer refuses the element because the connection is not
     *         connected.
     * @throws InterruptedException if the calling thread was interrupted while queuing the acknowledgment.
     */
    private void sendSmAcknowledgementInternal() throws NotConnectedException, InterruptedException {
        packetWriter.sendStreamElement(new AckAnswer(clientHandledStanzasCount));
    }
1679
1680    /**
1681     * Add a Stanza acknowledged listener.
1682     * <p>
1683     * Those listeners will be invoked every time a Stanza has been acknowledged by the server. The will not get
1684     * automatically removed. Consider using {@link #addStanzaIdAcknowledgedListener(String, StanzaListener)} when
1685     * possible.
1686     * </p>
1687     * 
1688     * @param listener the listener to add.
1689     */
1690    public void addStanzaAcknowledgedListener(StanzaListener listener) {
1691        stanzaAcknowledgedListeners.add(listener);
1692    }
1693
1694    /**
1695     * Remove the given Stanza acknowledged listener.
1696     *
1697     * @param listener the listener.
1698     * @return true if the listener was removed.
1699     */
1700    public boolean removeStanzaAcknowledgedListener(StanzaListener listener) {
1701        return stanzaAcknowledgedListeners.remove(listener);
1702    }
1703
1704    /**
1705     * Remove all stanza acknowledged listeners.
1706     */
1707    public void removeAllStanzaAcknowledgedListeners() {
1708        stanzaAcknowledgedListeners.clear();
1709    }
1710
1711    /**
1712     * Add a new Stanza ID acknowledged listener for the given ID.
1713     * <p>
1714     * The listener will be invoked if the stanza with the given ID was acknowledged by the server. It will
1715     * automatically be removed after the listener was run.
1716     * </p>
1717     * 
1718     * @param id the stanza ID.
1719     * @param listener the listener to invoke.
1720     * @return the previous listener for this stanza ID or null.
1721     * @throws StreamManagementNotEnabledException if Stream Management is not enabled.
1722     */
1723    public StanzaListener addStanzaIdAcknowledgedListener(final String id, StanzaListener listener) throws StreamManagementNotEnabledException {
1724        // Prevent users from adding callbacks that will never get removed
1725        if (!smWasEnabledAtLeastOnce) {
1726            throw new StreamManagementException.StreamManagementNotEnabledException();
1727        }
1728        // Remove the listener after max. 12 hours
1729        final int removeAfterSeconds = Math.min(getMaxSmResumptionTime(), 12 * 60 * 60);
1730        schedule(new Runnable() {
1731            @Override
1732            public void run() {
1733                stanzaIdAcknowledgedListeners.remove(id);
1734            }
1735        }, removeAfterSeconds, TimeUnit.SECONDS);
1736        return stanzaIdAcknowledgedListeners.put(id, listener);
1737    }
1738
1739    /**
1740     * Remove the Stanza ID acknowledged listener for the given ID.
1741     * 
1742     * @param id the stanza ID.
1743     * @return true if the listener was found and removed, false otherwise.
1744     */
1745    public StanzaListener removeStanzaIdAcknowledgedListener(String id) {
1746        return stanzaIdAcknowledgedListeners.remove(id);
1747    }
1748
1749    /**
1750     * Removes all Stanza ID acknowledged listeners.
1751     */
1752    public void removeAllStanzaIdAcknowledgedListeners() {
1753        stanzaIdAcknowledgedListeners.clear();
1754    }
1755
1756    /**
1757     * Returns true if Stream Management is supported by the server.
1758     *
1759     * @return true if Stream Management is supported by the server.
1760     */
1761    public boolean isSmAvailable() {
1762        return hasFeature(StreamManagementFeature.ELEMENT, StreamManagement.NAMESPACE);
1763    }
1764
1765    /**
1766     * Returns true if Stream Management was successfully negotiated with the server.
1767     *
1768     * @return true if Stream Management was negotiated.
1769     */
1770    public boolean isSmEnabled() {
1771        return smEnabledSyncPoint.wasSuccessful();
1772    }
1773
1774    /**
1775     * Returns true if the stream was successfully resumed with help of Stream Management.
1776     * 
1777     * @return true if the stream was resumed.
1778     */
1779    public boolean streamWasResumed() {
1780        return smResumedSyncPoint.wasSuccessful();
1781    }
1782
1783    /**
1784     * Returns true if the connection is disconnected by a Stream resumption via Stream Management is possible.
1785     * 
1786     * @return true if disconnected but resumption possible.
1787     */
1788    public boolean isDisconnectedButSmResumptionPossible() {
1789        return disconnectedButResumeable && isSmResumptionPossible();
1790    }
1791
1792    /**
1793     * Returns true if the stream is resumable.
1794     *
1795     * @return true if the stream is resumable.
1796     */
1797    public boolean isSmResumptionPossible() {
1798        // There is no resumable stream available
1799        if (smSessionId == null)
1800            return false;
1801
1802        final Long shutdownTimestamp = packetWriter.shutdownTimestamp;
1803        // Seems like we are already reconnected, report true
1804        if (shutdownTimestamp == null) {
1805            return true;
1806        }
1807
1808        // See if resumption time is over
1809        long current = System.currentTimeMillis();
1810        long maxResumptionMillies = ((long) getMaxSmResumptionTime()) * 1000;
1811        if (current > shutdownTimestamp + maxResumptionMillies) {
1812            // Stream resumption is *not* possible if the current timestamp is greater then the greatest timestamp where
1813            // resumption is possible
1814            return false;
1815        } else {
1816            return true;
1817        }
1818    }
1819
1820    /**
1821     * Drop the stream management state. Sets {@link #smSessionId} and
1822     * {@link #unacknowledgedStanzas} to <code>null</code>.
1823     */
1824    private void dropSmState() {
1825        // clientHandledCount and serverHandledCount will be reset on <enable/> and <enabled/>
1826        // respective. No need to reset them here.
1827        smSessionId = null;
1828        unacknowledgedStanzas = null;
1829    }
1830
1831    /**
1832     * Get the maximum resumption time in seconds after which a managed stream can be resumed.
1833     * <p>
1834     * This method will return {@link Integer#MAX_VALUE} if neither the client nor the server specify a maximum
1835     * resumption time. Be aware of integer overflows when using this value, e.g. do not add arbitrary values to it
1836     * without checking for overflows before.
1837     * </p>
1838     *
1839     * @return the maximum resumption time in seconds or {@link Integer#MAX_VALUE} if none set.
1840     */
1841    public int getMaxSmResumptionTime() {
1842        int clientResumptionTime = smClientMaxResumptionTime > 0 ? smClientMaxResumptionTime : Integer.MAX_VALUE;
1843        int serverResumptionTime = smServerMaxResumptimTime > 0 ? smServerMaxResumptimTime : Integer.MAX_VALUE;
1844        return Math.min(clientResumptionTime, serverResumptionTime);
1845    }
1846
1847    private void processHandledCount(long handledCount) throws StreamManagementCounterError {
1848        long ackedStanzasCount = SMUtils.calculateDelta(handledCount, serverHandledStanzasCount);
1849        final List<Stanza> ackedStanzas = new ArrayList<Stanza>(
1850                        ackedStanzasCount <= Integer.MAX_VALUE ? (int) ackedStanzasCount
1851                                        : Integer.MAX_VALUE);
1852        for (long i = 0; i < ackedStanzasCount; i++) {
1853            Stanza ackedStanza = unacknowledgedStanzas.poll();
1854            // If the server ack'ed a stanza, then it must be in the
1855            // unacknowledged stanza queue. There can be no exception.
1856            if (ackedStanza == null) {
1857                throw new StreamManagementCounterError(handledCount, serverHandledStanzasCount,
1858                                ackedStanzasCount, ackedStanzas);
1859            }
1860            ackedStanzas.add(ackedStanza);
1861        }
1862
1863        boolean atLeastOneStanzaAcknowledgedListener = false;
1864        if (!stanzaAcknowledgedListeners.isEmpty()) {
1865            // If stanzaAcknowledgedListeners is not empty, the we have at least one
1866            atLeastOneStanzaAcknowledgedListener = true;
1867        }
1868        else {
1869            // Otherwise we look for a matching id in the stanza *id* acknowledged listeners
1870            for (Stanza ackedStanza : ackedStanzas) {
1871                String id = ackedStanza.getStanzaId();
1872                if (id != null && stanzaIdAcknowledgedListeners.containsKey(id)) {
1873                    atLeastOneStanzaAcknowledgedListener = true;
1874                    break;
1875                }
1876            }
1877        }
1878
1879        // Only spawn a new thread if there is a chance that some listener is invoked
1880        if (atLeastOneStanzaAcknowledgedListener) {
1881            asyncGo(new Runnable() {
1882                @Override
1883                public void run() {
1884                    for (Stanza ackedStanza : ackedStanzas) {
1885                        for (StanzaListener listener : stanzaAcknowledgedListeners) {
1886                            try {
1887                                listener.processStanza(ackedStanza);
1888                            }
1889                            catch (InterruptedException | NotConnectedException e) {
1890                                LOGGER.log(Level.FINER, "Received exception", e);
1891                            }
1892                        }
1893                        String id = ackedStanza.getStanzaId();
1894                        if (StringUtils.isNullOrEmpty(id)) {
1895                            continue;
1896                        }
1897                        StanzaListener listener = stanzaIdAcknowledgedListeners.remove(id);
1898                        if (listener != null) {
1899                            try {
1900                                listener.processStanza(ackedStanza);
1901                            }
1902                            catch (InterruptedException | NotConnectedException e) {
1903                                LOGGER.log(Level.FINER, "Received exception", e);
1904                            }
1905                        }
1906                    }
1907                }
1908            });
1909        }
1910
1911        serverHandledStanzasCount = handledCount;
1912    }
1913
1914    /**
1915     * Set the default bundle and defer callback used for new connections.
1916     *
1917     * @param defaultBundleAndDeferCallback
1918     * @see BundleAndDeferCallback
1919     * @since 4.1
1920     */
1921    public static void setDefaultBundleAndDeferCallback(BundleAndDeferCallback defaultBundleAndDeferCallback) {
1922        XMPPTCPConnection.defaultBundleAndDeferCallback = defaultBundleAndDeferCallback;
1923    }
1924
1925    /**
1926     * Set the bundle and defer callback used for this connection.
1927     * <p>
1928     * You can use <code>null</code> as argument to reset the callback. Outgoing stanzas will then
1929     * no longer get deferred.
1930     * </p>
1931     *
1932     * @param bundleAndDeferCallback the callback or <code>null</code>.
1933     * @see BundleAndDeferCallback
1934     * @since 4.1
1935     */
1936    public void setBundleandDeferCallback(BundleAndDeferCallback bundleAndDeferCallback) {
1937        this.bundleAndDeferCallback = bundleAndDeferCallback;
1938    }
1939
1940}