001/** 002 * 003 * Copyright 2003-2007 Jive Software. 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.jivesoftware.smack.tcp; 018 019import java.io.BufferedReader; 020import java.io.ByteArrayInputStream; 021import java.io.FileInputStream; 022import java.io.IOException; 023import java.io.InputStream; 024import java.io.InputStreamReader; 025import java.io.OutputStream; 026import java.io.OutputStreamWriter; 027import java.io.Writer; 028import java.lang.reflect.Constructor; 029import java.net.InetAddress; 030import java.net.InetSocketAddress; 031import java.net.Socket; 032import java.security.KeyManagementException; 033import java.security.KeyStore; 034import java.security.KeyStoreException; 035import java.security.NoSuchAlgorithmException; 036import java.security.NoSuchProviderException; 037import java.security.Provider; 038import java.security.SecureRandom; 039import java.security.Security; 040import java.security.UnrecoverableKeyException; 041import java.security.cert.CertificateException; 042import java.util.ArrayList; 043import java.util.Collection; 044import java.util.Iterator; 045import java.util.LinkedHashSet; 046import java.util.LinkedList; 047import java.util.List; 048import java.util.Map; 049import java.util.Set; 050import java.util.concurrent.ArrayBlockingQueue; 051import java.util.concurrent.BlockingQueue; 052import java.util.concurrent.ConcurrentHashMap; 053import java.util.concurrent.ConcurrentLinkedQueue; 054import java.util.concurrent.Semaphore; 055import java.util.concurrent.TimeUnit; 056import java.util.concurrent.atomic.AtomicBoolean; 057import java.util.logging.Level; 058import java.util.logging.Logger; 059 060import javax.net.SocketFactory; 061import javax.net.ssl.HostnameVerifier; 062import javax.net.ssl.KeyManager; 063import javax.net.ssl.KeyManagerFactory; 064import javax.net.ssl.SSLContext; 065import javax.net.ssl.SSLSession; 066import javax.net.ssl.SSLSocket; 067import javax.net.ssl.TrustManager; 068import javax.net.ssl.X509TrustManager; 069import javax.security.auth.callback.Callback; 070import javax.security.auth.callback.CallbackHandler; 071import javax.security.auth.callback.PasswordCallback; 072 073import org.jivesoftware.smack.AbstractConnectionListener; 074import org.jivesoftware.smack.AbstractXMPPConnection; 075import org.jivesoftware.smack.ConnectionConfiguration; 076import org.jivesoftware.smack.ConnectionConfiguration.DnssecMode; 077import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode; 078import org.jivesoftware.smack.SmackConfiguration; 079import org.jivesoftware.smack.SmackException; 080import org.jivesoftware.smack.SmackException.AlreadyConnectedException; 081import org.jivesoftware.smack.SmackException.AlreadyLoggedInException; 082import org.jivesoftware.smack.SmackException.ConnectionException; 083import org.jivesoftware.smack.SmackException.NoResponseException; 084import org.jivesoftware.smack.SmackException.NotConnectedException; 085import org.jivesoftware.smack.SmackException.NotLoggedInException; 086import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException; 087import org.jivesoftware.smack.SmackException.SmackWrappedException; 088import org.jivesoftware.smack.SmackFuture; 089import org.jivesoftware.smack.StanzaListener; 090import org.jivesoftware.smack.SynchronizationPoint; 091import org.jivesoftware.smack.XMPPConnection; 092import org.jivesoftware.smack.XMPPException; 093import org.jivesoftware.smack.XMPPException.FailedNonzaException; 094import org.jivesoftware.smack.XMPPException.StreamErrorException; 095import org.jivesoftware.smack.compress.packet.Compress; 096import org.jivesoftware.smack.compress.packet.Compressed; 097import org.jivesoftware.smack.compression.XMPPInputOutputStream; 098import org.jivesoftware.smack.filter.StanzaFilter; 099import org.jivesoftware.smack.packet.Element; 100import org.jivesoftware.smack.packet.IQ; 101import org.jivesoftware.smack.packet.Message; 102import org.jivesoftware.smack.packet.Nonza; 103import org.jivesoftware.smack.packet.Presence; 104import org.jivesoftware.smack.packet.Stanza; 105import org.jivesoftware.smack.packet.StartTls; 106import org.jivesoftware.smack.packet.StreamError; 107import org.jivesoftware.smack.packet.StreamOpen; 108import org.jivesoftware.smack.proxy.ProxyInfo; 109import org.jivesoftware.smack.sasl.packet.SaslStreamElements; 110import org.jivesoftware.smack.sasl.packet.SaslStreamElements.Challenge; 111import org.jivesoftware.smack.sasl.packet.SaslStreamElements.SASLFailure; 112import org.jivesoftware.smack.sasl.packet.SaslStreamElements.Success; 113import org.jivesoftware.smack.sm.SMUtils; 114import org.jivesoftware.smack.sm.StreamManagementException; 115import org.jivesoftware.smack.sm.StreamManagementException.StreamIdDoesNotMatchException; 116import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementCounterError; 117import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementNotEnabledException; 118import org.jivesoftware.smack.sm.packet.StreamManagement; 119import org.jivesoftware.smack.sm.packet.StreamManagement.AckAnswer; 120import org.jivesoftware.smack.sm.packet.StreamManagement.AckRequest; 121import org.jivesoftware.smack.sm.packet.StreamManagement.Enable; 122import org.jivesoftware.smack.sm.packet.StreamManagement.Enabled; 123import org.jivesoftware.smack.sm.packet.StreamManagement.Failed; 124import org.jivesoftware.smack.sm.packet.StreamManagement.Resume; 125import org.jivesoftware.smack.sm.packet.StreamManagement.Resumed; 126import org.jivesoftware.smack.sm.packet.StreamManagement.StreamManagementFeature; 127import org.jivesoftware.smack.sm.predicates.Predicate; 128import org.jivesoftware.smack.sm.provider.ParseStreamManagement; 129import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown; 130import org.jivesoftware.smack.util.Async; 131import org.jivesoftware.smack.util.DNSUtil; 132import org.jivesoftware.smack.util.PacketParserUtils; 133import org.jivesoftware.smack.util.StringUtils; 134import org.jivesoftware.smack.util.TLSUtils; 135import org.jivesoftware.smack.util.XmlStringBuilder; 136import org.jivesoftware.smack.util.dns.HostAddress; 137import org.jivesoftware.smack.util.dns.SmackDaneProvider; 138import org.jivesoftware.smack.util.dns.SmackDaneVerifier; 139 140import org.jxmpp.jid.impl.JidCreate; 141import org.jxmpp.jid.parts.Resourcepart; 142import org.jxmpp.stringprep.XmppStringprepException; 143import org.jxmpp.util.XmppStringUtils; 144import org.xmlpull.v1.XmlPullParser; 145import org.xmlpull.v1.XmlPullParserException; 146 147/** 148 * Creates a socket connection to an XMPP server. This is the default connection 149 * to an XMPP server and is specified in the XMPP Core (RFC 6120). 150 * 151 * @see XMPPConnection 152 * @author Matt Tucker 153 */ 154public class XMPPTCPConnection extends AbstractXMPPConnection { 155 156 private static final int QUEUE_SIZE = 500; 157 private static final Logger LOGGER = Logger.getLogger(XMPPTCPConnection.class.getName()); 158 159 /** 160 * The socket which is used for this connection. 161 */ 162 private Socket socket; 163 164 /** 165 * 166 */ 167 private boolean disconnectedButResumeable = false; 168 169 private SSLSocket secureSocket; 170 171 private final Semaphore readerWriterSemaphore = new Semaphore(2); 172 173 /** 174 * Protected access level because of unit test purposes 175 */ 176 protected final PacketWriter packetWriter = new PacketWriter(); 177 178 /** 179 * Protected access level because of unit test purposes 180 */ 181 protected final PacketReader packetReader = new PacketReader(); 182 183 private final SynchronizationPoint<Exception> initialOpenStreamSend = new SynchronizationPoint<>( 184 this, "initial open stream element send to server"); 185 186 /** 187 * 188 */ 189 private final SynchronizationPoint<XMPPException> maybeCompressFeaturesReceived = new SynchronizationPoint<XMPPException>( 190 this, "stream compression feature"); 191 192 /** 193 * 194 */ 195 private final SynchronizationPoint<SmackException> compressSyncPoint = new SynchronizationPoint<>( 196 this, "stream compression"); 197 198 /** 199 * A synchronization point which is successful if this connection has received the closing 200 * stream element from the remote end-point, i.e. the server. 201 */ 202 private final SynchronizationPoint<Exception> closingStreamReceived = new SynchronizationPoint<>( 203 this, "stream closing element received"); 204 205 /** 206 * The default bundle and defer callback, used for new connections. 207 * @see bundleAndDeferCallback 208 */ 209 private static BundleAndDeferCallback defaultBundleAndDeferCallback; 210 211 /** 212 * The used bundle and defer callback. 213 * <p> 214 * Although this field may be set concurrently, the 'volatile' keyword was deliberately not added, in order to avoid 215 * having a 'volatile' read within the writer threads loop. 216 * </p> 217 */ 218 private BundleAndDeferCallback bundleAndDeferCallback = defaultBundleAndDeferCallback; 219 220 private static boolean useSmDefault = true; 221 222 private static boolean useSmResumptionDefault = true; 223 224 /** 225 * The stream ID of the stream that is currently resumable, ie. the stream we hold the state 226 * for in {@link #clientHandledStanzasCount}, {@link #serverHandledStanzasCount} and 227 * {@link #unacknowledgedStanzas}. 228 */ 229 private String smSessionId; 230 231 private final SynchronizationPoint<FailedNonzaException> smResumedSyncPoint = new SynchronizationPoint<>( 232 this, "stream resumed element"); 233 234 private final SynchronizationPoint<SmackException> smEnabledSyncPoint = new SynchronizationPoint<>( 235 this, "stream enabled element"); 236 237 /** 238 * The client's preferred maximum resumption time in seconds. 239 */ 240 private int smClientMaxResumptionTime = -1; 241 242 /** 243 * The server's preferred maximum resumption time in seconds. 244 */ 245 private int smServerMaxResumptionTime = -1; 246 247 /** 248 * Indicates whether Stream Management (XEP-198) should be used if it's supported by the server. 249 */ 250 private boolean useSm = useSmDefault; 251 private boolean useSmResumption = useSmResumptionDefault; 252 253 /** 254 * The counter that the server sends the client about it's current height. For example, if the server sends 255 * {@code <a h='42'/>}, then this will be set to 42 (while also handling the {@link #unacknowledgedStanzas} queue). 256 */ 257 private long serverHandledStanzasCount = 0; 258 259 /** 260 * The counter for stanzas handled ("received") by the client. 261 * <p> 262 * Note that we don't need to synchronize this counter. Although JLS 17.7 states that reads and writes to longs are 263 * not atomic, it guarantees that there are at most 2 separate writes, one to each 32-bit half. And since 264 * {@link SMUtils#incrementHeight(long)} masks the lower 32 bit, we only operate on one half of the long and 265 * therefore have no concurrency problem because the read/write operations on one half are guaranteed to be atomic. 266 * </p> 267 */ 268 private long clientHandledStanzasCount = 0; 269 270 private BlockingQueue<Stanza> unacknowledgedStanzas; 271 272 /** 273 * Set to true if Stream Management was at least once enabled for this connection. 274 */ 275 private boolean smWasEnabledAtLeastOnce = false; 276 277 /** 278 * This listeners are invoked for every stanza that got acknowledged. 279 * <p> 280 * We use a {@link ConcurrentLinkedQueue} here in order to allow the listeners to remove 281 * themselves after they have been invoked. 282 * </p> 283 */ 284 private final Collection<StanzaListener> stanzaAcknowledgedListeners = new ConcurrentLinkedQueue<>(); 285 286 /** 287 * These listeners are invoked for every stanza that got dropped. 288 * <p> 289 * We use a {@link ConcurrentLinkedQueue} here in order to allow the listeners to remove 290 * themselves after they have been invoked. 291 * </p> 292 */ 293 private final Collection<StanzaListener> stanzaDroppedListeners = new ConcurrentLinkedQueue<>(); 294 295 /** 296 * This listeners are invoked for a acknowledged stanza that has the given stanza ID. They will 297 * only be invoked once and automatically removed after that. 298 */ 299 private final Map<String, StanzaListener> stanzaIdAcknowledgedListeners = new ConcurrentHashMap<>(); 300 301 /** 302 * Predicates that determine if an stream management ack should be requested from the server. 303 * <p> 304 * We use a linked hash set here, so that the order how the predicates are added matches the 305 * order in which they are invoked in order to determine if an ack request should be send or not. 306 * </p> 307 */ 308 private final Set<StanzaFilter> requestAckPredicates = new LinkedHashSet<>(); 309 310 @SuppressWarnings("HidingField") 311 private final XMPPTCPConnectionConfiguration config; 312 313 /** 314 * Creates a new XMPP connection over TCP (optionally using proxies). 315 * <p> 316 * Note that XMPPTCPConnection constructors do not establish a connection to the server 317 * and you must call {@link #connect()}. 318 * </p> 319 * 320 * @param config the connection configuration. 321 */ 322 public XMPPTCPConnection(XMPPTCPConnectionConfiguration config) { 323 super(config); 324 this.config = config; 325 addConnectionListener(new AbstractConnectionListener() { 326 @Override 327 public void connectionClosedOnError(Exception e) { 328 if (e instanceof XMPPException.StreamErrorException || e instanceof StreamManagementException) { 329 dropSmState(); 330 } 331 } 332 }); 333 } 334 335 /** 336 * Creates a new XMPP connection over TCP. 337 * <p> 338 * Note that {@code jid} must be the bare JID, e.g. "user@example.org". More fine-grained control over the 339 * connection settings is available using the {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)} 340 * constructor. 341 * </p> 342 * 343 * @param jid the bare JID used by the client. 344 * @param password the password or authentication token. 345 * @throws XmppStringprepException 346 */ 347 public XMPPTCPConnection(CharSequence jid, String password) throws XmppStringprepException { 348 this(XmppStringUtils.parseLocalpart(jid.toString()), password, XmppStringUtils.parseDomain(jid.toString())); 349 } 350 351 /** 352 * Creates a new XMPP connection over TCP. 353 * <p> 354 * This is the simplest constructor for connecting to an XMPP server. Alternatively, 355 * you can get fine-grained control over connection settings using the 356 * {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)} constructor. 357 * </p> 358 * @param username 359 * @param password 360 * @param serviceName 361 * @throws XmppStringprepException 362 */ 363 public XMPPTCPConnection(CharSequence username, String password, String serviceName) throws XmppStringprepException { 364 this(XMPPTCPConnectionConfiguration.builder().setUsernameAndPassword(username, password).setXmppDomain( 365 JidCreate.domainBareFrom(serviceName)).build()); 366 } 367 368 @Override 369 protected void throwNotConnectedExceptionIfAppropriate() throws NotConnectedException { 370 if (packetWriter == null) { 371 throw new NotConnectedException(); 372 } 373 packetWriter.throwNotConnectedExceptionIfDoneAndResumptionNotPossible(); 374 } 375 376 @Override 377 protected void throwAlreadyConnectedExceptionIfAppropriate() throws AlreadyConnectedException { 378 if (isConnected() && !disconnectedButResumeable) { 379 throw new AlreadyConnectedException(); 380 } 381 } 382 383 @Override 384 protected void throwAlreadyLoggedInExceptionIfAppropriate() throws AlreadyLoggedInException { 385 if (isAuthenticated() && !disconnectedButResumeable) { 386 throw new AlreadyLoggedInException(); 387 } 388 } 389 390 @Override 391 protected void afterSuccessfulLogin(final boolean resumed) throws NotConnectedException, InterruptedException { 392 // Reset the flag in case it was set 393 disconnectedButResumeable = false; 394 super.afterSuccessfulLogin(resumed); 395 } 396 397 @Override 398 protected synchronized void loginInternal(String username, String password, Resourcepart resource) throws XMPPException, 399 SmackException, IOException, InterruptedException { 400 // Authenticate using SASL 401 SSLSession sslSession = secureSocket != null ? secureSocket.getSession() : null; 402 saslAuthentication.authenticate(username, password, config.getAuthzid(), sslSession); 403 404 // Wait for stream features after the authentication. 405 // TODO: The name of this synchronization point "maybeCompressFeaturesReceived" is not perfect. It should be 406 // renamed to "streamFeaturesAfterAuthenticationReceived". 407 maybeCompressFeaturesReceived.checkIfSuccessOrWait(); 408 409 // If compression is enabled then request the server to use stream compression. XEP-170 410 // recommends to perform stream compression before resource binding. 411 maybeEnableCompression(); 412 413 if (isSmResumptionPossible()) { 414 smResumedSyncPoint.sendAndWaitForResponse(new Resume(clientHandledStanzasCount, smSessionId)); 415 if (smResumedSyncPoint.wasSuccessful()) { 416 // We successfully resumed the stream, be done here 417 afterSuccessfulLogin(true); 418 return; 419 } 420 // SM resumption failed, what Smack does here is to report success of 421 // lastFeaturesReceived in case of sm resumption was answered with 'failed' so that 422 // normal resource binding can be tried. 423 LOGGER.fine("Stream resumption failed, continuing with normal stream establishment process"); 424 } 425 426 List<Stanza> previouslyUnackedStanzas = new LinkedList<Stanza>(); 427 if (unacknowledgedStanzas != null) { 428 // There was a previous connection with SM enabled but that was either not resumable or 429 // failed to resume. Make sure that we (re-)send the unacknowledged stanzas. 430 unacknowledgedStanzas.drainTo(previouslyUnackedStanzas); 431 // Reset unacknowledged stanzas to 'null' to signal that we never send 'enable' in this 432 // XMPP session (There maybe was an enabled in a previous XMPP session of this 433 // connection instance though). This is used in writePackets to decide if stanzas should 434 // be added to the unacknowledged stanzas queue, because they have to be added right 435 // after the 'enable' stream element has been sent. 436 dropSmState(); 437 } 438 439 // Now bind the resource. It is important to do this *after* we dropped an eventually 440 // existing Stream Management state. As otherwise <bind/> and <session/> may end up in 441 // unacknowledgedStanzas and become duplicated on reconnect. See SMACK-706. 442 bindResourceAndEstablishSession(resource); 443 444 if (isSmAvailable() && useSm) { 445 // Remove what is maybe left from previously stream managed sessions 446 serverHandledStanzasCount = 0; 447 // XEP-198 3. Enabling Stream Management. If the server response to 'Enable' is 'Failed' 448 // then this is a non recoverable error and we therefore throw an exception. 449 smEnabledSyncPoint.sendAndWaitForResponseOrThrow(new Enable(useSmResumption, smClientMaxResumptionTime)); 450 synchronized (requestAckPredicates) { 451 if (requestAckPredicates.isEmpty()) { 452 // Assure that we have at lest one predicate set up that so that we request acks 453 // for the server and eventually flush some stanzas from the unacknowledged 454 // stanza queue 455 requestAckPredicates.add(Predicate.forMessagesOrAfter5Stanzas()); 456 } 457 } 458 } 459 // Inform client about failed resumption if possible, resend stanzas otherwise 460 // Process the stanzas synchronously so a client can re-queue them for transmission 461 // before it is informed about connection success 462 if (!stanzaDroppedListeners.isEmpty()) { 463 for (Stanza stanza : previouslyUnackedStanzas) { 464 for (StanzaListener listener : stanzaDroppedListeners) { 465 try { 466 listener.processStanza(stanza); 467 } 468 catch (InterruptedException | NotConnectedException | NotLoggedInException e) { 469 LOGGER.log(Level.FINER, "StanzaDroppedListener received exception", e); 470 } 471 } 472 } 473 } else { 474 for (Stanza stanza : previouslyUnackedStanzas) { 475 sendStanzaInternal(stanza); 476 } 477 } 478 479 afterSuccessfulLogin(false); 480 } 481 482 @Override 483 public boolean isSecureConnection() { 484 return secureSocket != null; 485 } 486 487 /** 488 * Shuts the current connection down. After this method returns, the connection must be ready 489 * for re-use by connect. 490 */ 491 @Override 492 protected void shutdown() { 493 if (isSmEnabled()) { 494 try { 495 // Try to send a last SM Acknowledgement. Most servers won't find this information helpful, as the SM 496 // state is dropped after a clean disconnect anyways. OTOH it doesn't hurt much either. 497 sendSmAcknowledgementInternal(); 498 } catch (InterruptedException | NotConnectedException e) { 499 LOGGER.log(Level.FINE, "Can not send final SM ack as connection is not connected", e); 500 } 501 } 502 shutdown(false); 503 } 504 505 @Override 506 public synchronized void instantShutdown() { 507 shutdown(true); 508 } 509 510 private void shutdown(boolean instant) { 511 if (disconnectedButResumeable) { 512 return; 513 } 514 515 // First shutdown the writer, this will result in a closing stream element getting send to 516 // the server 517 LOGGER.finer("PacketWriter shutdown()"); 518 packetWriter.shutdown(instant); 519 LOGGER.finer("PacketWriter has been shut down"); 520 521 if (!instant) { 522 try { 523 // After we send the closing stream element, check if there was already a 524 // closing stream element sent by the server or wait with a timeout for a 525 // closing stream element to be received from the server. 526 @SuppressWarnings("unused") 527 Exception res = closingStreamReceived.checkIfSuccessOrWait(); 528 } catch (InterruptedException | NoResponseException e) { 529 LOGGER.log(Level.INFO, "Exception while waiting for closing stream element from the server " + this, e); 530 } 531 } 532 533 LOGGER.finer("PacketReader shutdown()"); 534 packetReader.shutdown(); 535 LOGGER.finer("PacketReader has been shut down"); 536 537 try { 538 socket.close(); 539 } catch (Exception e) { 540 LOGGER.log(Level.WARNING, "shutdown", e); 541 } 542 543 setWasAuthenticated(); 544 // If we are able to resume the stream, then don't set 545 // connected/authenticated/usingTLS to false since we like behave like we are still 546 // connected (e.g. sendStanza should not throw a NotConnectedException). 547 if (isSmResumptionPossible() && instant) { 548 disconnectedButResumeable = true; 549 } else { 550 disconnectedButResumeable = false; 551 // Reset the stream management session id to null, since if the stream is cleanly closed, i.e. sending a closing 552 // stream tag, there is no longer a stream to resume. 553 smSessionId = null; 554 // Note that we deliberately do not reset authenticatedConnectionInitiallyEstablishedTimestamp here, so that the 555 // information is available in the connectionClosedOnError() listeners. 556 } 557 authenticated = false; 558 connected = false; 559 secureSocket = null; 560 reader = null; 561 writer = null; 562 563 initState(); 564 } 565 566 @Override 567 protected void initState() { 568 super.initState(); 569 maybeCompressFeaturesReceived.init(); 570 compressSyncPoint.init(); 571 smResumedSyncPoint.init(); 572 smEnabledSyncPoint.init(); 573 initialOpenStreamSend.init(); 574 } 575 576 @Override 577 public void sendNonza(Nonza element) throws NotConnectedException, InterruptedException { 578 packetWriter.sendStreamElement(element); 579 } 580 581 @Override 582 protected void sendStanzaInternal(Stanza packet) throws NotConnectedException, InterruptedException { 583 packetWriter.sendStreamElement(packet); 584 if (isSmEnabled()) { 585 for (StanzaFilter requestAckPredicate : requestAckPredicates) { 586 if (requestAckPredicate.accept(packet)) { 587 requestSmAcknowledgementInternal(); 588 break; 589 } 590 } 591 } 592 } 593 594 private void connectUsingConfiguration() throws ConnectionException, IOException, InterruptedException { 595 List<HostAddress> failedAddresses = populateHostAddresses(); 596 SocketFactory socketFactory = config.getSocketFactory(); 597 ProxyInfo proxyInfo = config.getProxyInfo(); 598 int timeout = config.getConnectTimeout(); 599 if (socketFactory == null) { 600 socketFactory = SocketFactory.getDefault(); 601 } 602 for (HostAddress hostAddress : hostAddresses) { 603 Iterator<InetAddress> inetAddresses; 604 String host = hostAddress.getHost(); 605 int port = hostAddress.getPort(); 606 if (proxyInfo == null) { 607 inetAddresses = hostAddress.getInetAddresses().iterator(); 608 assert (inetAddresses.hasNext()); 609 610 innerloop: while (inetAddresses.hasNext()) { 611 // Create a *new* Socket before every connection attempt, i.e. connect() call, since Sockets are not 612 // re-usable after a failed connection attempt. See also SMACK-724. 613 SmackFuture.SocketFuture socketFuture = new SmackFuture.SocketFuture(socketFactory); 614 615 final InetAddress inetAddress = inetAddresses.next(); 616 final InetSocketAddress inetSocketAddress = new InetSocketAddress(inetAddress, port); 617 LOGGER.finer("Trying to establish TCP connection to " + inetSocketAddress); 618 socketFuture.connectAsync(inetSocketAddress, timeout); 619 620 try { 621 socket = socketFuture.getOrThrow(); 622 } catch (IOException e) { 623 hostAddress.setException(inetAddress, e); 624 if (inetAddresses.hasNext()) { 625 continue innerloop; 626 } else { 627 break innerloop; 628 } 629 } 630 LOGGER.finer("Established TCP connection to " + inetSocketAddress); 631 // We found a host to connect to, return here 632 this.host = host; 633 this.port = port; 634 return; 635 } 636 failedAddresses.add(hostAddress); 637 } else { 638 socket = socketFactory.createSocket(); 639 StringUtils.requireNotNullOrEmpty(host, "Host of HostAddress " + hostAddress + " must not be null when using a Proxy"); 640 final String hostAndPort = host + " at port " + port; 641 LOGGER.finer("Trying to establish TCP connection via Proxy to " + hostAndPort); 642 try { 643 proxyInfo.getProxySocketConnection().connect(socket, host, port, timeout); 644 } catch (IOException e) { 645 hostAddress.setException(e); 646 continue; 647 } 648 LOGGER.finer("Established TCP connection to " + hostAndPort); 649 // We found a host to connect to, return here 650 this.host = host; 651 this.port = port; 652 return; 653 } 654 } 655 // There are no more host addresses to try 656 // throw an exception and report all tried 657 // HostAddresses in the exception 658 throw ConnectionException.from(failedAddresses); 659 } 660 661 /** 662 * Initializes the connection by creating a stanza reader and writer and opening a 663 * XMPP stream to the server. 664 * 665 * @throws XMPPException if establishing a connection to the server fails. 666 * @throws SmackException if the server fails to respond back or if there is anther error. 667 * @throws IOException 668 * @throws InterruptedException 669 */ 670 private void initConnection() throws IOException, InterruptedException { 671 compressionHandler = null; 672 673 // Set the reader and writer instance variables 674 initReaderAndWriter(); 675 676 int availableReaderWriterSemaphorePermits = readerWriterSemaphore.availablePermits(); 677 if (availableReaderWriterSemaphorePermits < 2) { 678 Object[] logObjects = new Object[] { 679 this, 680 availableReaderWriterSemaphorePermits, 681 }; 682 LOGGER.log(Level.FINE, "Not every reader/writer threads where terminated on connection re-initializtion of {0}. Available permits {1}", logObjects); 683 } 684 readerWriterSemaphore.acquire(2); 685 // Start the writer thread. This will open an XMPP stream to the server 686 packetWriter.init(); 687 // Start the reader thread. The startup() method will block until we 688 // get an opening stream packet back from server 689 packetReader.init(); 690 } 691 692 private void initReaderAndWriter() throws IOException { 693 InputStream is = socket.getInputStream(); 694 OutputStream os = socket.getOutputStream(); 695 if (compressionHandler != null) { 696 is = compressionHandler.getInputStream(is); 697 os = compressionHandler.getOutputStream(os); 698 } 699 // OutputStreamWriter is already buffered, no need to wrap it into a BufferedWriter 700 writer = new OutputStreamWriter(os, "UTF-8"); 701 reader = new BufferedReader(new InputStreamReader(is, "UTF-8")); 702 703 // If debugging is enabled, we open a window and write out all network traffic. 704 initDebugger(); 705 } 706 707 /** 708 * The server has indicated that TLS negotiation can start. We now need to secure the 709 * existing plain connection and perform a handshake. This method won't return until the 710 * connection has finished the handshake or an error occurred while securing the connection. 711 * @throws IOException 712 * @throws CertificateException 713 * @throws NoSuchAlgorithmException 714 * @throws NoSuchProviderException 715 * @throws KeyStoreException 716 * @throws UnrecoverableKeyException 717 * @throws KeyManagementException 718 * @throws SmackException 719 * @throws Exception if an exception occurs. 720 */ 721 @SuppressWarnings("LiteralClassName") 722 private void proceedTLSReceived() throws NoSuchAlgorithmException, CertificateException, IOException, KeyStoreException, NoSuchProviderException, UnrecoverableKeyException, KeyManagementException, SmackException { 723 SmackDaneVerifier daneVerifier = null; 724 725 if (config.getDnssecMode() == DnssecMode.needsDnssecAndDane) { 726 SmackDaneProvider daneProvider = DNSUtil.getDaneProvider(); 727 if (daneProvider == null) { 728 throw new UnsupportedOperationException("DANE enabled but no SmackDaneProvider configured"); 729 } 730 daneVerifier = daneProvider.newInstance(); 731 if (daneVerifier == null) { 732 throw new IllegalStateException("DANE requested but DANE provider did not return a DANE verifier"); 733 } 734 } 735 736 SSLContext context = this.config.getCustomSSLContext(); 737 KeyStore ks = null; 738 PasswordCallback pcb = null; 739 740 if (context == null) { 741 final String keyStoreType = config.getKeystoreType(); 742 final CallbackHandler callbackHandler = config.getCallbackHandler(); 743 final String keystorePath = config.getKeystorePath(); 744 if ("PKCS11".equals(keyStoreType)) { 745 try { 746 Constructor<?> c = Class.forName("sun.security.pkcs11.SunPKCS11").getConstructor(InputStream.class); 747 String pkcs11Config = "name = SmartCard\nlibrary = " + config.getPKCS11Library(); 748 ByteArrayInputStream config = new ByteArrayInputStream(pkcs11Config.getBytes(StringUtils.UTF8)); 749 Provider p = (Provider) c.newInstance(config); 750 Security.addProvider(p); 751 ks = KeyStore.getInstance("PKCS11",p); 752 pcb = new PasswordCallback("PKCS11 Password: ",false); 753 callbackHandler.handle(new Callback[] {pcb}); 754 ks.load(null,pcb.getPassword()); 755 } 756 catch (Exception e) { 757 LOGGER.log(Level.WARNING, "Exception", e); 758 ks = null; 759 } 760 } 761 else if ("Apple".equals(keyStoreType)) { 762 ks = KeyStore.getInstance("KeychainStore","Apple"); 763 ks.load(null,null); 764 // pcb = new PasswordCallback("Apple Keychain",false); 765 // pcb.setPassword(null); 766 } 767 else if (keyStoreType != null) { 768 ks = KeyStore.getInstance(keyStoreType); 769 if (callbackHandler != null && StringUtils.isNotEmpty(keystorePath)) { 770 try { 771 pcb = new PasswordCallback("Keystore Password: ", false); 772 callbackHandler.handle(new Callback[] { pcb }); 773 ks.load(new FileInputStream(keystorePath), pcb.getPassword()); 774 } 775 catch (Exception e) { 776 LOGGER.log(Level.WARNING, "Exception", e); 777 ks = null; 778 } 779 } else { 780 ks.load(null, null); 781 } 782 } 783 784 KeyManager[] kms = null; 785 786 if (ks != null) { 787 String keyManagerFactoryAlgorithm = KeyManagerFactory.getDefaultAlgorithm(); 788 KeyManagerFactory kmf = null; 789 try { 790 kmf = KeyManagerFactory.getInstance(keyManagerFactoryAlgorithm); 791 } 792 catch (NoSuchAlgorithmException e) { 793 LOGGER.log(Level.FINE, "Could get the default KeyManagerFactory for the '" 794 + keyManagerFactoryAlgorithm + "' algorithm", e); 795 } 796 if (kmf != null) { 797 try { 798 if (pcb == null) { 799 kmf.init(ks, null); 800 } 801 else { 802 kmf.init(ks, pcb.getPassword()); 803 pcb.clearPassword(); 804 } 805 kms = kmf.getKeyManagers(); 806 } 807 catch (NullPointerException npe) { 808 LOGGER.log(Level.WARNING, "NullPointerException", npe); 809 } 810 } 811 } 812 813 // If the user didn't specify a SSLContext, use the default one 814 context = SSLContext.getInstance("TLS"); 815 816 final SecureRandom secureRandom = new java.security.SecureRandom(); 817 X509TrustManager customTrustManager = config.getCustomX509TrustManager(); 818 819 if (daneVerifier != null) { 820 // User requested DANE verification. 821 daneVerifier.init(context, kms, customTrustManager, secureRandom); 822 } else { 823 TrustManager[] customTrustManagers = null; 824 if (customTrustManager != null) { 825 customTrustManagers = new TrustManager[] { customTrustManager }; 826 } 827 context.init(kms, customTrustManagers, secureRandom); 828 } 829 } 830 831 Socket plain = socket; 832 // Secure the plain connection 833 socket = context.getSocketFactory().createSocket(plain, 834 config.getXMPPServiceDomain().toString(), plain.getPort(), true); 835 836 final SSLSocket sslSocket = (SSLSocket) socket; 837 // Immediately set the enabled SSL protocols and ciphers. See SMACK-712 why this is 838 // important (at least on certain platforms) and it seems to be a good idea anyways to 839 // prevent an accidental implicit handshake. 840 TLSUtils.setEnabledProtocolsAndCiphers(sslSocket, config.getEnabledSSLProtocols(), config.getEnabledSSLCiphers()); 841 842 // Initialize the reader and writer with the new secured version 843 initReaderAndWriter(); 844 845 // Proceed to do the handshake 846 sslSocket.startHandshake(); 847 848 if (daneVerifier != null) { 849 daneVerifier.finish(sslSocket); 850 } 851 852 final HostnameVerifier verifier = getConfiguration().getHostnameVerifier(); 853 if (verifier == null) { 854 throw new IllegalStateException("No HostnameVerifier set. Use connectionConfiguration.setHostnameVerifier() to configure."); 855 } else if (!verifier.verify(getXMPPServiceDomain().toString(), sslSocket.getSession())) { 856 throw new CertificateException("Hostname verification of certificate failed. Certificate does not authenticate " + getXMPPServiceDomain()); 857 } 858 859 // Set that TLS was successful 860 secureSocket = sslSocket; 861 } 862 863 /** 864 * Returns the compression handler that can be used for one compression methods offered by the server. 865 * 866 * @return a instance of XMPPInputOutputStream or null if no suitable instance was found 867 * 868 */ 869 private static XMPPInputOutputStream maybeGetCompressionHandler(Compress.Feature compression) { 870 for (XMPPInputOutputStream handler : SmackConfiguration.getCompressionHandlers()) { 871 String method = handler.getCompressionMethod(); 872 if (compression.getMethods().contains(method)) 873 return handler; 874 } 875 return null; 876 } 877 878 @Override 879 public boolean isUsingCompression() { 880 return compressionHandler != null && compressSyncPoint.wasSuccessful(); 881 } 882 883 /** 884 * <p> 885 * Starts using stream compression that will compress network traffic. Traffic can be 886 * reduced up to 90%. Therefore, stream compression is ideal when using a slow speed network 887 * connection. However, the server and the client will need to use more CPU time in order to 888 * un/compress network data so under high load the server performance might be affected. 889 * </p> 890 * <p> 891 * Stream compression has to have been previously offered by the server. Currently only the 892 * zlib method is supported by the client. Stream compression negotiation has to be done 893 * before authentication took place. 894 * </p> 895 * 896 * @throws NotConnectedException 897 * @throws SmackException 898 * @throws NoResponseException 899 * @throws InterruptedException 900 */ 901 private void maybeEnableCompression() throws SmackException, InterruptedException { 902 if (!config.isCompressionEnabled()) { 903 return; 904 } 905 906 Compress.Feature compression = getFeature(Compress.Feature.ELEMENT, Compress.NAMESPACE); 907 if (compression == null) { 908 // Server does not support compression 909 return; 910 } 911 // If stream compression was offered by the server and we want to use 912 // compression then send compression request to the server 913 if ((compressionHandler = maybeGetCompressionHandler(compression)) != null) { 914 compressSyncPoint.sendAndWaitForResponseOrThrow(new Compress(compressionHandler.getCompressionMethod())); 915 } else { 916 LOGGER.warning("Could not enable compression because no matching handler/method pair was found"); 917 } 918 } 919 920 /** 921 * Establishes a connection to the XMPP server. It basically 922 * creates and maintains a socket connection to the server. 923 * <p> 924 * Listeners will be preserved from a previous connection if the reconnection 925 * occurs after an abrupt termination. 926 * </p> 927 * 928 * @throws XMPPException if an error occurs while trying to establish the connection. 929 * @throws SmackException 930 * @throws IOException 931 * @throws InterruptedException 932 */ 933 @Override 934 protected void connectInternal() throws SmackException, IOException, XMPPException, InterruptedException { 935 closingStreamReceived.init(); 936 // Establishes the TCP connection to the server and does setup the reader and writer. Throws an exception if 937 // there is an error establishing the connection 938 connectUsingConfiguration(); 939 940 // We connected successfully to the servers TCP port 941 initConnection(); 942 943 // TLS handled will be successful either if TLS was established, or if it was not mandatory. 944 tlsHandled.checkIfSuccessOrWaitOrThrow(); 945 946 // Wait with SASL auth until the SASL mechanisms have been received 947 saslFeatureReceived.checkIfSuccessOrWaitOrThrow(); 948 } 949 950 /** 951 * Sends out a notification that there was an error with the connection 952 * and closes the connection. Also prints the stack trace of the given exception 953 * 954 * @param e the exception that causes the connection close event. 955 */ 956 private synchronized void notifyConnectionError(final Exception e) { 957 ASYNC_BUT_ORDERED.performAsyncButOrdered(this, new Runnable() { 958 @Override 959 public void run() { 960 // Listeners were already notified of the exception, return right here. 961 if (packetReader.done || packetWriter.done()) return; 962 963 // Report the failure outside the synchronized block, so that a thread waiting within a synchronized 964 // function like connect() throws the wrapped exception. 965 SmackWrappedException smackWrappedException = new SmackWrappedException(e); 966 tlsHandled.reportGenericFailure(smackWrappedException); 967 saslFeatureReceived.reportGenericFailure(smackWrappedException); 968 maybeCompressFeaturesReceived.reportGenericFailure(smackWrappedException); 969 lastFeaturesReceived.reportGenericFailure(smackWrappedException); 970 971 synchronized (XMPPTCPConnection.this) { 972 // Within this synchronized block, either *both* reader and writer threads must be terminated, or 973 // none. 974 assert ((packetReader.done && packetWriter.done()) 975 || (!packetReader.done && !packetWriter.done())); 976 977 // Closes the connection temporary. A reconnection is possible 978 // Note that a connection listener of XMPPTCPConnection will drop the SM state in 979 // case the Exception is a StreamErrorException. 980 instantShutdown(); 981 982 // Wait for reader and writer threads to be terminated. 983 readerWriterSemaphore.acquireUninterruptibly(2); 984 readerWriterSemaphore.release(2); 985 } 986 987 Async.go(new Runnable() { 988 @Override 989 public void run() { 990 // Notify connection listeners of the error. 991 callConnectionClosedOnErrorListener(e); 992 } 993 }, XMPPTCPConnection.this + " callConnectionClosedOnErrorListener()"); 994 } 995 }); 996 } 997 998 /** 999 * For unit testing purposes 1000 * 1001 * @param writer 1002 */ 1003 protected void setWriter(Writer writer) { 1004 this.writer = writer; 1005 } 1006 1007 @Override 1008 protected void afterFeaturesReceived() throws NotConnectedException, InterruptedException, SecurityRequiredByServerException { 1009 StartTls startTlsFeature = getFeature(StartTls.ELEMENT, StartTls.NAMESPACE); 1010 if (startTlsFeature != null) { 1011 if (startTlsFeature.required() && config.getSecurityMode() == SecurityMode.disabled) { 1012 SecurityRequiredByServerException smackException = new SecurityRequiredByServerException(); 1013 tlsHandled.reportFailure(smackException); 1014 throw smackException; 1015 } 1016 1017 if (config.getSecurityMode() != ConnectionConfiguration.SecurityMode.disabled) { 1018 sendNonza(new StartTls()); 1019 } else { 1020 tlsHandled.reportSuccess(); 1021 } 1022 } else { 1023 tlsHandled.reportSuccess(); 1024 } 1025 1026 if (getSASLAuthentication().authenticationSuccessful()) { 1027 // If we have received features after the SASL has been successfully completed, then we 1028 // have also *maybe* received, as it is an optional feature, the compression feature 1029 // from the server. 1030 maybeCompressFeaturesReceived.reportSuccess(); 1031 } 1032 } 1033 1034 /** 1035 * Resets the parser using the latest connection's reader. Resetting the parser is necessary 1036 * when the plain connection has been secured or when a new opening stream element is going 1037 * to be sent by the server. 1038 * 1039 * @throws SmackException if the parser could not be reset. 1040 * @throws InterruptedException 1041 */ 1042 void openStream() throws SmackException, InterruptedException { 1043 // If possible, provide the receiving entity of the stream open tag, i.e. the server, as much information as 1044 // possible. The 'to' attribute is *always* available. The 'from' attribute if set by the user and no external 1045 // mechanism is used to determine the local entity (user). And the 'id' attribute is available after the first 1046 // response from the server (see e.g. RFC 6120 ยง 9.1.1 Step 2.) 1047 CharSequence to = getXMPPServiceDomain(); 1048 CharSequence from = null; 1049 CharSequence localpart = config.getUsername(); 1050 if (localpart != null) { 1051 from = XmppStringUtils.completeJidFrom(localpart, to); 1052 } 1053 String id = getStreamId(); 1054 sendNonza(new StreamOpen(to, from, id)); 1055 try { 1056 packetReader.parser = PacketParserUtils.newXmppParser(reader); 1057 } 1058 catch (XmlPullParserException e) { 1059 throw new SmackException(e); 1060 } 1061 } 1062 1063 protected class PacketReader { 1064 1065 XmlPullParser parser; 1066 1067 private volatile boolean done; 1068 1069 /** 1070 * Initializes the reader in order to be used. The reader is initialized during the 1071 * first connection and when reconnecting due to an abruptly disconnection. 1072 */ 1073 void init() { 1074 done = false; 1075 1076 Async.go(new Runnable() { 1077 @Override 1078 public void run() { 1079 try { 1080 parsePackets(); 1081 } finally { 1082 XMPPTCPConnection.this.readerWriterSemaphore.release(); 1083 } 1084 } 1085 }, "Smack Reader (" + getConnectionCounter() + ")"); 1086 } 1087 1088 /** 1089 * Shuts the stanza reader down. This method simply sets the 'done' flag to true. 1090 */ 1091 void shutdown() { 1092 done = true; 1093 } 1094 1095 /** 1096 * Parse top-level packets in order to process them further. 1097 */ 1098 private void parsePackets() { 1099 try { 1100 initialOpenStreamSend.checkIfSuccessOrWait(); 1101 int eventType = parser.getEventType(); 1102 while (!done) { 1103 switch (eventType) { 1104 case XmlPullParser.START_TAG: 1105 final String name = parser.getName(); 1106 switch (name) { 1107 case Message.ELEMENT: 1108 case IQ.IQ_ELEMENT: 1109 case Presence.ELEMENT: 1110 try { 1111 parseAndProcessStanza(parser); 1112 } finally { 1113 clientHandledStanzasCount = SMUtils.incrementHeight(clientHandledStanzasCount); 1114 } 1115 break; 1116 case "stream": 1117 // We found an opening stream. 1118 if ("jabber:client".equals(parser.getNamespace(null))) { 1119 streamId = parser.getAttributeValue("", "id"); 1120 String reportedServerDomain = parser.getAttributeValue("", "from"); 1121 assert (config.getXMPPServiceDomain().equals(reportedServerDomain)); 1122 } 1123 break; 1124 case "error": 1125 StreamError streamError = PacketParserUtils.parseStreamError(parser); 1126 saslFeatureReceived.reportFailure(new StreamErrorException(streamError)); 1127 // Mark the tlsHandled sync point as success, we will use the saslFeatureReceived sync 1128 // point to report the error, which is checked immediately after tlsHandled in 1129 // connectInternal(). 1130 tlsHandled.reportSuccess(); 1131 throw new StreamErrorException(streamError); 1132 case "features": 1133 parseFeatures(parser); 1134 break; 1135 case "proceed": 1136 try { 1137 // Secure the connection by negotiating TLS 1138 proceedTLSReceived(); 1139 // Send a new opening stream to the server 1140 openStream(); 1141 } 1142 catch (Exception e) { 1143 SmackException smackException = new SmackException(e); 1144 tlsHandled.reportFailure(smackException); 1145 throw e; 1146 } 1147 break; 1148 case "failure": 1149 String namespace = parser.getNamespace(null); 1150 switch (namespace) { 1151 case "urn:ietf:params:xml:ns:xmpp-tls": 1152 // TLS negotiation has failed. The server will close the connection 1153 // TODO Parse failure stanza 1154 throw new SmackException("TLS negotiation has failed"); 1155 case "http://jabber.org/protocol/compress": 1156 // Stream compression has been denied. This is a recoverable 1157 // situation. It is still possible to authenticate and 1158 // use the connection but using an uncompressed connection 1159 // TODO Parse failure stanza 1160 compressSyncPoint.reportFailure(new SmackException( 1161 "Could not establish compression")); 1162 break; 1163 case SaslStreamElements.NAMESPACE: 1164 // SASL authentication has failed. The server may close the connection 1165 // depending on the number of retries 1166 final SASLFailure failure = PacketParserUtils.parseSASLFailure(parser); 1167 getSASLAuthentication().authenticationFailed(failure); 1168 break; 1169 } 1170 break; 1171 case Challenge.ELEMENT: 1172 // The server is challenging the SASL authentication made by the client 1173 String challengeData = parser.nextText(); 1174 getSASLAuthentication().challengeReceived(challengeData); 1175 break; 1176 case Success.ELEMENT: 1177 Success success = new Success(parser.nextText()); 1178 // We now need to bind a resource for the connection 1179 // Open a new stream and wait for the response 1180 openStream(); 1181 // The SASL authentication with the server was successful. The next step 1182 // will be to bind the resource 1183 getSASLAuthentication().authenticated(success); 1184 break; 1185 case Compressed.ELEMENT: 1186 // Server confirmed that it's possible to use stream compression. Start 1187 // stream compression 1188 // Initialize the reader and writer with the new compressed version 1189 initReaderAndWriter(); 1190 // Send a new opening stream to the server 1191 openStream(); 1192 // Notify that compression is being used 1193 compressSyncPoint.reportSuccess(); 1194 break; 1195 case Enabled.ELEMENT: 1196 Enabled enabled = ParseStreamManagement.enabled(parser); 1197 if (enabled.isResumeSet()) { 1198 smSessionId = enabled.getId(); 1199 if (StringUtils.isNullOrEmpty(smSessionId)) { 1200 SmackException xmppException = new SmackException("Stream Management 'enabled' element with resume attribute but without session id received"); 1201 smEnabledSyncPoint.reportFailure(xmppException); 1202 throw xmppException; 1203 } 1204 smServerMaxResumptionTime = enabled.getMaxResumptionTime(); 1205 } else { 1206 // Mark this a non-resumable stream by setting smSessionId to null 1207 smSessionId = null; 1208 } 1209 clientHandledStanzasCount = 0; 1210 smWasEnabledAtLeastOnce = true; 1211 smEnabledSyncPoint.reportSuccess(); 1212 LOGGER.fine("Stream Management (XEP-198): successfully enabled"); 1213 break; 1214 case Failed.ELEMENT: 1215 Failed failed = ParseStreamManagement.failed(parser); 1216 FailedNonzaException xmppException = new FailedNonzaException(failed, failed.getStanzaErrorCondition()); 1217 // If only XEP-198 would specify different failure elements for the SM 1218 // enable and SM resume failure case. But this is not the case, so we 1219 // need to determine if this is a 'Failed' response for either 'Enable' 1220 // or 'Resume'. 1221 if (smResumedSyncPoint.requestSent()) { 1222 smResumedSyncPoint.reportFailure(xmppException); 1223 } 1224 else { 1225 if (!smEnabledSyncPoint.requestSent()) { 1226 throw new IllegalStateException("Failed element received but SM was not previously enabled"); 1227 } 1228 smEnabledSyncPoint.reportFailure(new SmackException(xmppException)); 1229 // Report success for last lastFeaturesReceived so that in case a 1230 // failed resumption, we can continue with normal resource binding. 1231 // See text of XEP-198 5. below Example 11. 1232 lastFeaturesReceived.reportSuccess(); 1233 } 1234 break; 1235 case Resumed.ELEMENT: 1236 Resumed resumed = ParseStreamManagement.resumed(parser); 1237 if (!smSessionId.equals(resumed.getPrevId())) { 1238 throw new StreamIdDoesNotMatchException(smSessionId, resumed.getPrevId()); 1239 } 1240 // Mark SM as enabled 1241 smEnabledSyncPoint.reportSuccess(); 1242 // First, drop the stanzas already handled by the server 1243 processHandledCount(resumed.getHandledCount()); 1244 // Then re-send what is left in the unacknowledged queue 1245 List<Stanza> stanzasToResend = new ArrayList<>(unacknowledgedStanzas.size()); 1246 unacknowledgedStanzas.drainTo(stanzasToResend); 1247 for (Stanza stanza : stanzasToResend) { 1248 sendStanzaInternal(stanza); 1249 } 1250 // If there where stanzas resent, then request a SM ack for them. 1251 // Writer's sendStreamElement() won't do it automatically based on 1252 // predicates. 1253 if (!stanzasToResend.isEmpty()) { 1254 requestSmAcknowledgementInternal(); 1255 } 1256 // Mark SM resumption as successful 1257 smResumedSyncPoint.reportSuccess(); 1258 LOGGER.fine("Stream Management (XEP-198): Stream resumed"); 1259 break; 1260 case AckAnswer.ELEMENT: 1261 AckAnswer ackAnswer = ParseStreamManagement.ackAnswer(parser); 1262 processHandledCount(ackAnswer.getHandledCount()); 1263 break; 1264 case AckRequest.ELEMENT: 1265 ParseStreamManagement.ackRequest(parser); 1266 if (smEnabledSyncPoint.wasSuccessful()) { 1267 sendSmAcknowledgementInternal(); 1268 } else { 1269 LOGGER.warning("SM Ack Request received while SM is not enabled"); 1270 } 1271 break; 1272 default: 1273 LOGGER.warning("Unknown top level stream element: " + name); 1274 break; 1275 } 1276 break; 1277 case XmlPullParser.END_TAG: 1278 final String endTagName = parser.getName(); 1279 if ("stream".equals(endTagName)) { 1280 if (!parser.getNamespace().equals("http://etherx.jabber.org/streams")) { 1281 LOGGER.warning(XMPPTCPConnection.this + " </stream> but different namespace " + parser.getNamespace()); 1282 break; 1283 } 1284 1285 // Check if the queue was already shut down before reporting success on closing stream tag 1286 // received. This avoids a race if there is a disconnect(), followed by a connect(), which 1287 // did re-start the queue again, causing this writer to assume that the queue is not 1288 // shutdown, which results in a call to disconnect(). 1289 final boolean queueWasShutdown = packetWriter.queue.isShutdown(); 1290 closingStreamReceived.reportSuccess(); 1291 1292 if (queueWasShutdown) { 1293 // We received a closing stream element *after* we initiated the 1294 // termination of the session by sending a closing stream element to 1295 // the server first 1296 return; 1297 } else { 1298 // We received a closing stream element from the server without us 1299 // sending a closing stream element first. This means that the 1300 // server wants to terminate the session, therefore disconnect 1301 // the connection 1302 LOGGER.info(XMPPTCPConnection.this 1303 + " received closing </stream> element." 1304 + " Server wants to terminate the connection, calling disconnect()"); 1305 ASYNC_BUT_ORDERED.performAsyncButOrdered(XMPPTCPConnection.this, new Runnable() { 1306 @Override 1307 public void run() { 1308 disconnect(); 1309 }}); 1310 } 1311 } 1312 break; 1313 case XmlPullParser.END_DOCUMENT: 1314 // END_DOCUMENT only happens in an error case, as otherwise we would see a 1315 // closing stream element before. 1316 throw new SmackException( 1317 "Parser got END_DOCUMENT event. This could happen e.g. if the server closed the connection without sending a closing stream element"); 1318 } 1319 eventType = parser.next(); 1320 } 1321 } 1322 catch (Exception e) { 1323 closingStreamReceived.reportFailure(e); 1324 // The exception can be ignored if the the connection is 'done' 1325 // or if the it was caused because the socket got closed 1326 if (!(done || packetWriter.queue.isShutdown())) { 1327 // Close the connection and notify connection listeners of the 1328 // error. 1329 notifyConnectionError(e); 1330 } 1331 } 1332 } 1333 } 1334 1335 protected class PacketWriter { 1336 public static final int QUEUE_SIZE = XMPPTCPConnection.QUEUE_SIZE; 1337 1338 private final ArrayBlockingQueueWithShutdown<Element> queue = new ArrayBlockingQueueWithShutdown<>( 1339 QUEUE_SIZE, true); 1340 1341 /** 1342 * Needs to be protected for unit testing purposes. 1343 */ 1344 protected SynchronizationPoint<NoResponseException> shutdownDone = new SynchronizationPoint<>( 1345 XMPPTCPConnection.this, "shutdown completed"); 1346 1347 /** 1348 * If set, the stanza writer is shut down 1349 */ 1350 protected volatile Long shutdownTimestamp = null; 1351 1352 private volatile boolean instantShutdown; 1353 1354 /** 1355 * True if some preconditions are given to start the bundle and defer mechanism. 1356 * <p> 1357 * This will likely get set to true right after the start of the writer thread, because 1358 * {@link #nextStreamElement()} will check if {@link queue} is empty, which is probably the case, and then set 1359 * this field to true. 1360 * </p> 1361 */ 1362 private boolean shouldBundleAndDefer; 1363 1364 /** 1365 * Initializes the writer in order to be used. It is called at the first connection and also 1366 * is invoked if the connection is disconnected by an error. 1367 */ 1368 void init() { 1369 shutdownDone.init(); 1370 shutdownTimestamp = null; 1371 1372 if (unacknowledgedStanzas != null) { 1373 // It's possible that there are new stanzas in the writer queue that 1374 // came in while we were disconnected but resumable, drain those into 1375 // the unacknowledged queue so that they get resent now 1376 drainWriterQueueToUnacknowledgedStanzas(); 1377 } 1378 1379 queue.start(); 1380 Async.go(new Runnable() { 1381 @Override 1382 public void run() { 1383 try { 1384 writePackets(); 1385 } finally { 1386 XMPPTCPConnection.this.readerWriterSemaphore.release(); 1387 } 1388 } 1389 }, "Smack Writer (" + getConnectionCounter() + ")"); 1390 } 1391 1392 private boolean done() { 1393 return shutdownTimestamp != null; 1394 } 1395 1396 protected void throwNotConnectedExceptionIfDoneAndResumptionNotPossible() throws NotConnectedException { 1397 final boolean done = done(); 1398 if (done) { 1399 final boolean smResumptionPossible = isSmResumptionPossible(); 1400 // Don't throw a NotConnectedException is there is an resumable stream available 1401 if (!smResumptionPossible) { 1402 throw new NotConnectedException(XMPPTCPConnection.this, "done=" + done 1403 + " smResumptionPossible=" + smResumptionPossible); 1404 } 1405 } 1406 } 1407 1408 /** 1409 * Sends the specified element to the server. 1410 * 1411 * @param element the element to send. 1412 * @throws NotConnectedException 1413 * @throws InterruptedException 1414 */ 1415 protected void sendStreamElement(Element element) throws NotConnectedException, InterruptedException { 1416 throwNotConnectedExceptionIfDoneAndResumptionNotPossible(); 1417 try { 1418 queue.put(element); 1419 } 1420 catch (InterruptedException e) { 1421 // put() may throw an InterruptedException for two reasons: 1422 // 1. If the queue was shut down 1423 // 2. If the thread was interrupted 1424 // so we have to check which is the case 1425 throwNotConnectedExceptionIfDoneAndResumptionNotPossible(); 1426 // If the method above did not throw, then the sending thread was interrupted 1427 throw e; 1428 } 1429 } 1430 1431 /** 1432 * Shuts down the stanza writer. Once this method has been called, no further 1433 * packets will be written to the server. 1434 * @throws InterruptedException 1435 */ 1436 void shutdown(boolean instant) { 1437 instantShutdown = instant; 1438 queue.shutdown(); 1439 shutdownTimestamp = System.currentTimeMillis(); 1440 if (shutdownDone.isNotInInitialState()) { 1441 try { 1442 shutdownDone.checkIfSuccessOrWait(); 1443 } catch (NoResponseException | InterruptedException e) { 1444 LOGGER.log(Level.WARNING, "shutdownDone was not marked as successful by the writer thread", e); 1445 } 1446 } 1447 } 1448 1449 /** 1450 * Maybe return the next available element from the queue for writing. If the queue is shut down <b>or</b> a 1451 * spurious interrupt occurs, <code>null</code> is returned. So it is important to check the 'done' condition in 1452 * that case. 1453 * 1454 * @return the next element for writing or null. 1455 */ 1456 private Element nextStreamElement() { 1457 // It is important the we check if the queue is empty before removing an element from it 1458 if (queue.isEmpty()) { 1459 shouldBundleAndDefer = true; 1460 } 1461 Element packet = null; 1462 try { 1463 packet = queue.take(); 1464 } 1465 catch (InterruptedException e) { 1466 if (!queue.isShutdown()) { 1467 // Users shouldn't try to interrupt the packet writer thread 1468 LOGGER.log(Level.WARNING, "Writer thread was interrupted. Don't do that. Use disconnect() instead.", e); 1469 } 1470 } 1471 return packet; 1472 } 1473 1474 private void writePackets() { 1475 Exception writerException = null; 1476 try { 1477 openStream(); 1478 initialOpenStreamSend.reportSuccess(); 1479 // Write out packets from the queue. 1480 while (!done()) { 1481 Element element = nextStreamElement(); 1482 if (element == null) { 1483 continue; 1484 } 1485 1486 // Get a local version of the bundle and defer callback, in case it's unset 1487 // between the null check and the method invocation 1488 final BundleAndDeferCallback localBundleAndDeferCallback = bundleAndDeferCallback; 1489 // If the preconditions are given (e.g. bundleAndDefer callback is set, queue is 1490 // empty), then we could wait a bit for further stanzas attempting to decrease 1491 // our energy consumption 1492 if (localBundleAndDeferCallback != null && isAuthenticated() && shouldBundleAndDefer) { 1493 // Reset shouldBundleAndDefer to false, nextStreamElement() will set it to true once the 1494 // queue is empty again. 1495 shouldBundleAndDefer = false; 1496 final AtomicBoolean bundlingAndDeferringStopped = new AtomicBoolean(); 1497 final int bundleAndDeferMillis = localBundleAndDeferCallback.getBundleAndDeferMillis(new BundleAndDefer( 1498 bundlingAndDeferringStopped)); 1499 if (bundleAndDeferMillis > 0) { 1500 long remainingWait = bundleAndDeferMillis; 1501 final long waitStart = System.currentTimeMillis(); 1502 synchronized (bundlingAndDeferringStopped) { 1503 while (!bundlingAndDeferringStopped.get() && remainingWait > 0) { 1504 bundlingAndDeferringStopped.wait(remainingWait); 1505 remainingWait = bundleAndDeferMillis 1506 - (System.currentTimeMillis() - waitStart); 1507 } 1508 } 1509 } 1510 } 1511 1512 Stanza packet = null; 1513 if (element instanceof Stanza) { 1514 packet = (Stanza) element; 1515 } 1516 else if (element instanceof Enable) { 1517 // The client needs to add messages to the unacknowledged stanzas queue 1518 // right after it sent 'enabled'. Stanza will be added once 1519 // unacknowledgedStanzas is not null. 1520 unacknowledgedStanzas = new ArrayBlockingQueue<>(QUEUE_SIZE); 1521 } 1522 maybeAddToUnacknowledgedStanzas(packet); 1523 1524 CharSequence elementXml = element.toXML(StreamOpen.CLIENT_NAMESPACE); 1525 if (elementXml instanceof XmlStringBuilder) { 1526 ((XmlStringBuilder) elementXml).write(writer, StreamOpen.CLIENT_NAMESPACE); 1527 } 1528 else { 1529 writer.write(elementXml.toString()); 1530 } 1531 1532 if (queue.isEmpty()) { 1533 writer.flush(); 1534 } 1535 if (packet != null) { 1536 firePacketSendingListeners(packet); 1537 } 1538 } 1539 if (!instantShutdown) { 1540 // Flush out the rest of the queue. 1541 try { 1542 while (!queue.isEmpty()) { 1543 Element packet = queue.remove(); 1544 if (packet instanceof Stanza) { 1545 Stanza stanza = (Stanza) packet; 1546 maybeAddToUnacknowledgedStanzas(stanza); 1547 } 1548 writer.write(packet.toXML(null).toString()); 1549 } 1550 writer.flush(); 1551 } 1552 catch (Exception e) { 1553 LOGGER.log(Level.WARNING, 1554 "Exception flushing queue during shutdown, ignore and continue", 1555 e); 1556 } 1557 1558 // Close the stream. 1559 try { 1560 writer.write("</stream:stream>"); 1561 writer.flush(); 1562 } 1563 catch (Exception e) { 1564 LOGGER.log(Level.WARNING, "Exception writing closing stream element", e); 1565 } 1566 1567 // Delete the queue contents (hopefully nothing is left). 1568 queue.clear(); 1569 } else if (instantShutdown && isSmEnabled()) { 1570 // This was an instantShutdown and SM is enabled, drain all remaining stanzas 1571 // into the unacknowledgedStanzas queue 1572 drainWriterQueueToUnacknowledgedStanzas(); 1573 } 1574 // Do *not* close the writer here, as it will cause the socket 1575 // to get closed. But we may want to receive further stanzas 1576 // until the closing stream tag is received. The socket will be 1577 // closed in shutdown(). 1578 } 1579 catch (Exception e) { 1580 // The exception can be ignored if the the connection is 'done' 1581 // or if the it was caused because the socket got closed 1582 if (!(done() || queue.isShutdown())) { 1583 writerException = e; 1584 } else { 1585 LOGGER.log(Level.FINE, "Ignoring Exception in writePackets()", e); 1586 } 1587 } finally { 1588 LOGGER.fine("Reporting shutdownDone success in writer thread"); 1589 shutdownDone.reportSuccess(); 1590 } 1591 // Delay notifyConnectionError after shutdownDone has been reported in the finally block. 1592 if (writerException != null) { 1593 notifyConnectionError(writerException); 1594 } 1595 } 1596 1597 private void drainWriterQueueToUnacknowledgedStanzas() { 1598 List<Element> elements = new ArrayList<>(queue.size()); 1599 queue.drainTo(elements); 1600 for (int i = 0; i < elements.size(); i++) { 1601 Element element = elements.get(i); 1602 // If the unacknowledgedStanza queue is full, then bail out with a warning message. See SMACK-844. 1603 if (unacknowledgedStanzas.remainingCapacity() == 0) { 1604 StreamManagementException.UnacknowledgedQueueFullException exception = StreamManagementException.UnacknowledgedQueueFullException 1605 .newWith(i, elements, unacknowledgedStanzas); 1606 LOGGER.log(Level.WARNING, 1607 "Some stanzas may be lost as not all could be drained to the unacknowledged stanzas queue", exception); 1608 return; 1609 } 1610 if (element instanceof Stanza) { 1611 unacknowledgedStanzas.add((Stanza) element); 1612 } 1613 } 1614 } 1615 1616 private void maybeAddToUnacknowledgedStanzas(Stanza stanza) throws IOException { 1617 // Check if the stream element should be put to the unacknowledgedStanza 1618 // queue. Note that we can not do the put() in sendStanzaInternal() and the 1619 // packet order is not stable at this point (sendStanzaInternal() can be 1620 // called concurrently). 1621 if (unacknowledgedStanzas != null && stanza != null) { 1622 // If the unacknowledgedStanza queue is nearly full, request an new ack 1623 // from the server in order to drain it 1624 if (unacknowledgedStanzas.size() == 0.8 * XMPPTCPConnection.QUEUE_SIZE) { 1625 writer.write(AckRequest.INSTANCE.toXML(null).toString()); 1626 writer.flush(); 1627 } 1628 try { 1629 // It is important the we put the stanza in the unacknowledged stanza 1630 // queue before we put it on the wire 1631 unacknowledgedStanzas.put(stanza); 1632 } 1633 catch (InterruptedException e) { 1634 throw new IllegalStateException(e); 1635 } 1636 } 1637 } 1638 } 1639 1640 /** 1641 * Set if Stream Management should be used by default for new connections. 1642 * 1643 * @param useSmDefault true to use Stream Management for new connections. 1644 */ 1645 public static void setUseStreamManagementDefault(boolean useSmDefault) { 1646 XMPPTCPConnection.useSmDefault = useSmDefault; 1647 } 1648 1649 /** 1650 * Set if Stream Management resumption should be used by default for new connections. 1651 * 1652 * @param useSmResumptionDefault true to use Stream Management resumption for new connections. 1653 * @deprecated use {@link #setUseStreamManagementResumptionDefault(boolean)} instead. 1654 */ 1655 @Deprecated 1656 public static void setUseStreamManagementResumptiodDefault(boolean useSmResumptionDefault) { 1657 setUseStreamManagementResumptionDefault(useSmResumptionDefault); 1658 } 1659 1660 /** 1661 * Set if Stream Management resumption should be used by default for new connections. 1662 * 1663 * @param useSmResumptionDefault true to use Stream Management resumption for new connections. 1664 */ 1665 public static void setUseStreamManagementResumptionDefault(boolean useSmResumptionDefault) { 1666 if (useSmResumptionDefault) { 1667 // Also enable SM is resumption is enabled 1668 setUseStreamManagementDefault(useSmResumptionDefault); 1669 } 1670 XMPPTCPConnection.useSmResumptionDefault = useSmResumptionDefault; 1671 } 1672 1673 /** 1674 * Set if Stream Management should be used if supported by the server. 1675 * 1676 * @param useSm true to use Stream Management. 1677 */ 1678 public void setUseStreamManagement(boolean useSm) { 1679 this.useSm = useSm; 1680 } 1681 1682 /** 1683 * Set if Stream Management resumption should be used if supported by the server. 1684 * 1685 * @param useSmResumption true to use Stream Management resumption. 1686 */ 1687 public void setUseStreamManagementResumption(boolean useSmResumption) { 1688 if (useSmResumption) { 1689 // Also enable SM is resumption is enabled 1690 setUseStreamManagement(useSmResumption); 1691 } 1692 this.useSmResumption = useSmResumption; 1693 } 1694 1695 /** 1696 * Set the preferred resumption time in seconds. 1697 * @param resumptionTime the preferred resumption time in seconds 1698 */ 1699 public void setPreferredResumptionTime(int resumptionTime) { 1700 smClientMaxResumptionTime = resumptionTime; 1701 } 1702 1703 /** 1704 * Add a predicate for Stream Management acknowledgment requests. 1705 * <p> 1706 * Those predicates are used to determine when a Stream Management acknowledgement request is send to the server. 1707 * Some pre-defined predicates are found in the <code>org.jivesoftware.smack.sm.predicates</code> package. 1708 * </p> 1709 * <p> 1710 * If not predicate is configured, the {@link Predicate#forMessagesOrAfter5Stanzas()} will be used. 1711 * </p> 1712 * 1713 * @param predicate the predicate to add. 1714 * @return if the predicate was not already active. 1715 */ 1716 public boolean addRequestAckPredicate(StanzaFilter predicate) { 1717 synchronized (requestAckPredicates) { 1718 return requestAckPredicates.add(predicate); 1719 } 1720 } 1721 1722 /** 1723 * Remove the given predicate for Stream Management acknowledgment request. 1724 * @param predicate the predicate to remove. 1725 * @return true if the predicate was removed. 1726 */ 1727 public boolean removeRequestAckPredicate(StanzaFilter predicate) { 1728 synchronized (requestAckPredicates) { 1729 return requestAckPredicates.remove(predicate); 1730 } 1731 } 1732 1733 /** 1734 * Remove all predicates for Stream Management acknowledgment requests. 1735 */ 1736 public void removeAllRequestAckPredicates() { 1737 synchronized (requestAckPredicates) { 1738 requestAckPredicates.clear(); 1739 } 1740 } 1741 1742 /** 1743 * Send an unconditional Stream Management acknowledgement request to the server. 1744 * 1745 * @throws StreamManagementNotEnabledException if Stream Management is not enabled. 1746 * @throws NotConnectedException if the connection is not connected. 1747 * @throws InterruptedException 1748 */ 1749 public void requestSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException { 1750 if (!isSmEnabled()) { 1751 throw new StreamManagementException.StreamManagementNotEnabledException(); 1752 } 1753 requestSmAcknowledgementInternal(); 1754 } 1755 1756 private void requestSmAcknowledgementInternal() throws NotConnectedException, InterruptedException { 1757 packetWriter.sendStreamElement(AckRequest.INSTANCE); 1758 } 1759 1760 /** 1761 * Send a unconditional Stream Management acknowledgment to the server. 1762 * <p> 1763 * See <a href="http://xmpp.org/extensions/xep-0198.html#acking">XEP-198: Stream Management ยง 4. Acks</a>: 1764 * "Either party MAY send an <a/> element at any time (e.g., after it has received a certain number of stanzas, 1765 * or after a certain period of time), even if it has not received an <r/> element from the other party." 1766 * </p> 1767 * 1768 * @throws StreamManagementNotEnabledException if Stream Management is not enabled. 1769 * @throws NotConnectedException if the connection is not connected. 1770 * @throws InterruptedException 1771 */ 1772 public void sendSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException { 1773 if (!isSmEnabled()) { 1774 throw new StreamManagementException.StreamManagementNotEnabledException(); 1775 } 1776 sendSmAcknowledgementInternal(); 1777 } 1778 1779 private void sendSmAcknowledgementInternal() throws NotConnectedException, InterruptedException { 1780 packetWriter.sendStreamElement(new AckAnswer(clientHandledStanzasCount)); 1781 } 1782 1783 /** 1784 * Add a Stanza acknowledged listener. 1785 * <p> 1786 * Those listeners will be invoked every time a Stanza has been acknowledged by the server. The will not get 1787 * automatically removed. Consider using {@link #addStanzaIdAcknowledgedListener(String, StanzaListener)} when 1788 * possible. 1789 * </p> 1790 * 1791 * @param listener the listener to add. 1792 */ 1793 public void addStanzaAcknowledgedListener(StanzaListener listener) { 1794 stanzaAcknowledgedListeners.add(listener); 1795 } 1796 1797 /** 1798 * Remove the given Stanza acknowledged listener. 1799 * 1800 * @param listener the listener. 1801 * @return true if the listener was removed. 1802 */ 1803 public boolean removeStanzaAcknowledgedListener(StanzaListener listener) { 1804 return stanzaAcknowledgedListeners.remove(listener); 1805 } 1806 1807 /** 1808 * Remove all stanza acknowledged listeners. 1809 */ 1810 public void removeAllStanzaAcknowledgedListeners() { 1811 stanzaAcknowledgedListeners.clear(); 1812 } 1813 1814 /** 1815 * Add a Stanza dropped listener. 1816 * <p> 1817 * Those listeners will be invoked every time a Stanza has been dropped due to a failed SM resume. They will not get 1818 * automatically removed. If at least one StanzaDroppedListener is configured, no attempt will be made to retransmit 1819 * the Stanzas. 1820 * </p> 1821 * 1822 * @param listener the listener to add. 1823 * @since 4.3.3 1824 */ 1825 public void addStanzaDroppedListener(StanzaListener listener) { 1826 stanzaDroppedListeners.add(listener); 1827 } 1828 1829 /** 1830 * Remove the given Stanza dropped listener. 1831 * 1832 * @param listener the listener. 1833 * @return true if the listener was removed. 1834 * @since 4.3.3 1835 */ 1836 public boolean removeStanzaDroppedListener(StanzaListener listener) { 1837 return stanzaDroppedListeners.remove(listener); 1838 } 1839 1840 /** 1841 * Add a new Stanza ID acknowledged listener for the given ID. 1842 * <p> 1843 * The listener will be invoked if the stanza with the given ID was acknowledged by the server. It will 1844 * automatically be removed after the listener was run. 1845 * </p> 1846 * 1847 * @param id the stanza ID. 1848 * @param listener the listener to invoke. 1849 * @return the previous listener for this stanza ID or null. 1850 * @throws StreamManagementNotEnabledException if Stream Management is not enabled. 1851 */ 1852 @SuppressWarnings("FutureReturnValueIgnored") 1853 public StanzaListener addStanzaIdAcknowledgedListener(final String id, StanzaListener listener) throws StreamManagementNotEnabledException { 1854 // Prevent users from adding callbacks that will never get removed 1855 if (!smWasEnabledAtLeastOnce) { 1856 throw new StreamManagementException.StreamManagementNotEnabledException(); 1857 } 1858 // Remove the listener after max. 3 hours 1859 final int removeAfterSeconds = Math.min(getMaxSmResumptionTime(), 3 * 60 * 60); 1860 schedule(new Runnable() { 1861 @Override 1862 public void run() { 1863 stanzaIdAcknowledgedListeners.remove(id); 1864 } 1865 }, removeAfterSeconds, TimeUnit.SECONDS); 1866 return stanzaIdAcknowledgedListeners.put(id, listener); 1867 } 1868 1869 /** 1870 * Remove the Stanza ID acknowledged listener for the given ID. 1871 * 1872 * @param id the stanza ID. 1873 * @return true if the listener was found and removed, false otherwise. 1874 */ 1875 public StanzaListener removeStanzaIdAcknowledgedListener(String id) { 1876 return stanzaIdAcknowledgedListeners.remove(id); 1877 } 1878 1879 /** 1880 * Removes all Stanza ID acknowledged listeners. 1881 */ 1882 public void removeAllStanzaIdAcknowledgedListeners() { 1883 stanzaIdAcknowledgedListeners.clear(); 1884 } 1885 1886 /** 1887 * Returns true if Stream Management is supported by the server. 1888 * 1889 * @return true if Stream Management is supported by the server. 1890 */ 1891 public boolean isSmAvailable() { 1892 return hasFeature(StreamManagementFeature.ELEMENT, StreamManagement.NAMESPACE); 1893 } 1894 1895 /** 1896 * Returns true if Stream Management was successfully negotiated with the server. 1897 * 1898 * @return true if Stream Management was negotiated. 1899 */ 1900 public boolean isSmEnabled() { 1901 return smEnabledSyncPoint.wasSuccessful(); 1902 } 1903 1904 /** 1905 * Returns true if the stream was successfully resumed with help of Stream Management. 1906 * 1907 * @return true if the stream was resumed. 1908 */ 1909 public boolean streamWasResumed() { 1910 return smResumedSyncPoint.wasSuccessful(); 1911 } 1912 1913 /** 1914 * Returns true if the connection is disconnected by a Stream resumption via Stream Management is possible. 1915 * 1916 * @return true if disconnected but resumption possible. 1917 */ 1918 public boolean isDisconnectedButSmResumptionPossible() { 1919 return disconnectedButResumeable && isSmResumptionPossible(); 1920 } 1921 1922 /** 1923 * Returns true if the stream is resumable. 1924 * 1925 * @return true if the stream is resumable. 1926 */ 1927 public boolean isSmResumptionPossible() { 1928 // There is no resumable stream available 1929 if (smSessionId == null) 1930 return false; 1931 1932 final Long shutdownTimestamp = packetWriter.shutdownTimestamp; 1933 // Seems like we are already reconnected, report true 1934 if (shutdownTimestamp == null) { 1935 return true; 1936 } 1937 1938 // See if resumption time is over 1939 long current = System.currentTimeMillis(); 1940 long maxResumptionMillies = ((long) getMaxSmResumptionTime()) * 1000; 1941 if (current > shutdownTimestamp + maxResumptionMillies) { 1942 // Stream resumption is *not* possible if the current timestamp is greater then the greatest timestamp where 1943 // resumption is possible 1944 return false; 1945 } else { 1946 return true; 1947 } 1948 } 1949 1950 /** 1951 * Drop the stream management state. Sets {@link #smSessionId} and 1952 * {@link #unacknowledgedStanzas} to <code>null</code>. 1953 */ 1954 private void dropSmState() { 1955 // clientHandledCount and serverHandledCount will be reset on <enable/> and <enabled/> 1956 // respective. No need to reset them here. 1957 smSessionId = null; 1958 unacknowledgedStanzas = null; 1959 } 1960 1961 /** 1962 * Get the maximum resumption time in seconds after which a managed stream can be resumed. 1963 * <p> 1964 * This method will return {@link Integer#MAX_VALUE} if neither the client nor the server specify a maximum 1965 * resumption time. Be aware of integer overflows when using this value, e.g. do not add arbitrary values to it 1966 * without checking for overflows before. 1967 * </p> 1968 * 1969 * @return the maximum resumption time in seconds or {@link Integer#MAX_VALUE} if none set. 1970 */ 1971 public int getMaxSmResumptionTime() { 1972 int clientResumptionTime = smClientMaxResumptionTime > 0 ? smClientMaxResumptionTime : Integer.MAX_VALUE; 1973 int serverResumptionTime = smServerMaxResumptionTime > 0 ? smServerMaxResumptionTime : Integer.MAX_VALUE; 1974 return Math.min(clientResumptionTime, serverResumptionTime); 1975 } 1976 1977 private void processHandledCount(long handledCount) throws StreamManagementCounterError { 1978 long ackedStanzasCount = SMUtils.calculateDelta(handledCount, serverHandledStanzasCount); 1979 final List<Stanza> ackedStanzas = new ArrayList<>( 1980 ackedStanzasCount <= Integer.MAX_VALUE ? (int) ackedStanzasCount 1981 : Integer.MAX_VALUE); 1982 for (long i = 0; i < ackedStanzasCount; i++) { 1983 Stanza ackedStanza = unacknowledgedStanzas.poll(); 1984 // If the server ack'ed a stanza, then it must be in the 1985 // unacknowledged stanza queue. There can be no exception. 1986 if (ackedStanza == null) { 1987 throw new StreamManagementCounterError(handledCount, serverHandledStanzasCount, 1988 ackedStanzasCount, ackedStanzas); 1989 } 1990 ackedStanzas.add(ackedStanza); 1991 } 1992 1993 boolean atLeastOneStanzaAcknowledgedListener = false; 1994 if (!stanzaAcknowledgedListeners.isEmpty()) { 1995 // If stanzaAcknowledgedListeners is not empty, the we have at least one 1996 atLeastOneStanzaAcknowledgedListener = true; 1997 } 1998 else { 1999 // Otherwise we look for a matching id in the stanza *id* acknowledged listeners 2000 for (Stanza ackedStanza : ackedStanzas) { 2001 String id = ackedStanza.getStanzaId(); 2002 if (id != null && stanzaIdAcknowledgedListeners.containsKey(id)) { 2003 atLeastOneStanzaAcknowledgedListener = true; 2004 break; 2005 } 2006 } 2007 } 2008 2009 // Only spawn a new thread if there is a chance that some listener is invoked 2010 if (atLeastOneStanzaAcknowledgedListener) { 2011 asyncGo(new Runnable() { 2012 @Override 2013 public void run() { 2014 for (Stanza ackedStanza : ackedStanzas) { 2015 for (StanzaListener listener : stanzaAcknowledgedListeners) { 2016 try { 2017 listener.processStanza(ackedStanza); 2018 } 2019 catch (InterruptedException | NotConnectedException | NotLoggedInException e) { 2020 LOGGER.log(Level.FINER, "Received exception", e); 2021 } 2022 } 2023 String id = ackedStanza.getStanzaId(); 2024 if (StringUtils.isNullOrEmpty(id)) { 2025 continue; 2026 } 2027 StanzaListener listener = stanzaIdAcknowledgedListeners.remove(id); 2028 if (listener != null) { 2029 try { 2030 listener.processStanza(ackedStanza); 2031 } 2032 catch (InterruptedException | NotConnectedException | NotLoggedInException e) { 2033 LOGGER.log(Level.FINER, "Received exception", e); 2034 } 2035 } 2036 } 2037 } 2038 }); 2039 } 2040 2041 serverHandledStanzasCount = handledCount; 2042 } 2043 2044 /** 2045 * Set the default bundle and defer callback used for new connections. 2046 * 2047 * @param defaultBundleAndDeferCallback 2048 * @see BundleAndDeferCallback 2049 * @since 4.1 2050 */ 2051 public static void setDefaultBundleAndDeferCallback(BundleAndDeferCallback defaultBundleAndDeferCallback) { 2052 XMPPTCPConnection.defaultBundleAndDeferCallback = defaultBundleAndDeferCallback; 2053 } 2054 2055 /** 2056 * Set the bundle and defer callback used for this connection. 2057 * <p> 2058 * You can use <code>null</code> as argument to reset the callback. Outgoing stanzas will then 2059 * no longer get deferred. 2060 * </p> 2061 * 2062 * @param bundleAndDeferCallback the callback or <code>null</code>. 2063 * @see BundleAndDeferCallback 2064 * @since 4.1 2065 */ 2066 public void setBundleandDeferCallback(BundleAndDeferCallback bundleAndDeferCallback) { 2067 this.bundleAndDeferCallback = bundleAndDeferCallback; 2068 } 2069 2070}