001/** 002 * 003 * Copyright 2003-2007 Jive Software. 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.jivesoftware.smack.tcp; 018 019import java.io.BufferedReader; 020import java.io.ByteArrayInputStream; 021import java.io.FileInputStream; 022import java.io.IOException; 023import java.io.InputStream; 024import java.io.InputStreamReader; 025import java.io.OutputStream; 026import java.io.OutputStreamWriter; 027import java.io.Writer; 028import java.lang.reflect.Constructor; 029import java.net.InetAddress; 030import java.net.InetSocketAddress; 031import java.net.Socket; 032import java.security.KeyManagementException; 033import java.security.KeyStore; 034import java.security.KeyStoreException; 035import java.security.NoSuchAlgorithmException; 036import java.security.NoSuchProviderException; 037import java.security.Provider; 038import java.security.SecureRandom; 039import java.security.Security; 040import java.security.UnrecoverableKeyException; 041import java.security.cert.CertificateException; 042import java.util.ArrayList; 043import java.util.Collection; 044import java.util.Iterator; 045import java.util.LinkedHashSet; 046import java.util.LinkedList; 047import java.util.List; 048import java.util.Map; 049import java.util.Set; 050import java.util.concurrent.ArrayBlockingQueue; 051import java.util.concurrent.BlockingQueue; 052import java.util.concurrent.ConcurrentHashMap; 053import java.util.concurrent.ConcurrentLinkedQueue; 054import java.util.concurrent.TimeUnit; 055import java.util.concurrent.atomic.AtomicBoolean; 056import java.util.logging.Level; 057import java.util.logging.Logger; 058 059import javax.net.SocketFactory; 060import javax.net.ssl.HostnameVerifier; 061import javax.net.ssl.KeyManager; 062import javax.net.ssl.KeyManagerFactory; 063import javax.net.ssl.SSLContext; 064import javax.net.ssl.SSLSession; 065import javax.net.ssl.SSLSocket; 066import javax.net.ssl.TrustManager; 067import javax.net.ssl.X509TrustManager; 068import javax.security.auth.callback.Callback; 069import javax.security.auth.callback.CallbackHandler; 070import javax.security.auth.callback.PasswordCallback; 071 072import org.jivesoftware.smack.AbstractConnectionListener; 073import org.jivesoftware.smack.AbstractXMPPConnection; 074import org.jivesoftware.smack.ConnectionConfiguration; 075import org.jivesoftware.smack.ConnectionConfiguration.DnssecMode; 076import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode; 077import org.jivesoftware.smack.SmackConfiguration; 078import org.jivesoftware.smack.SmackException; 079import org.jivesoftware.smack.SmackException.AlreadyConnectedException; 080import org.jivesoftware.smack.SmackException.AlreadyLoggedInException; 081import org.jivesoftware.smack.SmackException.ConnectionException; 082import org.jivesoftware.smack.SmackException.NoResponseException; 083import org.jivesoftware.smack.SmackException.NotConnectedException; 084import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException; 085import org.jivesoftware.smack.StanzaListener; 086import org.jivesoftware.smack.SynchronizationPoint; 087import org.jivesoftware.smack.XMPPConnection; 088import org.jivesoftware.smack.XMPPException; 089import org.jivesoftware.smack.XMPPException.FailedNonzaException; 090import org.jivesoftware.smack.XMPPException.StreamErrorException; 091import org.jivesoftware.smack.compress.packet.Compress; 092import org.jivesoftware.smack.compress.packet.Compressed; 093import org.jivesoftware.smack.compression.XMPPInputOutputStream; 094import org.jivesoftware.smack.filter.StanzaFilter; 095import org.jivesoftware.smack.packet.Element; 096import org.jivesoftware.smack.packet.IQ; 097import org.jivesoftware.smack.packet.Message; 098import org.jivesoftware.smack.packet.Nonza; 099import org.jivesoftware.smack.packet.Presence; 100import org.jivesoftware.smack.packet.Stanza; 101import org.jivesoftware.smack.packet.StartTls; 102import org.jivesoftware.smack.packet.StreamError; 103import org.jivesoftware.smack.packet.StreamOpen; 104import org.jivesoftware.smack.proxy.ProxyInfo; 105import org.jivesoftware.smack.sasl.packet.SaslStreamElements; 106import org.jivesoftware.smack.sasl.packet.SaslStreamElements.Challenge; 107import org.jivesoftware.smack.sasl.packet.SaslStreamElements.SASLFailure; 108import org.jivesoftware.smack.sasl.packet.SaslStreamElements.Success; 109import org.jivesoftware.smack.sm.SMUtils; 110import org.jivesoftware.smack.sm.StreamManagementException; 111import org.jivesoftware.smack.sm.StreamManagementException.StreamIdDoesNotMatchException; 112import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementCounterError; 113import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementNotEnabledException; 114import org.jivesoftware.smack.sm.packet.StreamManagement; 115import org.jivesoftware.smack.sm.packet.StreamManagement.AckAnswer; 116import org.jivesoftware.smack.sm.packet.StreamManagement.AckRequest; 117import org.jivesoftware.smack.sm.packet.StreamManagement.Enable; 118import org.jivesoftware.smack.sm.packet.StreamManagement.Enabled; 119import org.jivesoftware.smack.sm.packet.StreamManagement.Failed; 120import org.jivesoftware.smack.sm.packet.StreamManagement.Resume; 121import org.jivesoftware.smack.sm.packet.StreamManagement.Resumed; 122import org.jivesoftware.smack.sm.packet.StreamManagement.StreamManagementFeature; 123import org.jivesoftware.smack.sm.predicates.Predicate; 124import org.jivesoftware.smack.sm.provider.ParseStreamManagement; 125import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown; 126import org.jivesoftware.smack.util.Async; 127import org.jivesoftware.smack.util.DNSUtil; 128import org.jivesoftware.smack.util.PacketParserUtils; 129import org.jivesoftware.smack.util.StringUtils; 130import org.jivesoftware.smack.util.TLSUtils; 131import org.jivesoftware.smack.util.XmlStringBuilder; 132import org.jivesoftware.smack.util.dns.HostAddress; 133import org.jivesoftware.smack.util.dns.SmackDaneProvider; 134import org.jivesoftware.smack.util.dns.SmackDaneVerifier; 135 136import org.jxmpp.jid.impl.JidCreate; 137import org.jxmpp.jid.parts.Resourcepart; 138import org.jxmpp.stringprep.XmppStringprepException; 139import org.jxmpp.util.XmppStringUtils; 140import org.xmlpull.v1.XmlPullParser; 141import org.xmlpull.v1.XmlPullParserException; 142 143/** 144 * Creates a socket connection to an XMPP server. This is the default connection 145 * to an XMPP server and is specified in the XMPP Core (RFC 6120). 146 * 147 * @see XMPPConnection 148 * @author Matt Tucker 149 */ 150public class XMPPTCPConnection extends AbstractXMPPConnection { 151 152 private static final int QUEUE_SIZE = 500; 153 private static final Logger LOGGER = Logger.getLogger(XMPPTCPConnection.class.getName()); 154 155 /** 156 * The socket which is used for this connection. 157 */ 158 private Socket socket; 159 160 /** 161 * 162 */ 163 private boolean disconnectedButResumeable = false; 164 165 private SSLSocket secureSocket; 166 167 /** 168 * Protected access level because of unit test purposes 169 */ 170 protected PacketWriter packetWriter; 171 172 /** 173 * Protected access level because of unit test purposes 174 */ 175 protected PacketReader packetReader; 176 177 private final SynchronizationPoint<Exception> initalOpenStreamSend = new SynchronizationPoint<>( 178 this, "initial open stream element send to server"); 179 180 /** 181 * 182 */ 183 private final SynchronizationPoint<XMPPException> maybeCompressFeaturesReceived = new SynchronizationPoint<XMPPException>( 184 this, "stream compression feature"); 185 186 /** 187 * 188 */ 189 private final SynchronizationPoint<SmackException> compressSyncPoint = new SynchronizationPoint<>( 190 this, "stream compression"); 191 192 /** 193 * A synchronization point which is successful if this connection has received the closing 194 * stream element from the remote end-point, i.e. the server. 195 */ 196 private final SynchronizationPoint<Exception> closingStreamReceived = new SynchronizationPoint<>( 197 this, "stream closing element received"); 198 199 /** 200 * The default bundle and defer callback, used for new connections. 201 * @see bundleAndDeferCallback 202 */ 203 private static BundleAndDeferCallback defaultBundleAndDeferCallback; 204 205 /** 206 * The used bundle and defer callback. 207 * <p> 208 * Although this field may be set concurrently, the 'volatile' keyword was deliberately not added, in order to avoid 209 * having a 'volatile' read within the writer threads loop. 210 * </p> 211 */ 212 private BundleAndDeferCallback bundleAndDeferCallback = defaultBundleAndDeferCallback; 213 214 private static boolean useSmDefault = true; 215 216 private static boolean useSmResumptionDefault = true; 217 218 /** 219 * The stream ID of the stream that is currently resumable, ie. the stream we hold the state 220 * for in {@link #clientHandledStanzasCount}, {@link #serverHandledStanzasCount} and 221 * {@link #unacknowledgedStanzas}. 222 */ 223 private String smSessionId; 224 225 private final SynchronizationPoint<FailedNonzaException> smResumedSyncPoint = new SynchronizationPoint<>( 226 this, "stream resumed element"); 227 228 private final SynchronizationPoint<SmackException> smEnabledSyncPoint = new SynchronizationPoint<>( 229 this, "stream enabled element"); 230 231 /** 232 * The client's preferred maximum resumption time in seconds. 233 */ 234 private int smClientMaxResumptionTime = -1; 235 236 /** 237 * The server's preferred maximum resumption time in seconds. 238 */ 239 private int smServerMaxResumptimTime = -1; 240 241 /** 242 * Indicates whether Stream Management (XEP-198) should be used if it's supported by the server. 243 */ 244 private boolean useSm = useSmDefault; 245 private boolean useSmResumption = useSmResumptionDefault; 246 247 /** 248 * The counter that the server sends the client about it's current height. For example, if the server sends 249 * {@code <a h='42'/>}, then this will be set to 42 (while also handling the {@link #unacknowledgedStanzas} queue). 250 */ 251 private long serverHandledStanzasCount = 0; 252 253 /** 254 * The counter for stanzas handled ("received") by the client. 255 * <p> 256 * Note that we don't need to synchronize this counter. Although JLS 17.7 states that reads and writes to longs are 257 * not atomic, it guarantees that there are at most 2 separate writes, one to each 32-bit half. And since 258 * {@link SMUtils#incrementHeight(long)} masks the lower 32 bit, we only operate on one half of the long and 259 * therefore have no concurrency problem because the read/write operations on one half are guaranteed to be atomic. 260 * </p> 261 */ 262 private long clientHandledStanzasCount = 0; 263 264 private BlockingQueue<Stanza> unacknowledgedStanzas; 265 266 /** 267 * Set to true if Stream Management was at least once enabled for this connection. 268 */ 269 private boolean smWasEnabledAtLeastOnce = false; 270 271 /** 272 * This listeners are invoked for every stanza that got acknowledged. 273 * <p> 274 * We use a {@link ConccurrentLinkedQueue} here in order to allow the listeners to remove 275 * themselves after they have been invoked. 276 * </p> 277 */ 278 private final Collection<StanzaListener> stanzaAcknowledgedListeners = new ConcurrentLinkedQueue<StanzaListener>(); 279 280 /** 281 * This listeners are invoked for a acknowledged stanza that has the given stanza ID. They will 282 * only be invoked once and automatically removed after that. 283 */ 284 private final Map<String, StanzaListener> stanzaIdAcknowledgedListeners = new ConcurrentHashMap<String, StanzaListener>(); 285 286 /** 287 * Predicates that determine if an stream management ack should be requested from the server. 288 * <p> 289 * We use a linked hash set here, so that the order how the predicates are added matches the 290 * order in which they are invoked in order to determine if an ack request should be send or not. 291 * </p> 292 */ 293 private final Set<StanzaFilter> requestAckPredicates = new LinkedHashSet<StanzaFilter>(); 294 295 private final XMPPTCPConnectionConfiguration config; 296 297 /** 298 * Creates a new XMPP connection over TCP (optionally using proxies). 299 * <p> 300 * Note that XMPPTCPConnection constructors do not establish a connection to the server 301 * and you must call {@link #connect()}. 302 * </p> 303 * 304 * @param config the connection configuration. 305 */ 306 public XMPPTCPConnection(XMPPTCPConnectionConfiguration config) { 307 super(config); 308 this.config = config; 309 addConnectionListener(new AbstractConnectionListener() { 310 @Override 311 public void connectionClosedOnError(Exception e) { 312 if (e instanceof XMPPException.StreamErrorException) { 313 dropSmState(); 314 } 315 } 316 }); 317 } 318 319 /** 320 * Creates a new XMPP connection over TCP. 321 * <p> 322 * Note that {@code jid} must be the bare JID, e.g. "user@example.org". More fine-grained control over the 323 * connection settings is available using the {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)} 324 * constructor. 325 * </p> 326 * 327 * @param jid the bare JID used by the client. 328 * @param password the password or authentication token. 329 * @throws XmppStringprepException 330 */ 331 public XMPPTCPConnection(CharSequence jid, String password) throws XmppStringprepException { 332 this(XmppStringUtils.parseLocalpart(jid.toString()), password, XmppStringUtils.parseDomain(jid.toString())); 333 } 334 335 /** 336 * Creates a new XMPP connection over TCP. 337 * <p> 338 * This is the simplest constructor for connecting to an XMPP server. Alternatively, 339 * you can get fine-grained control over connection settings using the 340 * {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)} constructor. 341 * </p> 342 * @param username 343 * @param password 344 * @param serviceName 345 * @throws XmppStringprepException 346 */ 347 public XMPPTCPConnection(CharSequence username, String password, String serviceName) throws XmppStringprepException { 348 this(XMPPTCPConnectionConfiguration.builder().setUsernameAndPassword(username, password).setXmppDomain( 349 JidCreate.domainBareFrom(serviceName)).build()); 350 } 351 352 @Override 353 protected void throwNotConnectedExceptionIfAppropriate() throws NotConnectedException { 354 if (packetWriter == null) { 355 throw new NotConnectedException(); 356 } 357 packetWriter.throwNotConnectedExceptionIfDoneAndResumptionNotPossible(); 358 } 359 360 @Override 361 protected void throwAlreadyConnectedExceptionIfAppropriate() throws AlreadyConnectedException { 362 if (isConnected() && !disconnectedButResumeable) { 363 throw new AlreadyConnectedException(); 364 } 365 } 366 367 @Override 368 protected void throwAlreadyLoggedInExceptionIfAppropriate() throws AlreadyLoggedInException { 369 if (isAuthenticated() && !disconnectedButResumeable) { 370 throw new AlreadyLoggedInException(); 371 } 372 } 373 374 @Override 375 protected void afterSuccessfulLogin(final boolean resumed) throws NotConnectedException, InterruptedException { 376 // Reset the flag in case it was set 377 disconnectedButResumeable = false; 378 super.afterSuccessfulLogin(resumed); 379 } 380 381 @Override 382 protected synchronized void loginInternal(String username, String password, Resourcepart resource) throws XMPPException, 383 SmackException, IOException, InterruptedException { 384 // Authenticate using SASL 385 SSLSession sslSession = secureSocket != null ? secureSocket.getSession() : null; 386 saslAuthentication.authenticate(username, password, config.getAuthzid(), sslSession); 387 388 // If compression is enabled then request the server to use stream compression. XEP-170 389 // recommends to perform stream compression before resource binding. 390 maybeEnableCompression(); 391 392 if (isSmResumptionPossible()) { 393 smResumedSyncPoint.sendAndWaitForResponse(new Resume(clientHandledStanzasCount, smSessionId)); 394 if (smResumedSyncPoint.wasSuccessful()) { 395 // We successfully resumed the stream, be done here 396 afterSuccessfulLogin(true); 397 return; 398 } 399 // SM resumption failed, what Smack does here is to report success of 400 // lastFeaturesReceived in case of sm resumption was answered with 'failed' so that 401 // normal resource binding can be tried. 402 LOGGER.fine("Stream resumption failed, continuing with normal stream establishment process"); 403 } 404 405 List<Stanza> previouslyUnackedStanzas = new LinkedList<Stanza>(); 406 if (unacknowledgedStanzas != null) { 407 // There was a previous connection with SM enabled but that was either not resumable or 408 // failed to resume. Make sure that we (re-)send the unacknowledged stanzas. 409 unacknowledgedStanzas.drainTo(previouslyUnackedStanzas); 410 // Reset unacknowledged stanzas to 'null' to signal that we never send 'enable' in this 411 // XMPP session (There maybe was an enabled in a previous XMPP session of this 412 // connection instance though). This is used in writePackets to decide if stanzas should 413 // be added to the unacknowledged stanzas queue, because they have to be added right 414 // after the 'enable' stream element has been sent. 415 dropSmState(); 416 } 417 418 // Now bind the resource. It is important to do this *after* we dropped an eventually 419 // existing Stream Management state. As otherwise <bind/> and <session/> may end up in 420 // unacknowledgedStanzas and become duplicated on reconnect. See SMACK-706. 421 bindResourceAndEstablishSession(resource); 422 423 if (isSmAvailable() && useSm) { 424 // Remove what is maybe left from previously stream managed sessions 425 serverHandledStanzasCount = 0; 426 // XEP-198 3. Enabling Stream Management. If the server response to 'Enable' is 'Failed' 427 // then this is a non recoverable error and we therefore throw an exception. 428 smEnabledSyncPoint.sendAndWaitForResponseOrThrow(new Enable(useSmResumption, smClientMaxResumptionTime)); 429 synchronized (requestAckPredicates) { 430 if (requestAckPredicates.isEmpty()) { 431 // Assure that we have at lest one predicate set up that so that we request acks 432 // for the server and eventually flush some stanzas from the unacknowledged 433 // stanza queue 434 requestAckPredicates.add(Predicate.forMessagesOrAfter5Stanzas()); 435 } 436 } 437 } 438 // (Re-)send the stanzas *after* we tried to enable SM 439 for (Stanza stanza : previouslyUnackedStanzas) { 440 sendStanzaInternal(stanza); 441 } 442 443 afterSuccessfulLogin(false); 444 } 445 446 @Override 447 public boolean isSecureConnection() { 448 return secureSocket != null; 449 } 450 451 /** 452 * Shuts the current connection down. After this method returns, the connection must be ready 453 * for re-use by connect. 454 */ 455 @Override 456 protected void shutdown() { 457 if (isSmEnabled()) { 458 try { 459 // Try to send a last SM Acknowledgement. Most servers won't find this information helpful, as the SM 460 // state is dropped after a clean disconnect anyways. OTOH it doesn't hurt much either. 461 sendSmAcknowledgementInternal(); 462 } catch (InterruptedException | NotConnectedException e) { 463 LOGGER.log(Level.FINE, "Can not send final SM ack as connection is not connected", e); 464 } 465 } 466 shutdown(false); 467 } 468 469 /** 470 * Performs an unclean disconnect and shutdown of the connection. Does not send a closing stream stanza. 471 */ 472 public synchronized void instantShutdown() { 473 shutdown(true); 474 } 475 476 private void shutdown(boolean instant) { 477 if (disconnectedButResumeable) { 478 return; 479 } 480 481 // First shutdown the writer, this will result in a closing stream element getting send to 482 // the server 483 if (packetWriter != null) { 484 LOGGER.finer("PacketWriter shutdown()"); 485 packetWriter.shutdown(instant); 486 } 487 LOGGER.finer("PacketWriter has been shut down"); 488 489 if (!instant) { 490 try { 491 // After we send the closing stream element, check if there was already a 492 // closing stream element sent by the server or wait with a timeout for a 493 // closing stream element to be received from the server. 494 @SuppressWarnings("unused") 495 Exception res = closingStreamReceived.checkIfSuccessOrWait(); 496 } catch (InterruptedException | NoResponseException e) { 497 LOGGER.log(Level.INFO, "Exception while waiting for closing stream element from the server " + this, e); 498 } 499 } 500 501 if (packetReader != null) { 502 LOGGER.finer("PacketReader shutdown()"); 503 packetReader.shutdown(); 504 } 505 LOGGER.finer("PacketReader has been shut down"); 506 507 try { 508 socket.close(); 509 } catch (Exception e) { 510 LOGGER.log(Level.WARNING, "shutdown", e); 511 } 512 513 setWasAuthenticated(); 514 // If we are able to resume the stream, then don't set 515 // connected/authenticated/usingTLS to false since we like behave like we are still 516 // connected (e.g. sendStanza should not throw a NotConnectedException). 517 if (isSmResumptionPossible() && instant) { 518 disconnectedButResumeable = true; 519 } else { 520 disconnectedButResumeable = false; 521 // Reset the stream management session id to null, since if the stream is cleanly closed, i.e. sending a closing 522 // stream tag, there is no longer a stream to resume. 523 smSessionId = null; 524 } 525 authenticated = false; 526 connected = false; 527 secureSocket = null; 528 reader = null; 529 writer = null; 530 531 maybeCompressFeaturesReceived.init(); 532 compressSyncPoint.init(); 533 smResumedSyncPoint.init(); 534 smEnabledSyncPoint.init(); 535 initalOpenStreamSend.init(); 536 } 537 538 @Override 539 public void sendNonza(Nonza element) throws NotConnectedException, InterruptedException { 540 packetWriter.sendStreamElement(element); 541 } 542 543 @Override 544 protected void sendStanzaInternal(Stanza packet) throws NotConnectedException, InterruptedException { 545 packetWriter.sendStreamElement(packet); 546 if (isSmEnabled()) { 547 for (StanzaFilter requestAckPredicate : requestAckPredicates) { 548 if (requestAckPredicate.accept(packet)) { 549 requestSmAcknowledgementInternal(); 550 break; 551 } 552 } 553 } 554 } 555 556 private void connectUsingConfiguration() throws ConnectionException, IOException { 557 List<HostAddress> failedAddresses = populateHostAddresses(); 558 SocketFactory socketFactory = config.getSocketFactory(); 559 ProxyInfo proxyInfo = config.getProxyInfo(); 560 int timeout = config.getConnectTimeout(); 561 if (socketFactory == null) { 562 socketFactory = SocketFactory.getDefault(); 563 } 564 for (HostAddress hostAddress : hostAddresses) { 565 Iterator<InetAddress> inetAddresses = null; 566 String host = hostAddress.getHost(); 567 int port = hostAddress.getPort(); 568 if (proxyInfo == null) { 569 inetAddresses = hostAddress.getInetAddresses().iterator(); 570 assert (inetAddresses.hasNext()); 571 572 innerloop: while (inetAddresses.hasNext()) { 573 // Create a *new* Socket before every connection attempt, i.e. connect() call, since Sockets are not 574 // re-usable after a failed connection attempt. See also SMACK-724. 575 socket = socketFactory.createSocket(); 576 577 final InetAddress inetAddress = inetAddresses.next(); 578 final String inetAddressAndPort = inetAddress + " at port " + port; 579 LOGGER.finer("Trying to establish TCP connection to " + inetAddressAndPort); 580 try { 581 socket.connect(new InetSocketAddress(inetAddress, port), timeout); 582 } catch (Exception e) { 583 hostAddress.setException(inetAddress, e); 584 if (inetAddresses.hasNext()) { 585 continue innerloop; 586 } else { 587 break innerloop; 588 } 589 } 590 LOGGER.finer("Established TCP connection to " + inetAddressAndPort); 591 // We found a host to connect to, return here 592 this.host = host; 593 this.port = port; 594 return; 595 } 596 failedAddresses.add(hostAddress); 597 } else { 598 socket = socketFactory.createSocket(); 599 StringUtils.requireNotNullOrEmpty(host, "Host of HostAddress " + hostAddress + " must not be null when using a Proxy"); 600 final String hostAndPort = host + " at port " + port; 601 LOGGER.finer("Trying to establish TCP connection via Proxy to " + hostAndPort); 602 try { 603 proxyInfo.getProxySocketConnection().connect(socket, host, port, timeout); 604 } catch (IOException e) { 605 hostAddress.setException(e); 606 continue; 607 } 608 LOGGER.finer("Established TCP connection to " + hostAndPort); 609 // We found a host to connect to, return here 610 this.host = host; 611 this.port = port; 612 return; 613 } 614 } 615 // There are no more host addresses to try 616 // throw an exception and report all tried 617 // HostAddresses in the exception 618 throw ConnectionException.from(failedAddresses); 619 } 620 621 /** 622 * Initializes the connection by creating a stanza(/packet) reader and writer and opening a 623 * XMPP stream to the server. 624 * 625 * @throws XMPPException if establishing a connection to the server fails. 626 * @throws SmackException if the server failes to respond back or if there is anther error. 627 * @throws IOException 628 */ 629 private void initConnection() throws IOException { 630 boolean isFirstInitialization = packetReader == null || packetWriter == null; 631 compressionHandler = null; 632 633 // Set the reader and writer instance variables 634 initReaderAndWriter(); 635 636 if (isFirstInitialization) { 637 packetWriter = new PacketWriter(); 638 packetReader = new PacketReader(); 639 640 // If debugging is enabled, we should start the thread that will listen for 641 // all packets and then log them. 642 if (config.isDebuggerEnabled()) { 643 addAsyncStanzaListener(debugger.getReaderListener(), null); 644 if (debugger.getWriterListener() != null) { 645 addPacketSendingListener(debugger.getWriterListener(), null); 646 } 647 } 648 } 649 // Start the packet writer. This will open an XMPP stream to the server 650 packetWriter.init(); 651 // Start the packet reader. The startup() method will block until we 652 // get an opening stream packet back from server 653 packetReader.init(); 654 } 655 656 private void initReaderAndWriter() throws IOException { 657 InputStream is = socket.getInputStream(); 658 OutputStream os = socket.getOutputStream(); 659 if (compressionHandler != null) { 660 is = compressionHandler.getInputStream(is); 661 os = compressionHandler.getOutputStream(os); 662 } 663 // OutputStreamWriter is already buffered, no need to wrap it into a BufferedWriter 664 writer = new OutputStreamWriter(os, "UTF-8"); 665 reader = new BufferedReader(new InputStreamReader(is, "UTF-8")); 666 667 // If debugging is enabled, we open a window and write out all network traffic. 668 initDebugger(); 669 } 670 671 /** 672 * The server has indicated that TLS negotiation can start. We now need to secure the 673 * existing plain connection and perform a handshake. This method won't return until the 674 * connection has finished the handshake or an error occurred while securing the connection. 675 * @throws IOException 676 * @throws CertificateException 677 * @throws NoSuchAlgorithmException 678 * @throws NoSuchProviderException 679 * @throws KeyStoreException 680 * @throws UnrecoverableKeyException 681 * @throws KeyManagementException 682 * @throws SmackException 683 * @throws Exception if an exception occurs. 684 */ 685 private void proceedTLSReceived() throws NoSuchAlgorithmException, CertificateException, IOException, KeyStoreException, NoSuchProviderException, UnrecoverableKeyException, KeyManagementException, SmackException { 686 SSLContext context = this.config.getCustomSSLContext(); 687 KeyStore ks = null; 688 KeyManager[] kms = null; 689 PasswordCallback pcb = null; 690 SmackDaneVerifier daneVerifier = null; 691 692 if (config.getDnssecMode() == DnssecMode.needsDnssecAndDane) { 693 SmackDaneProvider daneProvider = DNSUtil.getDaneProvider(); 694 if (daneProvider == null) { 695 throw new UnsupportedOperationException("DANE enabled but no SmackDaneProvider configured"); 696 } 697 daneVerifier = daneProvider.newInstance(); 698 if (daneVerifier == null) { 699 throw new IllegalStateException("DANE requested but DANE provider did not return a DANE verifier"); 700 } 701 } 702 703 if (context == null) { 704 final String keyStoreType = config.getKeystoreType(); 705 final CallbackHandler callbackHandler = config.getCallbackHandler(); 706 final String keystorePath = config.getKeystorePath(); 707 if ("PKCS11".equals(keyStoreType)) { 708 try { 709 Constructor<?> c = Class.forName("sun.security.pkcs11.SunPKCS11").getConstructor(InputStream.class); 710 String pkcs11Config = "name = SmartCard\nlibrary = " + config.getPKCS11Library(); 711 ByteArrayInputStream config = new ByteArrayInputStream(pkcs11Config.getBytes(StringUtils.UTF8)); 712 Provider p = (Provider) c.newInstance(config); 713 Security.addProvider(p); 714 ks = KeyStore.getInstance("PKCS11",p); 715 pcb = new PasswordCallback("PKCS11 Password: ",false); 716 callbackHandler.handle(new Callback[] {pcb}); 717 ks.load(null,pcb.getPassword()); 718 } 719 catch (Exception e) { 720 LOGGER.log(Level.WARNING, "Exception", e); 721 ks = null; 722 } 723 } 724 else if ("Apple".equals(keyStoreType)) { 725 ks = KeyStore.getInstance("KeychainStore","Apple"); 726 ks.load(null,null); 727 //pcb = new PasswordCallback("Apple Keychain",false); 728 //pcb.setPassword(null); 729 } 730 else if (keyStoreType != null) { 731 ks = KeyStore.getInstance(keyStoreType); 732 if (callbackHandler != null && StringUtils.isNotEmpty(keystorePath)) { 733 try { 734 pcb = new PasswordCallback("Keystore Password: ", false); 735 callbackHandler.handle(new Callback[] { pcb }); 736 ks.load(new FileInputStream(keystorePath), pcb.getPassword()); 737 } 738 catch (Exception e) { 739 LOGGER.log(Level.WARNING, "Exception", e); 740 ks = null; 741 } 742 } else { 743 ks.load(null, null); 744 } 745 } 746 747 if (ks != null) { 748 String keyManagerFactoryAlgorithm = KeyManagerFactory.getDefaultAlgorithm(); 749 KeyManagerFactory kmf = KeyManagerFactory.getInstance(keyManagerFactoryAlgorithm); 750 try { 751 if (pcb == null) { 752 kmf.init(ks, null); 753 } 754 else { 755 kmf.init(ks, pcb.getPassword()); 756 pcb.clearPassword(); 757 } 758 kms = kmf.getKeyManagers(); 759 } 760 catch (NullPointerException npe) { 761 LOGGER.log(Level.WARNING, "NullPointerException", npe); 762 } 763 } 764 765 // If the user didn't specify a SSLContext, use the default one 766 context = SSLContext.getInstance("TLS"); 767 768 final SecureRandom secureRandom = new java.security.SecureRandom(); 769 X509TrustManager customTrustManager = config.getCustomX509TrustManager(); 770 771 if (daneVerifier != null) { 772 // User requested DANE verification. 773 daneVerifier.init(context, kms, customTrustManager, secureRandom); 774 } else { 775 TrustManager[] customTrustManagers = null; 776 if (customTrustManager != null) { 777 customTrustManagers = new TrustManager[] { customTrustManager }; 778 } 779 context.init(kms, customTrustManagers, secureRandom); 780 } 781 } 782 783 Socket plain = socket; 784 // Secure the plain connection 785 socket = context.getSocketFactory().createSocket(plain, 786 host, plain.getPort(), true); 787 788 final SSLSocket sslSocket = (SSLSocket) socket; 789 // Immediately set the enabled SSL protocols and ciphers. See SMACK-712 why this is 790 // important (at least on certain platforms) and it seems to be a good idea anyways to 791 // prevent an accidental implicit handshake. 792 TLSUtils.setEnabledProtocolsAndCiphers(sslSocket, config.getEnabledSSLProtocols(), config.getEnabledSSLCiphers()); 793 794 // Initialize the reader and writer with the new secured version 795 initReaderAndWriter(); 796 797 // Proceed to do the handshake 798 sslSocket.startHandshake(); 799 800 if (daneVerifier != null) { 801 daneVerifier.finish(sslSocket); 802 } 803 804 final HostnameVerifier verifier = getConfiguration().getHostnameVerifier(); 805 if (verifier == null) { 806 throw new IllegalStateException("No HostnameVerifier set. Use connectionConfiguration.setHostnameVerifier() to configure."); 807 } else if (!verifier.verify(getXMPPServiceDomain().toString(), sslSocket.getSession())) { 808 throw new CertificateException("Hostname verification of certificate failed. Certificate does not authenticate " + getXMPPServiceDomain()); 809 } 810 811 // Set that TLS was successful 812 secureSocket = sslSocket; 813 } 814 815 /** 816 * Returns the compression handler that can be used for one compression methods offered by the server. 817 * 818 * @return a instance of XMPPInputOutputStream or null if no suitable instance was found 819 * 820 */ 821 private static XMPPInputOutputStream maybeGetCompressionHandler(Compress.Feature compression) { 822 for (XMPPInputOutputStream handler : SmackConfiguration.getCompresionHandlers()) { 823 String method = handler.getCompressionMethod(); 824 if (compression.getMethods().contains(method)) 825 return handler; 826 } 827 return null; 828 } 829 830 @Override 831 public boolean isUsingCompression() { 832 return compressionHandler != null && compressSyncPoint.wasSuccessful(); 833 } 834 835 /** 836 * <p> 837 * Starts using stream compression that will compress network traffic. Traffic can be 838 * reduced up to 90%. Therefore, stream compression is ideal when using a slow speed network 839 * connection. However, the server and the client will need to use more CPU time in order to 840 * un/compress network data so under high load the server performance might be affected. 841 * </p> 842 * <p> 843 * Stream compression has to have been previously offered by the server. Currently only the 844 * zlib method is supported by the client. Stream compression negotiation has to be done 845 * before authentication took place. 846 * </p> 847 * 848 * @throws NotConnectedException 849 * @throws SmackException 850 * @throws NoResponseException 851 * @throws InterruptedException 852 */ 853 private void maybeEnableCompression() throws NotConnectedException, NoResponseException, SmackException, InterruptedException { 854 if (!config.isCompressionEnabled()) { 855 return; 856 } 857 maybeCompressFeaturesReceived.checkIfSuccessOrWait(); 858 Compress.Feature compression = getFeature(Compress.Feature.ELEMENT, Compress.NAMESPACE); 859 if (compression == null) { 860 // Server does not support compression 861 return; 862 } 863 // If stream compression was offered by the server and we want to use 864 // compression then send compression request to the server 865 if ((compressionHandler = maybeGetCompressionHandler(compression)) != null) { 866 compressSyncPoint.sendAndWaitForResponseOrThrow(new Compress(compressionHandler.getCompressionMethod())); 867 } else { 868 LOGGER.warning("Could not enable compression because no matching handler/method pair was found"); 869 } 870 } 871 872 /** 873 * Establishes a connection to the XMPP server. It basically 874 * creates and maintains a socket connection to the server. 875 * <p> 876 * Listeners will be preserved from a previous connection if the reconnection 877 * occurs after an abrupt termination. 878 * </p> 879 * 880 * @throws XMPPException if an error occurs while trying to establish the connection. 881 * @throws SmackException 882 * @throws IOException 883 * @throws InterruptedException 884 */ 885 @Override 886 protected void connectInternal() throws SmackException, IOException, XMPPException, InterruptedException { 887 closingStreamReceived.init(); 888 // Establishes the TCP connection to the server and does setup the reader and writer. Throws an exception if 889 // there is an error establishing the connection 890 connectUsingConfiguration(); 891 892 // We connected successfully to the servers TCP port 893 initConnection(); 894 } 895 896 /** 897 * Sends out a notification that there was an error with the connection 898 * and closes the connection. Also prints the stack trace of the given exception 899 * 900 * @param e the exception that causes the connection close event. 901 */ 902 private synchronized void notifyConnectionError(Exception e) { 903 // Listeners were already notified of the exception, return right here. 904 if ((packetReader == null || packetReader.done) && 905 (packetWriter == null || packetWriter.done())) return; 906 907 // Closes the connection temporary. A reconnection is possible 908 // Note that a connection listener of XMPPTCPConnection will drop the SM state in 909 // case the Exception is a StreamErrorException. 910 instantShutdown(); 911 912 // Notify connection listeners of the error. 913 callConnectionClosedOnErrorListener(e); 914 } 915 916 /** 917 * For unit testing purposes 918 * 919 * @param writer 920 */ 921 protected void setWriter(Writer writer) { 922 this.writer = writer; 923 } 924 925 @Override 926 protected void afterFeaturesReceived() throws NotConnectedException, InterruptedException { 927 StartTls startTlsFeature = getFeature(StartTls.ELEMENT, StartTls.NAMESPACE); 928 if (startTlsFeature != null) { 929 if (startTlsFeature.required() && config.getSecurityMode() == SecurityMode.disabled) { 930 SmackException smackException = new SecurityRequiredByServerException(); 931 tlsHandled.reportFailure(smackException); 932 notifyConnectionError(smackException); 933 return; 934 } 935 936 if (config.getSecurityMode() != ConnectionConfiguration.SecurityMode.disabled) { 937 sendNonza(new StartTls()); 938 } else { 939 tlsHandled.reportSuccess(); 940 } 941 } else { 942 tlsHandled.reportSuccess(); 943 } 944 945 if (getSASLAuthentication().authenticationSuccessful()) { 946 // If we have received features after the SASL has been successfully completed, then we 947 // have also *maybe* received, as it is an optional feature, the compression feature 948 // from the server. 949 maybeCompressFeaturesReceived.reportSuccess(); 950 } 951 } 952 953 /** 954 * Resets the parser using the latest connection's reader. Reseting the parser is necessary 955 * when the plain connection has been secured or when a new opening stream element is going 956 * to be sent by the server. 957 * 958 * @throws SmackException if the parser could not be reset. 959 * @throws InterruptedException 960 */ 961 void openStream() throws SmackException, InterruptedException { 962 // If possible, provide the receiving entity of the stream open tag, i.e. the server, as much information as 963 // possible. The 'to' attribute is *always* available. The 'from' attribute if set by the user and no external 964 // mechanism is used to determine the local entity (user). And the 'id' attribute is available after the first 965 // response from the server (see e.g. RFC 6120 ยง 9.1.1 Step 2.) 966 CharSequence to = getXMPPServiceDomain(); 967 CharSequence from = null; 968 CharSequence localpart = config.getUsername(); 969 if (localpart != null) { 970 from = XmppStringUtils.completeJidFrom(localpart, to); 971 } 972 String id = getStreamId(); 973 sendNonza(new StreamOpen(to, from, id)); 974 try { 975 packetReader.parser = PacketParserUtils.newXmppParser(reader); 976 } 977 catch (XmlPullParserException e) { 978 throw new SmackException(e); 979 } 980 } 981 982 protected class PacketReader { 983 984 XmlPullParser parser; 985 986 private volatile boolean done; 987 988 /** 989 * Initializes the reader in order to be used. The reader is initialized during the 990 * first connection and when reconnecting due to an abruptly disconnection. 991 */ 992 void init() { 993 done = false; 994 995 Async.go(new Runnable() { 996 @Override 997 public void run() { 998 parsePackets(); 999 } 1000 }, "Smack Packet Reader (" + getConnectionCounter() + ")"); 1001 } 1002 1003 /** 1004 * Shuts the stanza(/packet) reader down. This method simply sets the 'done' flag to true. 1005 */ 1006 void shutdown() { 1007 done = true; 1008 } 1009 1010 /** 1011 * Parse top-level packets in order to process them further. 1012 * 1013 * @param thread the thread that is being used by the reader to parse incoming packets. 1014 */ 1015 private void parsePackets() { 1016 try { 1017 initalOpenStreamSend.checkIfSuccessOrWait(); 1018 int eventType = parser.getEventType(); 1019 while (!done) { 1020 switch (eventType) { 1021 case XmlPullParser.START_TAG: 1022 final String name = parser.getName(); 1023 switch (name) { 1024 case Message.ELEMENT: 1025 case IQ.IQ_ELEMENT: 1026 case Presence.ELEMENT: 1027 try { 1028 parseAndProcessStanza(parser); 1029 } finally { 1030 clientHandledStanzasCount = SMUtils.incrementHeight(clientHandledStanzasCount); 1031 } 1032 break; 1033 case "stream": 1034 // We found an opening stream. 1035 if ("jabber:client".equals(parser.getNamespace(null))) { 1036 streamId = parser.getAttributeValue("", "id"); 1037 String reportedServerDomain = parser.getAttributeValue("", "from"); 1038 assert (config.getXMPPServiceDomain().equals(reportedServerDomain)); 1039 } 1040 break; 1041 case "error": 1042 StreamError streamError = PacketParserUtils.parseStreamError(parser); 1043 saslFeatureReceived.reportFailure(new StreamErrorException(streamError)); 1044 // Mark the tlsHandled sync point as success, we will use the saslFeatureReceived sync 1045 // point to report the error, which is checked immediately after tlsHandled in 1046 // connectInternal(). 1047 tlsHandled.reportSuccess(); 1048 throw new StreamErrorException(streamError); 1049 case "features": 1050 parseFeatures(parser); 1051 break; 1052 case "proceed": 1053 try { 1054 // Secure the connection by negotiating TLS 1055 proceedTLSReceived(); 1056 // Send a new opening stream to the server 1057 openStream(); 1058 } 1059 catch (Exception e) { 1060 SmackException smackException = new SmackException(e); 1061 tlsHandled.reportFailure(smackException); 1062 throw e; 1063 } 1064 break; 1065 case "failure": 1066 String namespace = parser.getNamespace(null); 1067 switch (namespace) { 1068 case "urn:ietf:params:xml:ns:xmpp-tls": 1069 // TLS negotiation has failed. The server will close the connection 1070 // TODO Parse failure stanza 1071 throw new SmackException("TLS negotiation has failed"); 1072 case "http://jabber.org/protocol/compress": 1073 // Stream compression has been denied. This is a recoverable 1074 // situation. It is still possible to authenticate and 1075 // use the connection but using an uncompressed connection 1076 // TODO Parse failure stanza 1077 compressSyncPoint.reportFailure(new SmackException( 1078 "Could not establish compression")); 1079 break; 1080 case SaslStreamElements.NAMESPACE: 1081 // SASL authentication has failed. The server may close the connection 1082 // depending on the number of retries 1083 final SASLFailure failure = PacketParserUtils.parseSASLFailure(parser); 1084 getSASLAuthentication().authenticationFailed(failure); 1085 break; 1086 } 1087 break; 1088 case Challenge.ELEMENT: 1089 // The server is challenging the SASL authentication made by the client 1090 String challengeData = parser.nextText(); 1091 getSASLAuthentication().challengeReceived(challengeData); 1092 break; 1093 case Success.ELEMENT: 1094 Success success = new Success(parser.nextText()); 1095 // We now need to bind a resource for the connection 1096 // Open a new stream and wait for the response 1097 openStream(); 1098 // The SASL authentication with the server was successful. The next step 1099 // will be to bind the resource 1100 getSASLAuthentication().authenticated(success); 1101 break; 1102 case Compressed.ELEMENT: 1103 // Server confirmed that it's possible to use stream compression. Start 1104 // stream compression 1105 // Initialize the reader and writer with the new compressed version 1106 initReaderAndWriter(); 1107 // Send a new opening stream to the server 1108 openStream(); 1109 // Notify that compression is being used 1110 compressSyncPoint.reportSuccess(); 1111 break; 1112 case Enabled.ELEMENT: 1113 Enabled enabled = ParseStreamManagement.enabled(parser); 1114 if (enabled.isResumeSet()) { 1115 smSessionId = enabled.getId(); 1116 if (StringUtils.isNullOrEmpty(smSessionId)) { 1117 SmackException xmppException = new SmackException("Stream Management 'enabled' element with resume attribute but without session id received"); 1118 smEnabledSyncPoint.reportFailure(xmppException); 1119 throw xmppException; 1120 } 1121 smServerMaxResumptimTime = enabled.getMaxResumptionTime(); 1122 } else { 1123 // Mark this a non-resumable stream by setting smSessionId to null 1124 smSessionId = null; 1125 } 1126 clientHandledStanzasCount = 0; 1127 smWasEnabledAtLeastOnce = true; 1128 smEnabledSyncPoint.reportSuccess(); 1129 LOGGER.fine("Stream Management (XEP-198): succesfully enabled"); 1130 break; 1131 case Failed.ELEMENT: 1132 Failed failed = ParseStreamManagement.failed(parser); 1133 FailedNonzaException xmppException = new FailedNonzaException(failed, failed.getXMPPErrorCondition()); 1134 // If only XEP-198 would specify different failure elements for the SM 1135 // enable and SM resume failure case. But this is not the case, so we 1136 // need to determine if this is a 'Failed' response for either 'Enable' 1137 // or 'Resume'. 1138 if (smResumedSyncPoint.requestSent()) { 1139 smResumedSyncPoint.reportFailure(xmppException); 1140 } 1141 else { 1142 if (!smEnabledSyncPoint.requestSent()) { 1143 throw new IllegalStateException("Failed element received but SM was not previously enabled"); 1144 } 1145 smEnabledSyncPoint.reportFailure(new SmackException(xmppException)); 1146 // Report success for last lastFeaturesReceived so that in case a 1147 // failed resumption, we can continue with normal resource binding. 1148 // See text of XEP-198 5. below Example 11. 1149 lastFeaturesReceived.reportSuccess(); 1150 } 1151 break; 1152 case Resumed.ELEMENT: 1153 Resumed resumed = ParseStreamManagement.resumed(parser); 1154 if (!smSessionId.equals(resumed.getPrevId())) { 1155 throw new StreamIdDoesNotMatchException(smSessionId, resumed.getPrevId()); 1156 } 1157 // Mark SM as enabled and resumption as successful. 1158 smResumedSyncPoint.reportSuccess(); 1159 smEnabledSyncPoint.reportSuccess(); 1160 // First, drop the stanzas already handled by the server 1161 processHandledCount(resumed.getHandledCount()); 1162 // Then re-send what is left in the unacknowledged queue 1163 List<Stanza> stanzasToResend = new ArrayList<>(unacknowledgedStanzas.size()); 1164 unacknowledgedStanzas.drainTo(stanzasToResend); 1165 for (Stanza stanza : stanzasToResend) { 1166 sendStanzaInternal(stanza); 1167 } 1168 // If there where stanzas resent, then request a SM ack for them. 1169 // Writer's sendStreamElement() won't do it automatically based on 1170 // predicates. 1171 if (!stanzasToResend.isEmpty()) { 1172 requestSmAcknowledgementInternal(); 1173 } 1174 LOGGER.fine("Stream Management (XEP-198): Stream resumed"); 1175 break; 1176 case AckAnswer.ELEMENT: 1177 AckAnswer ackAnswer = ParseStreamManagement.ackAnswer(parser); 1178 processHandledCount(ackAnswer.getHandledCount()); 1179 break; 1180 case AckRequest.ELEMENT: 1181 ParseStreamManagement.ackRequest(parser); 1182 if (smEnabledSyncPoint.wasSuccessful()) { 1183 sendSmAcknowledgementInternal(); 1184 } else { 1185 LOGGER.warning("SM Ack Request received while SM is not enabled"); 1186 } 1187 break; 1188 default: 1189 LOGGER.warning("Unknown top level stream element: " + name); 1190 break; 1191 } 1192 break; 1193 case XmlPullParser.END_TAG: 1194 if (parser.getName().equals("stream")) { 1195 if (!parser.getNamespace().equals("http://etherx.jabber.org/streams")) { 1196 LOGGER.warning(XMPPTCPConnection.this + " </stream> but different namespace " + parser.getNamespace()); 1197 break; 1198 } 1199 1200 // Check if the queue was already shut down before reporting success on closing stream tag 1201 // received. This avoids a race if there is a disconnect(), followed by a connect(), which 1202 // did re-start the queue again, causing this writer to assume that the queue is not 1203 // shutdown, which results in a call to disconnect(). 1204 final boolean queueWasShutdown = packetWriter.queue.isShutdown(); 1205 closingStreamReceived.reportSuccess(); 1206 1207 if (queueWasShutdown) { 1208 // We received a closing stream element *after* we initiated the 1209 // termination of the session by sending a closing stream element to 1210 // the server first 1211 return; 1212 } else { 1213 // We received a closing stream element from the server without us 1214 // sending a closing stream element first. This means that the 1215 // server wants to terminate the session, therefore disconnect 1216 // the connection 1217 LOGGER.info(XMPPTCPConnection.this 1218 + " received closing </stream> element." 1219 + " Server wants to terminate the connection, calling disconnect()"); 1220 disconnect(); 1221 } 1222 } 1223 break; 1224 case XmlPullParser.END_DOCUMENT: 1225 // END_DOCUMENT only happens in an error case, as otherwise we would see a 1226 // closing stream element before. 1227 throw new SmackException( 1228 "Parser got END_DOCUMENT event. This could happen e.g. if the server closed the connection without sending a closing stream element"); 1229 } 1230 eventType = parser.next(); 1231 } 1232 } 1233 catch (Exception e) { 1234 closingStreamReceived.reportFailure(e); 1235 // The exception can be ignored if the the connection is 'done' 1236 // or if the it was caused because the socket got closed 1237 if (!(done || packetWriter.queue.isShutdown())) { 1238 // Close the connection and notify connection listeners of the 1239 // error. 1240 notifyConnectionError(e); 1241 } 1242 } 1243 } 1244 } 1245 1246 protected class PacketWriter { 1247 public static final int QUEUE_SIZE = XMPPTCPConnection.QUEUE_SIZE; 1248 1249 private final ArrayBlockingQueueWithShutdown<Element> queue = new ArrayBlockingQueueWithShutdown<Element>( 1250 QUEUE_SIZE, true); 1251 1252 /** 1253 * Needs to be protected for unit testing purposes. 1254 */ 1255 protected SynchronizationPoint<NoResponseException> shutdownDone = new SynchronizationPoint<NoResponseException>( 1256 XMPPTCPConnection.this, "shutdown completed"); 1257 1258 /** 1259 * If set, the stanza(/packet) writer is shut down 1260 */ 1261 protected volatile Long shutdownTimestamp = null; 1262 1263 private volatile boolean instantShutdown; 1264 1265 /** 1266 * True if some preconditions are given to start the bundle and defer mechanism. 1267 * <p> 1268 * This will likely get set to true right after the start of the writer thread, because 1269 * {@link #nextStreamElement()} will check if {@link queue} is empty, which is probably the case, and then set 1270 * this field to true. 1271 * </p> 1272 */ 1273 private boolean shouldBundleAndDefer; 1274 1275 /** 1276 * Initializes the writer in order to be used. It is called at the first connection and also 1277 * is invoked if the connection is disconnected by an error. 1278 */ 1279 void init() { 1280 shutdownDone.init(); 1281 shutdownTimestamp = null; 1282 1283 if (unacknowledgedStanzas != null) { 1284 // It's possible that there are new stanzas in the writer queue that 1285 // came in while we were disconnected but resumable, drain those into 1286 // the unacknowledged queue so that they get resent now 1287 drainWriterQueueToUnacknowledgedStanzas(); 1288 } 1289 1290 queue.start(); 1291 Async.go(new Runnable() { 1292 @Override 1293 public void run() { 1294 writePackets(); 1295 } 1296 }, "Smack Packet Writer (" + getConnectionCounter() + ")"); 1297 } 1298 1299 private boolean done() { 1300 return shutdownTimestamp != null; 1301 } 1302 1303 protected void throwNotConnectedExceptionIfDoneAndResumptionNotPossible() throws NotConnectedException { 1304 final boolean done = done(); 1305 if (done) { 1306 final boolean smResumptionPossbile = isSmResumptionPossible(); 1307 // Don't throw a NotConnectedException is there is an resumable stream available 1308 if (!smResumptionPossbile) { 1309 throw new NotConnectedException(XMPPTCPConnection.this, "done=" + done 1310 + " smResumptionPossible=" + smResumptionPossbile); 1311 } 1312 } 1313 } 1314 1315 /** 1316 * Sends the specified element to the server. 1317 * 1318 * @param element the element to send. 1319 * @throws NotConnectedException 1320 * @throws InterruptedException 1321 */ 1322 protected void sendStreamElement(Element element) throws NotConnectedException, InterruptedException { 1323 throwNotConnectedExceptionIfDoneAndResumptionNotPossible(); 1324 try { 1325 queue.put(element); 1326 } 1327 catch (InterruptedException e) { 1328 // put() may throw an InterruptedException for two reasons: 1329 // 1. If the queue was shut down 1330 // 2. If the thread was interrupted 1331 // so we have to check which is the case 1332 throwNotConnectedExceptionIfDoneAndResumptionNotPossible(); 1333 // If the method above did not throw, then the sending thread was interrupted 1334 throw e; 1335 } 1336 } 1337 1338 /** 1339 * Shuts down the stanza(/packet) writer. Once this method has been called, no further 1340 * packets will be written to the server. 1341 * @throws InterruptedException 1342 */ 1343 void shutdown(boolean instant) { 1344 instantShutdown = instant; 1345 queue.shutdown(); 1346 shutdownTimestamp = System.currentTimeMillis(); 1347 try { 1348 shutdownDone.checkIfSuccessOrWait(); 1349 } 1350 catch (NoResponseException | InterruptedException e) { 1351 LOGGER.log(Level.WARNING, "shutdownDone was not marked as successful by the writer thread", e); 1352 } 1353 } 1354 1355 /** 1356 * Maybe return the next available element from the queue for writing. If the queue is shut down <b>or</b> a 1357 * spurious interrupt occurs, <code>null</code> is returned. So it is important to check the 'done' condition in 1358 * that case. 1359 * 1360 * @return the next element for writing or null. 1361 */ 1362 private Element nextStreamElement() { 1363 // It is important the we check if the queue is empty before removing an element from it 1364 if (queue.isEmpty()) { 1365 shouldBundleAndDefer = true; 1366 } 1367 Element packet = null; 1368 try { 1369 packet = queue.take(); 1370 } 1371 catch (InterruptedException e) { 1372 if (!queue.isShutdown()) { 1373 // Users shouldn't try to interrupt the packet writer thread 1374 LOGGER.log(Level.WARNING, "Packet writer thread was interrupted. Don't do that. Use disconnect() instead.", e); 1375 } 1376 } 1377 return packet; 1378 } 1379 1380 private void writePackets() { 1381 Exception writerException = null; 1382 try { 1383 openStream(); 1384 initalOpenStreamSend.reportSuccess(); 1385 // Write out packets from the queue. 1386 while (!done()) { 1387 Element element = nextStreamElement(); 1388 if (element == null) { 1389 continue; 1390 } 1391 1392 // Get a local version of the bundle and defer callback, in case it's unset 1393 // between the null check and the method invocation 1394 final BundleAndDeferCallback localBundleAndDeferCallback = bundleAndDeferCallback; 1395 // If the preconditions are given (e.g. bundleAndDefer callback is set, queue is 1396 // empty), then we could wait a bit for further stanzas attempting to decrease 1397 // our energy consumption 1398 if (localBundleAndDeferCallback != null && isAuthenticated() && shouldBundleAndDefer) { 1399 // Reset shouldBundleAndDefer to false, nextStreamElement() will set it to true once the 1400 // queue is empty again. 1401 shouldBundleAndDefer = false; 1402 final AtomicBoolean bundlingAndDeferringStopped = new AtomicBoolean(); 1403 final int bundleAndDeferMillis = localBundleAndDeferCallback.getBundleAndDeferMillis(new BundleAndDefer( 1404 bundlingAndDeferringStopped)); 1405 if (bundleAndDeferMillis > 0) { 1406 long remainingWait = bundleAndDeferMillis; 1407 final long waitStart = System.currentTimeMillis(); 1408 synchronized (bundlingAndDeferringStopped) { 1409 while (!bundlingAndDeferringStopped.get() && remainingWait > 0) { 1410 bundlingAndDeferringStopped.wait(remainingWait); 1411 remainingWait = bundleAndDeferMillis 1412 - (System.currentTimeMillis() - waitStart); 1413 } 1414 } 1415 } 1416 } 1417 1418 Stanza packet = null; 1419 if (element instanceof Stanza) { 1420 packet = (Stanza) element; 1421 } 1422 else if (element instanceof Enable) { 1423 // The client needs to add messages to the unacknowledged stanzas queue 1424 // right after it sent 'enabled'. Stanza will be added once 1425 // unacknowledgedStanzas is not null. 1426 unacknowledgedStanzas = new ArrayBlockingQueue<>(QUEUE_SIZE); 1427 } 1428 maybeAddToUnacknowledgedStanzas(packet); 1429 1430 CharSequence elementXml = element.toXML(); 1431 if (elementXml instanceof XmlStringBuilder) { 1432 ((XmlStringBuilder) elementXml).write(writer); 1433 } 1434 else { 1435 writer.write(elementXml.toString()); 1436 } 1437 1438 if (queue.isEmpty()) { 1439 writer.flush(); 1440 } 1441 if (packet != null) { 1442 firePacketSendingListeners(packet); 1443 } 1444 } 1445 if (!instantShutdown) { 1446 // Flush out the rest of the queue. 1447 try { 1448 while (!queue.isEmpty()) { 1449 Element packet = queue.remove(); 1450 if (packet instanceof Stanza) { 1451 Stanza stanza = (Stanza) packet; 1452 maybeAddToUnacknowledgedStanzas(stanza); 1453 } 1454 writer.write(packet.toXML().toString()); 1455 } 1456 writer.flush(); 1457 } 1458 catch (Exception e) { 1459 LOGGER.log(Level.WARNING, 1460 "Exception flushing queue during shutdown, ignore and continue", 1461 e); 1462 } 1463 1464 // Close the stream. 1465 try { 1466 writer.write("</stream:stream>"); 1467 writer.flush(); 1468 } 1469 catch (Exception e) { 1470 LOGGER.log(Level.WARNING, "Exception writing closing stream element", e); 1471 } 1472 1473 // Delete the queue contents (hopefully nothing is left). 1474 queue.clear(); 1475 } else if (instantShutdown && isSmEnabled()) { 1476 // This was an instantShutdown and SM is enabled, drain all remaining stanzas 1477 // into the unacknowledgedStanzas queue 1478 drainWriterQueueToUnacknowledgedStanzas(); 1479 } 1480 // Do *not* close the writer here, as it will cause the socket 1481 // to get closed. But we may want to receive further stanzas 1482 // until the closing stream tag is received. The socket will be 1483 // closed in shutdown(). 1484 } 1485 catch (Exception e) { 1486 // The exception can be ignored if the the connection is 'done' 1487 // or if the it was caused because the socket got closed 1488 if (!(done() || queue.isShutdown())) { 1489 writerException = e; 1490 } else { 1491 LOGGER.log(Level.FINE, "Ignoring Exception in writePackets()", e); 1492 } 1493 } finally { 1494 LOGGER.fine("Reporting shutdownDone success in writer thread"); 1495 shutdownDone.reportSuccess(); 1496 } 1497 // Delay notifyConnectionError after shutdownDone has been reported in the finally block. 1498 if (writerException != null) { 1499 notifyConnectionError(writerException); 1500 } 1501 } 1502 1503 private void drainWriterQueueToUnacknowledgedStanzas() { 1504 List<Element> elements = new ArrayList<Element>(queue.size()); 1505 queue.drainTo(elements); 1506 for (Element element : elements) { 1507 if (element instanceof Stanza) { 1508 unacknowledgedStanzas.add((Stanza) element); 1509 } 1510 } 1511 } 1512 1513 private void maybeAddToUnacknowledgedStanzas(Stanza stanza) throws IOException { 1514 // Check if the stream element should be put to the unacknowledgedStanza 1515 // queue. Note that we can not do the put() in sendStanzaInternal() and the 1516 // packet order is not stable at this point (sendStanzaInternal() can be 1517 // called concurrently). 1518 if (unacknowledgedStanzas != null && stanza != null) { 1519 // If the unacknowledgedStanza queue is nearly full, request an new ack 1520 // from the server in order to drain it 1521 if (unacknowledgedStanzas.size() == 0.8 * XMPPTCPConnection.QUEUE_SIZE) { 1522 writer.write(AckRequest.INSTANCE.toXML().toString()); 1523 writer.flush(); 1524 } 1525 try { 1526 // It is important the we put the stanza in the unacknowledged stanza 1527 // queue before we put it on the wire 1528 unacknowledgedStanzas.put(stanza); 1529 } 1530 catch (InterruptedException e) { 1531 throw new IllegalStateException(e); 1532 } 1533 } 1534 } 1535 } 1536 1537 /** 1538 * Set if Stream Management should be used by default for new connections. 1539 * 1540 * @param useSmDefault true to use Stream Management for new connections. 1541 */ 1542 public static void setUseStreamManagementDefault(boolean useSmDefault) { 1543 XMPPTCPConnection.useSmDefault = useSmDefault; 1544 } 1545 1546 /** 1547 * Set if Stream Management resumption should be used by default for new connections. 1548 * 1549 * @param useSmResumptionDefault true to use Stream Management resumption for new connections. 1550 * @deprecated use {@link #setUseStreamManagementResumptionDefault(boolean)} instead. 1551 */ 1552 @Deprecated 1553 public static void setUseStreamManagementResumptiodDefault(boolean useSmResumptionDefault) { 1554 setUseStreamManagementResumptionDefault(useSmResumptionDefault); 1555 } 1556 1557 /** 1558 * Set if Stream Management resumption should be used by default for new connections. 1559 * 1560 * @param useSmResumptionDefault true to use Stream Management resumption for new connections. 1561 */ 1562 public static void setUseStreamManagementResumptionDefault(boolean useSmResumptionDefault) { 1563 if (useSmResumptionDefault) { 1564 // Also enable SM is resumption is enabled 1565 setUseStreamManagementDefault(useSmResumptionDefault); 1566 } 1567 XMPPTCPConnection.useSmResumptionDefault = useSmResumptionDefault; 1568 } 1569 1570 /** 1571 * Set if Stream Management should be used if supported by the server. 1572 * 1573 * @param useSm true to use Stream Management. 1574 */ 1575 public void setUseStreamManagement(boolean useSm) { 1576 this.useSm = useSm; 1577 } 1578 1579 /** 1580 * Set if Stream Management resumption should be used if supported by the server. 1581 * 1582 * @param useSmResumption true to use Stream Management resumption. 1583 */ 1584 public void setUseStreamManagementResumption(boolean useSmResumption) { 1585 if (useSmResumption) { 1586 // Also enable SM is resumption is enabled 1587 setUseStreamManagement(useSmResumption); 1588 } 1589 this.useSmResumption = useSmResumption; 1590 } 1591 1592 /** 1593 * Set the preferred resumption time in seconds. 1594 * @param resumptionTime the preferred resumption time in seconds 1595 */ 1596 public void setPreferredResumptionTime(int resumptionTime) { 1597 smClientMaxResumptionTime = resumptionTime; 1598 } 1599 1600 /** 1601 * Add a predicate for Stream Management acknowledgment requests. 1602 * <p> 1603 * Those predicates are used to determine when a Stream Management acknowledgement request is send to the server. 1604 * Some pre-defined predicates are found in the <code>org.jivesoftware.smack.sm.predicates</code> package. 1605 * </p> 1606 * <p> 1607 * If not predicate is configured, the {@link Predicate#forMessagesOrAfter5Stanzas()} will be used. 1608 * </p> 1609 * 1610 * @param predicate the predicate to add. 1611 * @return if the predicate was not already active. 1612 */ 1613 public boolean addRequestAckPredicate(StanzaFilter predicate) { 1614 synchronized (requestAckPredicates) { 1615 return requestAckPredicates.add(predicate); 1616 } 1617 } 1618 1619 /** 1620 * Remove the given predicate for Stream Management acknowledgment request. 1621 * @param predicate the predicate to remove. 1622 * @return true if the predicate was removed. 1623 */ 1624 public boolean removeRequestAckPredicate(StanzaFilter predicate) { 1625 synchronized (requestAckPredicates) { 1626 return requestAckPredicates.remove(predicate); 1627 } 1628 } 1629 1630 /** 1631 * Remove all predicates for Stream Management acknowledgment requests. 1632 */ 1633 public void removeAllRequestAckPredicates() { 1634 synchronized (requestAckPredicates) { 1635 requestAckPredicates.clear(); 1636 } 1637 } 1638 1639 /** 1640 * Send an unconditional Stream Management acknowledgement request to the server. 1641 * 1642 * @throws StreamManagementNotEnabledException if Stream Mangement is not enabled. 1643 * @throws NotConnectedException if the connection is not connected. 1644 * @throws InterruptedException 1645 */ 1646 public void requestSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException { 1647 if (!isSmEnabled()) { 1648 throw new StreamManagementException.StreamManagementNotEnabledException(); 1649 } 1650 requestSmAcknowledgementInternal(); 1651 } 1652 1653 private void requestSmAcknowledgementInternal() throws NotConnectedException, InterruptedException { 1654 packetWriter.sendStreamElement(AckRequest.INSTANCE); 1655 } 1656 1657 /** 1658 * Send a unconditional Stream Management acknowledgment to the server. 1659 * <p> 1660 * See <a href="http://xmpp.org/extensions/xep-0198.html#acking">XEP-198: Stream Management ยง 4. Acks</a>: 1661 * "Either party MAY send an <a/> element at any time (e.g., after it has received a certain number of stanzas, 1662 * or after a certain period of time), even if it has not received an <r/> element from the other party." 1663 * </p> 1664 * 1665 * @throws StreamManagementNotEnabledException if Stream Management is not enabled. 1666 * @throws NotConnectedException if the connection is not connected. 1667 * @throws InterruptedException 1668 */ 1669 public void sendSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException { 1670 if (!isSmEnabled()) { 1671 throw new StreamManagementException.StreamManagementNotEnabledException(); 1672 } 1673 sendSmAcknowledgementInternal(); 1674 } 1675 1676 private void sendSmAcknowledgementInternal() throws NotConnectedException, InterruptedException { 1677 packetWriter.sendStreamElement(new AckAnswer(clientHandledStanzasCount)); 1678 } 1679 1680 /** 1681 * Add a Stanza acknowledged listener. 1682 * <p> 1683 * Those listeners will be invoked every time a Stanza has been acknowledged by the server. The will not get 1684 * automatically removed. Consider using {@link #addStanzaIdAcknowledgedListener(String, StanzaListener)} when 1685 * possible. 1686 * </p> 1687 * 1688 * @param listener the listener to add. 1689 */ 1690 public void addStanzaAcknowledgedListener(StanzaListener listener) { 1691 stanzaAcknowledgedListeners.add(listener); 1692 } 1693 1694 /** 1695 * Remove the given Stanza acknowledged listener. 1696 * 1697 * @param listener the listener. 1698 * @return true if the listener was removed. 1699 */ 1700 public boolean removeStanzaAcknowledgedListener(StanzaListener listener) { 1701 return stanzaAcknowledgedListeners.remove(listener); 1702 } 1703 1704 /** 1705 * Remove all stanza acknowledged listeners. 1706 */ 1707 public void removeAllStanzaAcknowledgedListeners() { 1708 stanzaAcknowledgedListeners.clear(); 1709 } 1710 1711 /** 1712 * Add a new Stanza ID acknowledged listener for the given ID. 1713 * <p> 1714 * The listener will be invoked if the stanza with the given ID was acknowledged by the server. It will 1715 * automatically be removed after the listener was run. 1716 * </p> 1717 * 1718 * @param id the stanza ID. 1719 * @param listener the listener to invoke. 1720 * @return the previous listener for this stanza ID or null. 1721 * @throws StreamManagementNotEnabledException if Stream Management is not enabled. 1722 */ 1723 public StanzaListener addStanzaIdAcknowledgedListener(final String id, StanzaListener listener) throws StreamManagementNotEnabledException { 1724 // Prevent users from adding callbacks that will never get removed 1725 if (!smWasEnabledAtLeastOnce) { 1726 throw new StreamManagementException.StreamManagementNotEnabledException(); 1727 } 1728 // Remove the listener after max. 12 hours 1729 final int removeAfterSeconds = Math.min(getMaxSmResumptionTime(), 12 * 60 * 60); 1730 schedule(new Runnable() { 1731 @Override 1732 public void run() { 1733 stanzaIdAcknowledgedListeners.remove(id); 1734 } 1735 }, removeAfterSeconds, TimeUnit.SECONDS); 1736 return stanzaIdAcknowledgedListeners.put(id, listener); 1737 } 1738 1739 /** 1740 * Remove the Stanza ID acknowledged listener for the given ID. 1741 * 1742 * @param id the stanza ID. 1743 * @return true if the listener was found and removed, false otherwise. 1744 */ 1745 public StanzaListener removeStanzaIdAcknowledgedListener(String id) { 1746 return stanzaIdAcknowledgedListeners.remove(id); 1747 } 1748 1749 /** 1750 * Removes all Stanza ID acknowledged listeners. 1751 */ 1752 public void removeAllStanzaIdAcknowledgedListeners() { 1753 stanzaIdAcknowledgedListeners.clear(); 1754 } 1755 1756 /** 1757 * Returns true if Stream Management is supported by the server. 1758 * 1759 * @return true if Stream Management is supported by the server. 1760 */ 1761 public boolean isSmAvailable() { 1762 return hasFeature(StreamManagementFeature.ELEMENT, StreamManagement.NAMESPACE); 1763 } 1764 1765 /** 1766 * Returns true if Stream Management was successfully negotiated with the server. 1767 * 1768 * @return true if Stream Management was negotiated. 1769 */ 1770 public boolean isSmEnabled() { 1771 return smEnabledSyncPoint.wasSuccessful(); 1772 } 1773 1774 /** 1775 * Returns true if the stream was successfully resumed with help of Stream Management. 1776 * 1777 * @return true if the stream was resumed. 1778 */ 1779 public boolean streamWasResumed() { 1780 return smResumedSyncPoint.wasSuccessful(); 1781 } 1782 1783 /** 1784 * Returns true if the connection is disconnected by a Stream resumption via Stream Management is possible. 1785 * 1786 * @return true if disconnected but resumption possible. 1787 */ 1788 public boolean isDisconnectedButSmResumptionPossible() { 1789 return disconnectedButResumeable && isSmResumptionPossible(); 1790 } 1791 1792 /** 1793 * Returns true if the stream is resumable. 1794 * 1795 * @return true if the stream is resumable. 1796 */ 1797 public boolean isSmResumptionPossible() { 1798 // There is no resumable stream available 1799 if (smSessionId == null) 1800 return false; 1801 1802 final Long shutdownTimestamp = packetWriter.shutdownTimestamp; 1803 // Seems like we are already reconnected, report true 1804 if (shutdownTimestamp == null) { 1805 return true; 1806 } 1807 1808 // See if resumption time is over 1809 long current = System.currentTimeMillis(); 1810 long maxResumptionMillies = ((long) getMaxSmResumptionTime()) * 1000; 1811 if (current > shutdownTimestamp + maxResumptionMillies) { 1812 // Stream resumption is *not* possible if the current timestamp is greater then the greatest timestamp where 1813 // resumption is possible 1814 return false; 1815 } else { 1816 return true; 1817 } 1818 } 1819 1820 /** 1821 * Drop the stream management state. Sets {@link #smSessionId} and 1822 * {@link #unacknowledgedStanzas} to <code>null</code>. 1823 */ 1824 private void dropSmState() { 1825 // clientHandledCount and serverHandledCount will be reset on <enable/> and <enabled/> 1826 // respective. No need to reset them here. 1827 smSessionId = null; 1828 unacknowledgedStanzas = null; 1829 } 1830 1831 /** 1832 * Get the maximum resumption time in seconds after which a managed stream can be resumed. 1833 * <p> 1834 * This method will return {@link Integer#MAX_VALUE} if neither the client nor the server specify a maximum 1835 * resumption time. Be aware of integer overflows when using this value, e.g. do not add arbitrary values to it 1836 * without checking for overflows before. 1837 * </p> 1838 * 1839 * @return the maximum resumption time in seconds or {@link Integer#MAX_VALUE} if none set. 1840 */ 1841 public int getMaxSmResumptionTime() { 1842 int clientResumptionTime = smClientMaxResumptionTime > 0 ? smClientMaxResumptionTime : Integer.MAX_VALUE; 1843 int serverResumptionTime = smServerMaxResumptimTime > 0 ? smServerMaxResumptimTime : Integer.MAX_VALUE; 1844 return Math.min(clientResumptionTime, serverResumptionTime); 1845 } 1846 1847 private void processHandledCount(long handledCount) throws StreamManagementCounterError { 1848 long ackedStanzasCount = SMUtils.calculateDelta(handledCount, serverHandledStanzasCount); 1849 final List<Stanza> ackedStanzas = new ArrayList<Stanza>( 1850 ackedStanzasCount <= Integer.MAX_VALUE ? (int) ackedStanzasCount 1851 : Integer.MAX_VALUE); 1852 for (long i = 0; i < ackedStanzasCount; i++) { 1853 Stanza ackedStanza = unacknowledgedStanzas.poll(); 1854 // If the server ack'ed a stanza, then it must be in the 1855 // unacknowledged stanza queue. There can be no exception. 1856 if (ackedStanza == null) { 1857 throw new StreamManagementCounterError(handledCount, serverHandledStanzasCount, 1858 ackedStanzasCount, ackedStanzas); 1859 } 1860 ackedStanzas.add(ackedStanza); 1861 } 1862 1863 boolean atLeastOneStanzaAcknowledgedListener = false; 1864 if (!stanzaAcknowledgedListeners.isEmpty()) { 1865 // If stanzaAcknowledgedListeners is not empty, the we have at least one 1866 atLeastOneStanzaAcknowledgedListener = true; 1867 } 1868 else { 1869 // Otherwise we look for a matching id in the stanza *id* acknowledged listeners 1870 for (Stanza ackedStanza : ackedStanzas) { 1871 String id = ackedStanza.getStanzaId(); 1872 if (id != null && stanzaIdAcknowledgedListeners.containsKey(id)) { 1873 atLeastOneStanzaAcknowledgedListener = true; 1874 break; 1875 } 1876 } 1877 } 1878 1879 // Only spawn a new thread if there is a chance that some listener is invoked 1880 if (atLeastOneStanzaAcknowledgedListener) { 1881 asyncGo(new Runnable() { 1882 @Override 1883 public void run() { 1884 for (Stanza ackedStanza : ackedStanzas) { 1885 for (StanzaListener listener : stanzaAcknowledgedListeners) { 1886 try { 1887 listener.processStanza(ackedStanza); 1888 } 1889 catch (InterruptedException | NotConnectedException e) { 1890 LOGGER.log(Level.FINER, "Received exception", e); 1891 } 1892 } 1893 String id = ackedStanza.getStanzaId(); 1894 if (StringUtils.isNullOrEmpty(id)) { 1895 continue; 1896 } 1897 StanzaListener listener = stanzaIdAcknowledgedListeners.remove(id); 1898 if (listener != null) { 1899 try { 1900 listener.processStanza(ackedStanza); 1901 } 1902 catch (InterruptedException | NotConnectedException e) { 1903 LOGGER.log(Level.FINER, "Received exception", e); 1904 } 1905 } 1906 } 1907 } 1908 }); 1909 } 1910 1911 serverHandledStanzasCount = handledCount; 1912 } 1913 1914 /** 1915 * Set the default bundle and defer callback used for new connections. 1916 * 1917 * @param defaultBundleAndDeferCallback 1918 * @see BundleAndDeferCallback 1919 * @since 4.1 1920 */ 1921 public static void setDefaultBundleAndDeferCallback(BundleAndDeferCallback defaultBundleAndDeferCallback) { 1922 XMPPTCPConnection.defaultBundleAndDeferCallback = defaultBundleAndDeferCallback; 1923 } 1924 1925 /** 1926 * Set the bundle and defer callback used for this connection. 1927 * <p> 1928 * You can use <code>null</code> as argument to reset the callback. Outgoing stanzas will then 1929 * no longer get deferred. 1930 * </p> 1931 * 1932 * @param bundleAndDeferCallback the callback or <code>null</code>. 1933 * @see BundleAndDeferCallback 1934 * @since 4.1 1935 */ 1936 public void setBundleandDeferCallback(BundleAndDeferCallback bundleAndDeferCallback) { 1937 this.bundleAndDeferCallback = bundleAndDeferCallback; 1938 } 1939 1940}