001/** 002 * 003 * Copyright 2003-2007 Jive Software. 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.jivesoftware.smack.tcp; 018 019import java.io.BufferedReader; 020import java.io.ByteArrayInputStream; 021import java.io.FileInputStream; 022import java.io.IOException; 023import java.io.InputStream; 024import java.io.InputStreamReader; 025import java.io.OutputStream; 026import java.io.OutputStreamWriter; 027import java.io.Writer; 028import java.lang.reflect.Constructor; 029import java.net.InetAddress; 030import java.net.InetSocketAddress; 031import java.net.Socket; 032import java.security.KeyManagementException; 033import java.security.KeyStore; 034import java.security.KeyStoreException; 035import java.security.NoSuchAlgorithmException; 036import java.security.NoSuchProviderException; 037import java.security.Provider; 038import java.security.SecureRandom; 039import java.security.Security; 040import java.security.UnrecoverableKeyException; 041import java.security.cert.CertificateException; 042import java.util.ArrayList; 043import java.util.Collection; 044import java.util.Iterator; 045import java.util.LinkedHashSet; 046import java.util.LinkedList; 047import java.util.List; 048import java.util.Map; 049import java.util.Set; 050import java.util.concurrent.ArrayBlockingQueue; 051import java.util.concurrent.BlockingQueue; 052import java.util.concurrent.ConcurrentHashMap; 053import java.util.concurrent.ConcurrentLinkedQueue; 054import java.util.concurrent.Semaphore; 055import java.util.concurrent.TimeUnit; 056import java.util.concurrent.atomic.AtomicBoolean; 057import java.util.logging.Level; 058import java.util.logging.Logger; 059 060import javax.net.SocketFactory; 061import javax.net.ssl.HostnameVerifier; 062import javax.net.ssl.KeyManager; 063import javax.net.ssl.KeyManagerFactory; 064import javax.net.ssl.SSLContext; 065import javax.net.ssl.SSLSession; 066import javax.net.ssl.SSLSocket; 067import javax.net.ssl.TrustManager; 068import javax.net.ssl.X509TrustManager; 069import javax.security.auth.callback.Callback; 070import javax.security.auth.callback.CallbackHandler; 071import javax.security.auth.callback.PasswordCallback; 072 073import org.jivesoftware.smack.AbstractConnectionListener; 074import org.jivesoftware.smack.AbstractXMPPConnection; 075import org.jivesoftware.smack.ConnectionConfiguration; 076import org.jivesoftware.smack.ConnectionConfiguration.DnssecMode; 077import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode; 078import org.jivesoftware.smack.SmackConfiguration; 079import org.jivesoftware.smack.SmackException; 080import org.jivesoftware.smack.SmackException.AlreadyConnectedException; 081import org.jivesoftware.smack.SmackException.AlreadyLoggedInException; 082import org.jivesoftware.smack.SmackException.ConnectionException; 083import org.jivesoftware.smack.SmackException.NoResponseException; 084import org.jivesoftware.smack.SmackException.NotConnectedException; 085import org.jivesoftware.smack.SmackException.NotLoggedInException; 086import org.jivesoftware.smack.SmackException.SecurityRequiredByServerException; 087import org.jivesoftware.smack.SmackException.SmackWrappedException; 088import org.jivesoftware.smack.SmackFuture; 089import org.jivesoftware.smack.StanzaListener; 090import org.jivesoftware.smack.SynchronizationPoint; 091import org.jivesoftware.smack.XMPPConnection; 092import org.jivesoftware.smack.XMPPException; 093import org.jivesoftware.smack.XMPPException.FailedNonzaException; 094import org.jivesoftware.smack.XMPPException.StreamErrorException; 095import org.jivesoftware.smack.compress.packet.Compress; 096import org.jivesoftware.smack.compress.packet.Compressed; 097import org.jivesoftware.smack.compression.XMPPInputOutputStream; 098import org.jivesoftware.smack.filter.StanzaFilter; 099import org.jivesoftware.smack.packet.Element; 100import org.jivesoftware.smack.packet.IQ; 101import org.jivesoftware.smack.packet.Message; 102import org.jivesoftware.smack.packet.Nonza; 103import org.jivesoftware.smack.packet.Presence; 104import org.jivesoftware.smack.packet.Stanza; 105import org.jivesoftware.smack.packet.StartTls; 106import org.jivesoftware.smack.packet.StreamError; 107import org.jivesoftware.smack.packet.StreamOpen; 108import org.jivesoftware.smack.proxy.ProxyInfo; 109import org.jivesoftware.smack.sasl.packet.SaslStreamElements; 110import org.jivesoftware.smack.sasl.packet.SaslStreamElements.Challenge; 111import org.jivesoftware.smack.sasl.packet.SaslStreamElements.SASLFailure; 112import org.jivesoftware.smack.sasl.packet.SaslStreamElements.Success; 113import org.jivesoftware.smack.sm.SMUtils; 114import org.jivesoftware.smack.sm.StreamManagementException; 115import org.jivesoftware.smack.sm.StreamManagementException.StreamIdDoesNotMatchException; 116import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementCounterError; 117import org.jivesoftware.smack.sm.StreamManagementException.StreamManagementNotEnabledException; 118import org.jivesoftware.smack.sm.packet.StreamManagement; 119import org.jivesoftware.smack.sm.packet.StreamManagement.AckAnswer; 120import org.jivesoftware.smack.sm.packet.StreamManagement.AckRequest; 121import org.jivesoftware.smack.sm.packet.StreamManagement.Enable; 122import org.jivesoftware.smack.sm.packet.StreamManagement.Enabled; 123import org.jivesoftware.smack.sm.packet.StreamManagement.Failed; 124import org.jivesoftware.smack.sm.packet.StreamManagement.Resume; 125import org.jivesoftware.smack.sm.packet.StreamManagement.Resumed; 126import org.jivesoftware.smack.sm.packet.StreamManagement.StreamManagementFeature; 127import org.jivesoftware.smack.sm.predicates.Predicate; 128import org.jivesoftware.smack.sm.provider.ParseStreamManagement; 129import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown; 130import org.jivesoftware.smack.util.Async; 131import org.jivesoftware.smack.util.DNSUtil; 132import org.jivesoftware.smack.util.PacketParserUtils; 133import org.jivesoftware.smack.util.StringUtils; 134import org.jivesoftware.smack.util.TLSUtils; 135import org.jivesoftware.smack.util.XmlStringBuilder; 136import org.jivesoftware.smack.util.dns.HostAddress; 137import org.jivesoftware.smack.util.dns.SmackDaneProvider; 138import org.jivesoftware.smack.util.dns.SmackDaneVerifier; 139 140import org.jxmpp.jid.impl.JidCreate; 141import org.jxmpp.jid.parts.Resourcepart; 142import org.jxmpp.stringprep.XmppStringprepException; 143import org.jxmpp.util.XmppStringUtils; 144import org.xmlpull.v1.XmlPullParser; 145import org.xmlpull.v1.XmlPullParserException; 146 147/** 148 * Creates a socket connection to an XMPP server. This is the default connection 149 * to an XMPP server and is specified in the XMPP Core (RFC 6120). 150 * 151 * @see XMPPConnection 152 * @author Matt Tucker 153 */ 154public class XMPPTCPConnection extends AbstractXMPPConnection { 155 156 private static final int QUEUE_SIZE = 500; 157 private static final Logger LOGGER = Logger.getLogger(XMPPTCPConnection.class.getName()); 158 159 /** 160 * The socket which is used for this connection. 161 */ 162 private Socket socket; 163 164 /** 165 * 166 */ 167 private boolean disconnectedButResumeable = false; 168 169 private SSLSocket secureSocket; 170 171 private final Semaphore readerWriterSemaphore = new Semaphore(2); 172 173 /** 174 * Protected access level because of unit test purposes 175 */ 176 protected final PacketWriter packetWriter = new PacketWriter(); 177 178 /** 179 * Protected access level because of unit test purposes 180 */ 181 protected final PacketReader packetReader = new PacketReader(); 182 183 private final SynchronizationPoint<Exception> initialOpenStreamSend = new SynchronizationPoint<>( 184 this, "initial open stream element send to server"); 185 186 /** 187 * 188 */ 189 private final SynchronizationPoint<XMPPException> maybeCompressFeaturesReceived = new SynchronizationPoint<XMPPException>( 190 this, "stream compression feature"); 191 192 /** 193 * 194 */ 195 private final SynchronizationPoint<SmackException> compressSyncPoint = new SynchronizationPoint<>( 196 this, "stream compression"); 197 198 /** 199 * A synchronization point which is successful if this connection has received the closing 200 * stream element from the remote end-point, i.e. the server. 201 */ 202 private final SynchronizationPoint<Exception> closingStreamReceived = new SynchronizationPoint<>( 203 this, "stream closing element received"); 204 205 /** 206 * The default bundle and defer callback, used for new connections. 207 * @see bundleAndDeferCallback 208 */ 209 private static BundleAndDeferCallback defaultBundleAndDeferCallback; 210 211 /** 212 * The used bundle and defer callback. 213 * <p> 214 * Although this field may be set concurrently, the 'volatile' keyword was deliberately not added, in order to avoid 215 * having a 'volatile' read within the writer threads loop. 216 * </p> 217 */ 218 private BundleAndDeferCallback bundleAndDeferCallback = defaultBundleAndDeferCallback; 219 220 private static boolean useSmDefault = true; 221 222 private static boolean useSmResumptionDefault = true; 223 224 /** 225 * The stream ID of the stream that is currently resumable, ie. the stream we hold the state 226 * for in {@link #clientHandledStanzasCount}, {@link #serverHandledStanzasCount} and 227 * {@link #unacknowledgedStanzas}. 228 */ 229 private String smSessionId; 230 231 private final SynchronizationPoint<FailedNonzaException> smResumedSyncPoint = new SynchronizationPoint<>( 232 this, "stream resumed element"); 233 234 private final SynchronizationPoint<SmackException> smEnabledSyncPoint = new SynchronizationPoint<>( 235 this, "stream enabled element"); 236 237 /** 238 * The client's preferred maximum resumption time in seconds. 239 */ 240 private int smClientMaxResumptionTime = -1; 241 242 /** 243 * The server's preferred maximum resumption time in seconds. 244 */ 245 private int smServerMaxResumptionTime = -1; 246 247 /** 248 * Indicates whether Stream Management (XEP-198) should be used if it's supported by the server. 249 */ 250 private boolean useSm = useSmDefault; 251 private boolean useSmResumption = useSmResumptionDefault; 252 253 /** 254 * The counter that the server sends the client about it's current height. For example, if the server sends 255 * {@code <a h='42'/>}, then this will be set to 42 (while also handling the {@link #unacknowledgedStanzas} queue). 256 */ 257 private long serverHandledStanzasCount = 0; 258 259 /** 260 * The counter for stanzas handled ("received") by the client. 261 * <p> 262 * Note that we don't need to synchronize this counter. Although JLS 17.7 states that reads and writes to longs are 263 * not atomic, it guarantees that there are at most 2 separate writes, one to each 32-bit half. And since 264 * {@link SMUtils#incrementHeight(long)} masks the lower 32 bit, we only operate on one half of the long and 265 * therefore have no concurrency problem because the read/write operations on one half are guaranteed to be atomic. 266 * </p> 267 */ 268 private long clientHandledStanzasCount = 0; 269 270 private BlockingQueue<Stanza> unacknowledgedStanzas; 271 272 /** 273 * Set to true if Stream Management was at least once enabled for this connection. 274 */ 275 private boolean smWasEnabledAtLeastOnce = false; 276 277 /** 278 * This listeners are invoked for every stanza that got acknowledged. 279 * <p> 280 * We use a {@link ConcurrentLinkedQueue} here in order to allow the listeners to remove 281 * themselves after they have been invoked. 282 * </p> 283 */ 284 private final Collection<StanzaListener> stanzaAcknowledgedListeners = new ConcurrentLinkedQueue<>(); 285 286 /** 287 * This listeners are invoked for a acknowledged stanza that has the given stanza ID. They will 288 * only be invoked once and automatically removed after that. 289 */ 290 private final Map<String, StanzaListener> stanzaIdAcknowledgedListeners = new ConcurrentHashMap<>(); 291 292 /** 293 * Predicates that determine if an stream management ack should be requested from the server. 294 * <p> 295 * We use a linked hash set here, so that the order how the predicates are added matches the 296 * order in which they are invoked in order to determine if an ack request should be send or not. 297 * </p> 298 */ 299 private final Set<StanzaFilter> requestAckPredicates = new LinkedHashSet<>(); 300 301 @SuppressWarnings("HidingField") 302 private final XMPPTCPConnectionConfiguration config; 303 304 /** 305 * Creates a new XMPP connection over TCP (optionally using proxies). 306 * <p> 307 * Note that XMPPTCPConnection constructors do not establish a connection to the server 308 * and you must call {@link #connect()}. 309 * </p> 310 * 311 * @param config the connection configuration. 312 */ 313 public XMPPTCPConnection(XMPPTCPConnectionConfiguration config) { 314 super(config); 315 this.config = config; 316 addConnectionListener(new AbstractConnectionListener() { 317 @Override 318 public void connectionClosedOnError(Exception e) { 319 if (e instanceof XMPPException.StreamErrorException || e instanceof StreamManagementException) { 320 dropSmState(); 321 } 322 } 323 }); 324 } 325 326 /** 327 * Creates a new XMPP connection over TCP. 328 * <p> 329 * Note that {@code jid} must be the bare JID, e.g. "user@example.org". More fine-grained control over the 330 * connection settings is available using the {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)} 331 * constructor. 332 * </p> 333 * 334 * @param jid the bare JID used by the client. 335 * @param password the password or authentication token. 336 * @throws XmppStringprepException 337 */ 338 public XMPPTCPConnection(CharSequence jid, String password) throws XmppStringprepException { 339 this(XmppStringUtils.parseLocalpart(jid.toString()), password, XmppStringUtils.parseDomain(jid.toString())); 340 } 341 342 /** 343 * Creates a new XMPP connection over TCP. 344 * <p> 345 * This is the simplest constructor for connecting to an XMPP server. Alternatively, 346 * you can get fine-grained control over connection settings using the 347 * {@link #XMPPTCPConnection(XMPPTCPConnectionConfiguration)} constructor. 348 * </p> 349 * @param username 350 * @param password 351 * @param serviceName 352 * @throws XmppStringprepException 353 */ 354 public XMPPTCPConnection(CharSequence username, String password, String serviceName) throws XmppStringprepException { 355 this(XMPPTCPConnectionConfiguration.builder().setUsernameAndPassword(username, password).setXmppDomain( 356 JidCreate.domainBareFrom(serviceName)).build()); 357 } 358 359 @Override 360 protected void throwNotConnectedExceptionIfAppropriate() throws NotConnectedException { 361 if (packetWriter == null) { 362 throw new NotConnectedException(); 363 } 364 packetWriter.throwNotConnectedExceptionIfDoneAndResumptionNotPossible(); 365 } 366 367 @Override 368 protected void throwAlreadyConnectedExceptionIfAppropriate() throws AlreadyConnectedException { 369 if (isConnected() && !disconnectedButResumeable) { 370 throw new AlreadyConnectedException(); 371 } 372 } 373 374 @Override 375 protected void throwAlreadyLoggedInExceptionIfAppropriate() throws AlreadyLoggedInException { 376 if (isAuthenticated() && !disconnectedButResumeable) { 377 throw new AlreadyLoggedInException(); 378 } 379 } 380 381 @Override 382 protected void afterSuccessfulLogin(final boolean resumed) throws NotConnectedException, InterruptedException { 383 // Reset the flag in case it was set 384 disconnectedButResumeable = false; 385 super.afterSuccessfulLogin(resumed); 386 } 387 388 @Override 389 protected synchronized void loginInternal(String username, String password, Resourcepart resource) throws XMPPException, 390 SmackException, IOException, InterruptedException { 391 // Authenticate using SASL 392 SSLSession sslSession = secureSocket != null ? secureSocket.getSession() : null; 393 saslAuthentication.authenticate(username, password, config.getAuthzid(), sslSession); 394 395 // Wait for stream features after the authentication. 396 // TODO: The name of this synchronization point "maybeCompressFeaturesReceived" is not perfect. It should be 397 // renamed to "streamFeaturesAfterAuthenticationReceived". 398 maybeCompressFeaturesReceived.checkIfSuccessOrWait(); 399 400 // If compression is enabled then request the server to use stream compression. XEP-170 401 // recommends to perform stream compression before resource binding. 402 maybeEnableCompression(); 403 404 if (isSmResumptionPossible()) { 405 smResumedSyncPoint.sendAndWaitForResponse(new Resume(clientHandledStanzasCount, smSessionId)); 406 if (smResumedSyncPoint.wasSuccessful()) { 407 // We successfully resumed the stream, be done here 408 afterSuccessfulLogin(true); 409 return; 410 } 411 // SM resumption failed, what Smack does here is to report success of 412 // lastFeaturesReceived in case of sm resumption was answered with 'failed' so that 413 // normal resource binding can be tried. 414 LOGGER.fine("Stream resumption failed, continuing with normal stream establishment process"); 415 } 416 417 List<Stanza> previouslyUnackedStanzas = new LinkedList<Stanza>(); 418 if (unacknowledgedStanzas != null) { 419 // There was a previous connection with SM enabled but that was either not resumable or 420 // failed to resume. Make sure that we (re-)send the unacknowledged stanzas. 421 unacknowledgedStanzas.drainTo(previouslyUnackedStanzas); 422 // Reset unacknowledged stanzas to 'null' to signal that we never send 'enable' in this 423 // XMPP session (There maybe was an enabled in a previous XMPP session of this 424 // connection instance though). This is used in writePackets to decide if stanzas should 425 // be added to the unacknowledged stanzas queue, because they have to be added right 426 // after the 'enable' stream element has been sent. 427 dropSmState(); 428 } 429 430 // Now bind the resource. It is important to do this *after* we dropped an eventually 431 // existing Stream Management state. As otherwise <bind/> and <session/> may end up in 432 // unacknowledgedStanzas and become duplicated on reconnect. See SMACK-706. 433 bindResourceAndEstablishSession(resource); 434 435 if (isSmAvailable() && useSm) { 436 // Remove what is maybe left from previously stream managed sessions 437 serverHandledStanzasCount = 0; 438 // XEP-198 3. Enabling Stream Management. If the server response to 'Enable' is 'Failed' 439 // then this is a non recoverable error and we therefore throw an exception. 440 smEnabledSyncPoint.sendAndWaitForResponseOrThrow(new Enable(useSmResumption, smClientMaxResumptionTime)); 441 synchronized (requestAckPredicates) { 442 if (requestAckPredicates.isEmpty()) { 443 // Assure that we have at lest one predicate set up that so that we request acks 444 // for the server and eventually flush some stanzas from the unacknowledged 445 // stanza queue 446 requestAckPredicates.add(Predicate.forMessagesOrAfter5Stanzas()); 447 } 448 } 449 } 450 // (Re-)send the stanzas *after* we tried to enable SM 451 for (Stanza stanza : previouslyUnackedStanzas) { 452 sendStanzaInternal(stanza); 453 } 454 455 afterSuccessfulLogin(false); 456 } 457 458 @Override 459 public boolean isSecureConnection() { 460 return secureSocket != null; 461 } 462 463 /** 464 * Shuts the current connection down. After this method returns, the connection must be ready 465 * for re-use by connect. 466 */ 467 @Override 468 protected void shutdown() { 469 if (isSmEnabled()) { 470 try { 471 // Try to send a last SM Acknowledgement. Most servers won't find this information helpful, as the SM 472 // state is dropped after a clean disconnect anyways. OTOH it doesn't hurt much either. 473 sendSmAcknowledgementInternal(); 474 } catch (InterruptedException | NotConnectedException e) { 475 LOGGER.log(Level.FINE, "Can not send final SM ack as connection is not connected", e); 476 } 477 } 478 shutdown(false); 479 } 480 481 /** 482 * Performs an unclean disconnect and shutdown of the connection. Does not send a closing stream stanza. 483 */ 484 public synchronized void instantShutdown() { 485 shutdown(true); 486 } 487 488 private void shutdown(boolean instant) { 489 if (disconnectedButResumeable) { 490 return; 491 } 492 493 // First shutdown the writer, this will result in a closing stream element getting send to 494 // the server 495 LOGGER.finer("PacketWriter shutdown()"); 496 packetWriter.shutdown(instant); 497 LOGGER.finer("PacketWriter has been shut down"); 498 499 if (!instant) { 500 try { 501 // After we send the closing stream element, check if there was already a 502 // closing stream element sent by the server or wait with a timeout for a 503 // closing stream element to be received from the server. 504 @SuppressWarnings("unused") 505 Exception res = closingStreamReceived.checkIfSuccessOrWait(); 506 } catch (InterruptedException | NoResponseException e) { 507 LOGGER.log(Level.INFO, "Exception while waiting for closing stream element from the server " + this, e); 508 } 509 } 510 511 LOGGER.finer("PacketReader shutdown()"); 512 packetReader.shutdown(); 513 LOGGER.finer("PacketReader has been shut down"); 514 515 try { 516 socket.close(); 517 } catch (Exception e) { 518 LOGGER.log(Level.WARNING, "shutdown", e); 519 } 520 521 setWasAuthenticated(); 522 // If we are able to resume the stream, then don't set 523 // connected/authenticated/usingTLS to false since we like behave like we are still 524 // connected (e.g. sendStanza should not throw a NotConnectedException). 525 if (isSmResumptionPossible() && instant) { 526 disconnectedButResumeable = true; 527 } else { 528 disconnectedButResumeable = false; 529 // Reset the stream management session id to null, since if the stream is cleanly closed, i.e. sending a closing 530 // stream tag, there is no longer a stream to resume. 531 smSessionId = null; 532 } 533 authenticated = false; 534 connected = false; 535 secureSocket = null; 536 reader = null; 537 writer = null; 538 539 maybeCompressFeaturesReceived.init(); 540 compressSyncPoint.init(); 541 smResumedSyncPoint.init(); 542 smEnabledSyncPoint.init(); 543 initialOpenStreamSend.init(); 544 } 545 546 @Override 547 public void sendNonza(Nonza element) throws NotConnectedException, InterruptedException { 548 packetWriter.sendStreamElement(element); 549 } 550 551 @Override 552 protected void sendStanzaInternal(Stanza packet) throws NotConnectedException, InterruptedException { 553 packetWriter.sendStreamElement(packet); 554 if (isSmEnabled()) { 555 for (StanzaFilter requestAckPredicate : requestAckPredicates) { 556 if (requestAckPredicate.accept(packet)) { 557 requestSmAcknowledgementInternal(); 558 break; 559 } 560 } 561 } 562 } 563 564 private void connectUsingConfiguration() throws ConnectionException, IOException, InterruptedException { 565 List<HostAddress> failedAddresses = populateHostAddresses(); 566 SocketFactory socketFactory = config.getSocketFactory(); 567 ProxyInfo proxyInfo = config.getProxyInfo(); 568 int timeout = config.getConnectTimeout(); 569 if (socketFactory == null) { 570 socketFactory = SocketFactory.getDefault(); 571 } 572 for (HostAddress hostAddress : hostAddresses) { 573 Iterator<InetAddress> inetAddresses; 574 String host = hostAddress.getHost(); 575 int port = hostAddress.getPort(); 576 if (proxyInfo == null) { 577 inetAddresses = hostAddress.getInetAddresses().iterator(); 578 assert (inetAddresses.hasNext()); 579 580 innerloop: while (inetAddresses.hasNext()) { 581 // Create a *new* Socket before every connection attempt, i.e. connect() call, since Sockets are not 582 // re-usable after a failed connection attempt. See also SMACK-724. 583 SmackFuture.SocketFuture socketFuture = new SmackFuture.SocketFuture(socketFactory); 584 585 final InetAddress inetAddress = inetAddresses.next(); 586 final InetSocketAddress inetSocketAddress = new InetSocketAddress(inetAddress, port); 587 LOGGER.finer("Trying to establish TCP connection to " + inetSocketAddress); 588 socketFuture.connectAsync(inetSocketAddress, timeout); 589 590 try { 591 socket = socketFuture.getOrThrow(); 592 } catch (IOException e) { 593 hostAddress.setException(inetAddress, e); 594 if (inetAddresses.hasNext()) { 595 continue innerloop; 596 } else { 597 break innerloop; 598 } 599 } 600 LOGGER.finer("Established TCP connection to " + inetSocketAddress); 601 // We found a host to connect to, return here 602 this.host = host; 603 this.port = port; 604 return; 605 } 606 failedAddresses.add(hostAddress); 607 } else { 608 socket = socketFactory.createSocket(); 609 StringUtils.requireNotNullOrEmpty(host, "Host of HostAddress " + hostAddress + " must not be null when using a Proxy"); 610 final String hostAndPort = host + " at port " + port; 611 LOGGER.finer("Trying to establish TCP connection via Proxy to " + hostAndPort); 612 try { 613 proxyInfo.getProxySocketConnection().connect(socket, host, port, timeout); 614 } catch (IOException e) { 615 hostAddress.setException(e); 616 continue; 617 } 618 LOGGER.finer("Established TCP connection to " + hostAndPort); 619 // We found a host to connect to, return here 620 this.host = host; 621 this.port = port; 622 return; 623 } 624 } 625 // There are no more host addresses to try 626 // throw an exception and report all tried 627 // HostAddresses in the exception 628 throw ConnectionException.from(failedAddresses); 629 } 630 631 /** 632 * Initializes the connection by creating a stanza reader and writer and opening a 633 * XMPP stream to the server. 634 * 635 * @throws XMPPException if establishing a connection to the server fails. 636 * @throws SmackException if the server fails to respond back or if there is anther error. 637 * @throws IOException 638 * @throws InterruptedException 639 */ 640 private void initConnection() throws IOException, InterruptedException { 641 compressionHandler = null; 642 643 // Set the reader and writer instance variables 644 initReaderAndWriter(); 645 646 int availableReaderWriterSemaphorePermits = readerWriterSemaphore.availablePermits(); 647 if (availableReaderWriterSemaphorePermits < 2) { 648 Object[] logObjects = new Object[] { 649 this, 650 availableReaderWriterSemaphorePermits, 651 }; 652 LOGGER.log(Level.FINE, "Not every reader/writer threads where terminated on connection re-initializtion of {0}. Available permits {1}", logObjects); 653 } 654 readerWriterSemaphore.acquire(2); 655 // Start the writer thread. This will open an XMPP stream to the server 656 packetWriter.init(); 657 // Start the reader thread. The startup() method will block until we 658 // get an opening stream packet back from server 659 packetReader.init(); 660 } 661 662 private void initReaderAndWriter() throws IOException { 663 InputStream is = socket.getInputStream(); 664 OutputStream os = socket.getOutputStream(); 665 if (compressionHandler != null) { 666 is = compressionHandler.getInputStream(is); 667 os = compressionHandler.getOutputStream(os); 668 } 669 // OutputStreamWriter is already buffered, no need to wrap it into a BufferedWriter 670 writer = new OutputStreamWriter(os, "UTF-8"); 671 reader = new BufferedReader(new InputStreamReader(is, "UTF-8")); 672 673 // If debugging is enabled, we open a window and write out all network traffic. 674 initDebugger(); 675 } 676 677 /** 678 * The server has indicated that TLS negotiation can start. We now need to secure the 679 * existing plain connection and perform a handshake. This method won't return until the 680 * connection has finished the handshake or an error occurred while securing the connection. 681 * @throws IOException 682 * @throws CertificateException 683 * @throws NoSuchAlgorithmException 684 * @throws NoSuchProviderException 685 * @throws KeyStoreException 686 * @throws UnrecoverableKeyException 687 * @throws KeyManagementException 688 * @throws SmackException 689 * @throws Exception if an exception occurs. 690 */ 691 @SuppressWarnings("LiteralClassName") 692 private void proceedTLSReceived() throws NoSuchAlgorithmException, CertificateException, IOException, KeyStoreException, NoSuchProviderException, UnrecoverableKeyException, KeyManagementException, SmackException { 693 SmackDaneVerifier daneVerifier = null; 694 695 if (config.getDnssecMode() == DnssecMode.needsDnssecAndDane) { 696 SmackDaneProvider daneProvider = DNSUtil.getDaneProvider(); 697 if (daneProvider == null) { 698 throw new UnsupportedOperationException("DANE enabled but no SmackDaneProvider configured"); 699 } 700 daneVerifier = daneProvider.newInstance(); 701 if (daneVerifier == null) { 702 throw new IllegalStateException("DANE requested but DANE provider did not return a DANE verifier"); 703 } 704 } 705 706 SSLContext context = this.config.getCustomSSLContext(); 707 KeyStore ks = null; 708 PasswordCallback pcb = null; 709 710 if (context == null) { 711 final String keyStoreType = config.getKeystoreType(); 712 final CallbackHandler callbackHandler = config.getCallbackHandler(); 713 final String keystorePath = config.getKeystorePath(); 714 if ("PKCS11".equals(keyStoreType)) { 715 try { 716 Constructor<?> c = Class.forName("sun.security.pkcs11.SunPKCS11").getConstructor(InputStream.class); 717 String pkcs11Config = "name = SmartCard\nlibrary = " + config.getPKCS11Library(); 718 ByteArrayInputStream config = new ByteArrayInputStream(pkcs11Config.getBytes(StringUtils.UTF8)); 719 Provider p = (Provider) c.newInstance(config); 720 Security.addProvider(p); 721 ks = KeyStore.getInstance("PKCS11",p); 722 pcb = new PasswordCallback("PKCS11 Password: ",false); 723 callbackHandler.handle(new Callback[] {pcb}); 724 ks.load(null,pcb.getPassword()); 725 } 726 catch (Exception e) { 727 LOGGER.log(Level.WARNING, "Exception", e); 728 ks = null; 729 } 730 } 731 else if ("Apple".equals(keyStoreType)) { 732 ks = KeyStore.getInstance("KeychainStore","Apple"); 733 ks.load(null,null); 734 // pcb = new PasswordCallback("Apple Keychain",false); 735 // pcb.setPassword(null); 736 } 737 else if (keyStoreType != null) { 738 ks = KeyStore.getInstance(keyStoreType); 739 if (callbackHandler != null && StringUtils.isNotEmpty(keystorePath)) { 740 try { 741 pcb = new PasswordCallback("Keystore Password: ", false); 742 callbackHandler.handle(new Callback[] { pcb }); 743 ks.load(new FileInputStream(keystorePath), pcb.getPassword()); 744 } 745 catch (Exception e) { 746 LOGGER.log(Level.WARNING, "Exception", e); 747 ks = null; 748 } 749 } else { 750 ks.load(null, null); 751 } 752 } 753 754 KeyManager[] kms = null; 755 756 if (ks != null) { 757 String keyManagerFactoryAlgorithm = KeyManagerFactory.getDefaultAlgorithm(); 758 KeyManagerFactory kmf = null; 759 try { 760 kmf = KeyManagerFactory.getInstance(keyManagerFactoryAlgorithm); 761 } 762 catch (NoSuchAlgorithmException e) { 763 LOGGER.log(Level.FINE, "Could get the default KeyManagerFactory for the '" 764 + keyManagerFactoryAlgorithm + "' algorithm", e); 765 } 766 if (kmf != null) { 767 try { 768 if (pcb == null) { 769 kmf.init(ks, null); 770 } 771 else { 772 kmf.init(ks, pcb.getPassword()); 773 pcb.clearPassword(); 774 } 775 kms = kmf.getKeyManagers(); 776 } 777 catch (NullPointerException npe) { 778 LOGGER.log(Level.WARNING, "NullPointerException", npe); 779 } 780 } 781 } 782 783 // If the user didn't specify a SSLContext, use the default one 784 context = SSLContext.getInstance("TLS"); 785 786 final SecureRandom secureRandom = new java.security.SecureRandom(); 787 X509TrustManager customTrustManager = config.getCustomX509TrustManager(); 788 789 if (daneVerifier != null) { 790 // User requested DANE verification. 791 daneVerifier.init(context, kms, customTrustManager, secureRandom); 792 } else { 793 TrustManager[] customTrustManagers = null; 794 if (customTrustManager != null) { 795 customTrustManagers = new TrustManager[] { customTrustManager }; 796 } 797 context.init(kms, customTrustManagers, secureRandom); 798 } 799 } 800 801 Socket plain = socket; 802 // Secure the plain connection 803 socket = context.getSocketFactory().createSocket(plain, 804 config.getXMPPServiceDomain().toString(), plain.getPort(), true); 805 806 final SSLSocket sslSocket = (SSLSocket) socket; 807 // Immediately set the enabled SSL protocols and ciphers. See SMACK-712 why this is 808 // important (at least on certain platforms) and it seems to be a good idea anyways to 809 // prevent an accidental implicit handshake. 810 TLSUtils.setEnabledProtocolsAndCiphers(sslSocket, config.getEnabledSSLProtocols(), config.getEnabledSSLCiphers()); 811 812 // Initialize the reader and writer with the new secured version 813 initReaderAndWriter(); 814 815 // Proceed to do the handshake 816 sslSocket.startHandshake(); 817 818 if (daneVerifier != null) { 819 daneVerifier.finish(sslSocket); 820 } 821 822 final HostnameVerifier verifier = getConfiguration().getHostnameVerifier(); 823 if (verifier == null) { 824 throw new IllegalStateException("No HostnameVerifier set. Use connectionConfiguration.setHostnameVerifier() to configure."); 825 } else if (!verifier.verify(getXMPPServiceDomain().toString(), sslSocket.getSession())) { 826 throw new CertificateException("Hostname verification of certificate failed. Certificate does not authenticate " + getXMPPServiceDomain()); 827 } 828 829 // Set that TLS was successful 830 secureSocket = sslSocket; 831 } 832 833 /** 834 * Returns the compression handler that can be used for one compression methods offered by the server. 835 * 836 * @return a instance of XMPPInputOutputStream or null if no suitable instance was found 837 * 838 */ 839 private static XMPPInputOutputStream maybeGetCompressionHandler(Compress.Feature compression) { 840 for (XMPPInputOutputStream handler : SmackConfiguration.getCompressionHandlers()) { 841 String method = handler.getCompressionMethod(); 842 if (compression.getMethods().contains(method)) 843 return handler; 844 } 845 return null; 846 } 847 848 @Override 849 public boolean isUsingCompression() { 850 return compressionHandler != null && compressSyncPoint.wasSuccessful(); 851 } 852 853 /** 854 * <p> 855 * Starts using stream compression that will compress network traffic. Traffic can be 856 * reduced up to 90%. Therefore, stream compression is ideal when using a slow speed network 857 * connection. However, the server and the client will need to use more CPU time in order to 858 * un/compress network data so under high load the server performance might be affected. 859 * </p> 860 * <p> 861 * Stream compression has to have been previously offered by the server. Currently only the 862 * zlib method is supported by the client. Stream compression negotiation has to be done 863 * before authentication took place. 864 * </p> 865 * 866 * @throws NotConnectedException 867 * @throws SmackException 868 * @throws NoResponseException 869 * @throws InterruptedException 870 */ 871 private void maybeEnableCompression() throws SmackException, InterruptedException { 872 if (!config.isCompressionEnabled()) { 873 return; 874 } 875 876 Compress.Feature compression = getFeature(Compress.Feature.ELEMENT, Compress.NAMESPACE); 877 if (compression == null) { 878 // Server does not support compression 879 return; 880 } 881 // If stream compression was offered by the server and we want to use 882 // compression then send compression request to the server 883 if ((compressionHandler = maybeGetCompressionHandler(compression)) != null) { 884 compressSyncPoint.sendAndWaitForResponseOrThrow(new Compress(compressionHandler.getCompressionMethod())); 885 } else { 886 LOGGER.warning("Could not enable compression because no matching handler/method pair was found"); 887 } 888 } 889 890 /** 891 * Establishes a connection to the XMPP server. It basically 892 * creates and maintains a socket connection to the server. 893 * <p> 894 * Listeners will be preserved from a previous connection if the reconnection 895 * occurs after an abrupt termination. 896 * </p> 897 * 898 * @throws XMPPException if an error occurs while trying to establish the connection. 899 * @throws SmackException 900 * @throws IOException 901 * @throws InterruptedException 902 */ 903 @Override 904 protected void connectInternal() throws SmackException, IOException, XMPPException, InterruptedException { 905 closingStreamReceived.init(); 906 // Establishes the TCP connection to the server and does setup the reader and writer. Throws an exception if 907 // there is an error establishing the connection 908 connectUsingConfiguration(); 909 910 // We connected successfully to the servers TCP port 911 initConnection(); 912 913 // TLS handled will be successful either if TLS was established, or if it was not mandatory. 914 tlsHandled.checkIfSuccessOrWaitOrThrow(); 915 916 // Wait with SASL auth until the SASL mechanisms have been received 917 saslFeatureReceived.checkIfSuccessOrWaitOrThrow(); 918 } 919 920 /** 921 * Sends out a notification that there was an error with the connection 922 * and closes the connection. Also prints the stack trace of the given exception 923 * 924 * @param e the exception that causes the connection close event. 925 */ 926 private synchronized void notifyConnectionError(Exception e) { 927 // Listeners were already notified of the exception, return right here. 928 if (packetReader.done && packetWriter.done()) return; 929 930 SmackWrappedException smackWrappedException = new SmackWrappedException(e); 931 tlsHandled.reportGenericFailure(smackWrappedException); 932 saslFeatureReceived.reportGenericFailure(smackWrappedException); 933 maybeCompressFeaturesReceived.reportGenericFailure(smackWrappedException); 934 lastFeaturesReceived.reportGenericFailure(smackWrappedException); 935 936 // Closes the connection temporary. A reconnection is possible 937 // Note that a connection listener of XMPPTCPConnection will drop the SM state in 938 // case the Exception is a StreamErrorException. 939 instantShutdown(); 940 941 // Notify connection listeners of the error. 942 callConnectionClosedOnErrorListener(e); 943 } 944 945 /** 946 * For unit testing purposes 947 * 948 * @param writer 949 */ 950 protected void setWriter(Writer writer) { 951 this.writer = writer; 952 } 953 954 @Override 955 protected void afterFeaturesReceived() throws NotConnectedException, InterruptedException { 956 StartTls startTlsFeature = getFeature(StartTls.ELEMENT, StartTls.NAMESPACE); 957 if (startTlsFeature != null) { 958 if (startTlsFeature.required() && config.getSecurityMode() == SecurityMode.disabled) { 959 SmackException smackException = new SecurityRequiredByServerException(); 960 tlsHandled.reportFailure(smackException); 961 notifyConnectionError(smackException); 962 return; 963 } 964 965 if (config.getSecurityMode() != ConnectionConfiguration.SecurityMode.disabled) { 966 sendNonza(new StartTls()); 967 } else { 968 tlsHandled.reportSuccess(); 969 } 970 } else { 971 tlsHandled.reportSuccess(); 972 } 973 974 if (getSASLAuthentication().authenticationSuccessful()) { 975 // If we have received features after the SASL has been successfully completed, then we 976 // have also *maybe* received, as it is an optional feature, the compression feature 977 // from the server. 978 maybeCompressFeaturesReceived.reportSuccess(); 979 } 980 } 981 982 /** 983 * Resets the parser using the latest connection's reader. Resetting the parser is necessary 984 * when the plain connection has been secured or when a new opening stream element is going 985 * to be sent by the server. 986 * 987 * @throws SmackException if the parser could not be reset. 988 * @throws InterruptedException 989 */ 990 void openStream() throws SmackException, InterruptedException { 991 // If possible, provide the receiving entity of the stream open tag, i.e. the server, as much information as 992 // possible. The 'to' attribute is *always* available. The 'from' attribute if set by the user and no external 993 // mechanism is used to determine the local entity (user). And the 'id' attribute is available after the first 994 // response from the server (see e.g. RFC 6120 ยง 9.1.1 Step 2.) 995 CharSequence to = getXMPPServiceDomain(); 996 CharSequence from = null; 997 CharSequence localpart = config.getUsername(); 998 if (localpart != null) { 999 from = XmppStringUtils.completeJidFrom(localpart, to); 1000 } 1001 String id = getStreamId(); 1002 sendNonza(new StreamOpen(to, from, id)); 1003 try { 1004 packetReader.parser = PacketParserUtils.newXmppParser(reader); 1005 } 1006 catch (XmlPullParserException e) { 1007 throw new SmackException(e); 1008 } 1009 } 1010 1011 protected class PacketReader { 1012 1013 XmlPullParser parser; 1014 1015 private volatile boolean done; 1016 1017 /** 1018 * Initializes the reader in order to be used. The reader is initialized during the 1019 * first connection and when reconnecting due to an abruptly disconnection. 1020 */ 1021 void init() { 1022 done = false; 1023 1024 Async.go(new Runnable() { 1025 @Override 1026 public void run() { 1027 try { 1028 parsePackets(); 1029 } finally { 1030 XMPPTCPConnection.this.readerWriterSemaphore.release(); 1031 } 1032 } 1033 }, "Smack Reader (" + getConnectionCounter() + ")"); 1034 } 1035 1036 /** 1037 * Shuts the stanza reader down. This method simply sets the 'done' flag to true. 1038 */ 1039 void shutdown() { 1040 done = true; 1041 } 1042 1043 /** 1044 * Parse top-level packets in order to process them further. 1045 */ 1046 private void parsePackets() { 1047 try { 1048 initialOpenStreamSend.checkIfSuccessOrWait(); 1049 int eventType = parser.getEventType(); 1050 while (!done) { 1051 switch (eventType) { 1052 case XmlPullParser.START_TAG: 1053 final String name = parser.getName(); 1054 switch (name) { 1055 case Message.ELEMENT: 1056 case IQ.IQ_ELEMENT: 1057 case Presence.ELEMENT: 1058 try { 1059 parseAndProcessStanza(parser); 1060 } finally { 1061 clientHandledStanzasCount = SMUtils.incrementHeight(clientHandledStanzasCount); 1062 } 1063 break; 1064 case "stream": 1065 // We found an opening stream. 1066 if ("jabber:client".equals(parser.getNamespace(null))) { 1067 streamId = parser.getAttributeValue("", "id"); 1068 String reportedServerDomain = parser.getAttributeValue("", "from"); 1069 assert (config.getXMPPServiceDomain().equals(reportedServerDomain)); 1070 } 1071 break; 1072 case "error": 1073 StreamError streamError = PacketParserUtils.parseStreamError(parser); 1074 saslFeatureReceived.reportFailure(new StreamErrorException(streamError)); 1075 // Mark the tlsHandled sync point as success, we will use the saslFeatureReceived sync 1076 // point to report the error, which is checked immediately after tlsHandled in 1077 // connectInternal(). 1078 tlsHandled.reportSuccess(); 1079 throw new StreamErrorException(streamError); 1080 case "features": 1081 parseFeatures(parser); 1082 break; 1083 case "proceed": 1084 try { 1085 // Secure the connection by negotiating TLS 1086 proceedTLSReceived(); 1087 // Send a new opening stream to the server 1088 openStream(); 1089 } 1090 catch (Exception e) { 1091 SmackException smackException = new SmackException(e); 1092 tlsHandled.reportFailure(smackException); 1093 throw e; 1094 } 1095 break; 1096 case "failure": 1097 String namespace = parser.getNamespace(null); 1098 switch (namespace) { 1099 case "urn:ietf:params:xml:ns:xmpp-tls": 1100 // TLS negotiation has failed. The server will close the connection 1101 // TODO Parse failure stanza 1102 throw new SmackException("TLS negotiation has failed"); 1103 case "http://jabber.org/protocol/compress": 1104 // Stream compression has been denied. This is a recoverable 1105 // situation. It is still possible to authenticate and 1106 // use the connection but using an uncompressed connection 1107 // TODO Parse failure stanza 1108 compressSyncPoint.reportFailure(new SmackException( 1109 "Could not establish compression")); 1110 break; 1111 case SaslStreamElements.NAMESPACE: 1112 // SASL authentication has failed. The server may close the connection 1113 // depending on the number of retries 1114 final SASLFailure failure = PacketParserUtils.parseSASLFailure(parser); 1115 getSASLAuthentication().authenticationFailed(failure); 1116 break; 1117 } 1118 break; 1119 case Challenge.ELEMENT: 1120 // The server is challenging the SASL authentication made by the client 1121 String challengeData = parser.nextText(); 1122 getSASLAuthentication().challengeReceived(challengeData); 1123 break; 1124 case Success.ELEMENT: 1125 Success success = new Success(parser.nextText()); 1126 // We now need to bind a resource for the connection 1127 // Open a new stream and wait for the response 1128 openStream(); 1129 // The SASL authentication with the server was successful. The next step 1130 // will be to bind the resource 1131 getSASLAuthentication().authenticated(success); 1132 break; 1133 case Compressed.ELEMENT: 1134 // Server confirmed that it's possible to use stream compression. Start 1135 // stream compression 1136 // Initialize the reader and writer with the new compressed version 1137 initReaderAndWriter(); 1138 // Send a new opening stream to the server 1139 openStream(); 1140 // Notify that compression is being used 1141 compressSyncPoint.reportSuccess(); 1142 break; 1143 case Enabled.ELEMENT: 1144 Enabled enabled = ParseStreamManagement.enabled(parser); 1145 if (enabled.isResumeSet()) { 1146 smSessionId = enabled.getId(); 1147 if (StringUtils.isNullOrEmpty(smSessionId)) { 1148 SmackException xmppException = new SmackException("Stream Management 'enabled' element with resume attribute but without session id received"); 1149 smEnabledSyncPoint.reportFailure(xmppException); 1150 throw xmppException; 1151 } 1152 smServerMaxResumptionTime = enabled.getMaxResumptionTime(); 1153 } else { 1154 // Mark this a non-resumable stream by setting smSessionId to null 1155 smSessionId = null; 1156 } 1157 clientHandledStanzasCount = 0; 1158 smWasEnabledAtLeastOnce = true; 1159 smEnabledSyncPoint.reportSuccess(); 1160 LOGGER.fine("Stream Management (XEP-198): successfully enabled"); 1161 break; 1162 case Failed.ELEMENT: 1163 Failed failed = ParseStreamManagement.failed(parser); 1164 FailedNonzaException xmppException = new FailedNonzaException(failed, failed.getStanzaErrorCondition()); 1165 // If only XEP-198 would specify different failure elements for the SM 1166 // enable and SM resume failure case. But this is not the case, so we 1167 // need to determine if this is a 'Failed' response for either 'Enable' 1168 // or 'Resume'. 1169 if (smResumedSyncPoint.requestSent()) { 1170 smResumedSyncPoint.reportFailure(xmppException); 1171 } 1172 else { 1173 if (!smEnabledSyncPoint.requestSent()) { 1174 throw new IllegalStateException("Failed element received but SM was not previously enabled"); 1175 } 1176 smEnabledSyncPoint.reportFailure(new SmackException(xmppException)); 1177 // Report success for last lastFeaturesReceived so that in case a 1178 // failed resumption, we can continue with normal resource binding. 1179 // See text of XEP-198 5. below Example 11. 1180 lastFeaturesReceived.reportSuccess(); 1181 } 1182 break; 1183 case Resumed.ELEMENT: 1184 Resumed resumed = ParseStreamManagement.resumed(parser); 1185 if (!smSessionId.equals(resumed.getPrevId())) { 1186 throw new StreamIdDoesNotMatchException(smSessionId, resumed.getPrevId()); 1187 } 1188 // Mark SM as enabled 1189 smEnabledSyncPoint.reportSuccess(); 1190 // First, drop the stanzas already handled by the server 1191 processHandledCount(resumed.getHandledCount()); 1192 // Then re-send what is left in the unacknowledged queue 1193 List<Stanza> stanzasToResend = new ArrayList<>(unacknowledgedStanzas.size()); 1194 unacknowledgedStanzas.drainTo(stanzasToResend); 1195 for (Stanza stanza : stanzasToResend) { 1196 sendStanzaInternal(stanza); 1197 } 1198 // If there where stanzas resent, then request a SM ack for them. 1199 // Writer's sendStreamElement() won't do it automatically based on 1200 // predicates. 1201 if (!stanzasToResend.isEmpty()) { 1202 requestSmAcknowledgementInternal(); 1203 } 1204 // Mark SM resumption as successful 1205 smResumedSyncPoint.reportSuccess(); 1206 LOGGER.fine("Stream Management (XEP-198): Stream resumed"); 1207 break; 1208 case AckAnswer.ELEMENT: 1209 AckAnswer ackAnswer = ParseStreamManagement.ackAnswer(parser); 1210 processHandledCount(ackAnswer.getHandledCount()); 1211 break; 1212 case AckRequest.ELEMENT: 1213 ParseStreamManagement.ackRequest(parser); 1214 if (smEnabledSyncPoint.wasSuccessful()) { 1215 sendSmAcknowledgementInternal(); 1216 } else { 1217 LOGGER.warning("SM Ack Request received while SM is not enabled"); 1218 } 1219 break; 1220 default: 1221 LOGGER.warning("Unknown top level stream element: " + name); 1222 break; 1223 } 1224 break; 1225 case XmlPullParser.END_TAG: 1226 final String endTagName = parser.getName(); 1227 if ("stream".equals(endTagName)) { 1228 if (!parser.getNamespace().equals("http://etherx.jabber.org/streams")) { 1229 LOGGER.warning(XMPPTCPConnection.this + " </stream> but different namespace " + parser.getNamespace()); 1230 break; 1231 } 1232 1233 // Check if the queue was already shut down before reporting success on closing stream tag 1234 // received. This avoids a race if there is a disconnect(), followed by a connect(), which 1235 // did re-start the queue again, causing this writer to assume that the queue is not 1236 // shutdown, which results in a call to disconnect(). 1237 final boolean queueWasShutdown = packetWriter.queue.isShutdown(); 1238 closingStreamReceived.reportSuccess(); 1239 1240 if (queueWasShutdown) { 1241 // We received a closing stream element *after* we initiated the 1242 // termination of the session by sending a closing stream element to 1243 // the server first 1244 return; 1245 } else { 1246 // We received a closing stream element from the server without us 1247 // sending a closing stream element first. This means that the 1248 // server wants to terminate the session, therefore disconnect 1249 // the connection 1250 LOGGER.info(XMPPTCPConnection.this 1251 + " received closing </stream> element." 1252 + " Server wants to terminate the connection, calling disconnect()"); 1253 disconnect(); 1254 } 1255 } 1256 break; 1257 case XmlPullParser.END_DOCUMENT: 1258 // END_DOCUMENT only happens in an error case, as otherwise we would see a 1259 // closing stream element before. 1260 throw new SmackException( 1261 "Parser got END_DOCUMENT event. This could happen e.g. if the server closed the connection without sending a closing stream element"); 1262 } 1263 eventType = parser.next(); 1264 } 1265 } 1266 catch (Exception e) { 1267 closingStreamReceived.reportFailure(e); 1268 // The exception can be ignored if the the connection is 'done' 1269 // or if the it was caused because the socket got closed 1270 if (!(done || packetWriter.queue.isShutdown())) { 1271 // Close the connection and notify connection listeners of the 1272 // error. 1273 notifyConnectionError(e); 1274 } 1275 } 1276 } 1277 } 1278 1279 protected class PacketWriter { 1280 public static final int QUEUE_SIZE = XMPPTCPConnection.QUEUE_SIZE; 1281 1282 private final ArrayBlockingQueueWithShutdown<Element> queue = new ArrayBlockingQueueWithShutdown<>( 1283 QUEUE_SIZE, true); 1284 1285 /** 1286 * Needs to be protected for unit testing purposes. 1287 */ 1288 protected SynchronizationPoint<NoResponseException> shutdownDone = new SynchronizationPoint<>( 1289 XMPPTCPConnection.this, "shutdown completed"); 1290 1291 /** 1292 * If set, the stanza writer is shut down 1293 */ 1294 protected volatile Long shutdownTimestamp = null; 1295 1296 private volatile boolean instantShutdown; 1297 1298 /** 1299 * True if some preconditions are given to start the bundle and defer mechanism. 1300 * <p> 1301 * This will likely get set to true right after the start of the writer thread, because 1302 * {@link #nextStreamElement()} will check if {@link queue} is empty, which is probably the case, and then set 1303 * this field to true. 1304 * </p> 1305 */ 1306 private boolean shouldBundleAndDefer; 1307 1308 /** 1309 * Initializes the writer in order to be used. It is called at the first connection and also 1310 * is invoked if the connection is disconnected by an error. 1311 */ 1312 void init() { 1313 shutdownDone.init(); 1314 shutdownTimestamp = null; 1315 1316 if (unacknowledgedStanzas != null) { 1317 // It's possible that there are new stanzas in the writer queue that 1318 // came in while we were disconnected but resumable, drain those into 1319 // the unacknowledged queue so that they get resent now 1320 drainWriterQueueToUnacknowledgedStanzas(); 1321 } 1322 1323 queue.start(); 1324 Async.go(new Runnable() { 1325 @Override 1326 public void run() { 1327 try { 1328 writePackets(); 1329 } finally { 1330 XMPPTCPConnection.this.readerWriterSemaphore.release(); 1331 } 1332 } 1333 }, "Smack Writer (" + getConnectionCounter() + ")"); 1334 } 1335 1336 private boolean done() { 1337 return shutdownTimestamp != null; 1338 } 1339 1340 protected void throwNotConnectedExceptionIfDoneAndResumptionNotPossible() throws NotConnectedException { 1341 final boolean done = done(); 1342 if (done) { 1343 final boolean smResumptionPossible = isSmResumptionPossible(); 1344 // Don't throw a NotConnectedException is there is an resumable stream available 1345 if (!smResumptionPossible) { 1346 throw new NotConnectedException(XMPPTCPConnection.this, "done=" + done 1347 + " smResumptionPossible=" + smResumptionPossible); 1348 } 1349 } 1350 } 1351 1352 /** 1353 * Sends the specified element to the server. 1354 * 1355 * @param element the element to send. 1356 * @throws NotConnectedException 1357 * @throws InterruptedException 1358 */ 1359 protected void sendStreamElement(Element element) throws NotConnectedException, InterruptedException { 1360 throwNotConnectedExceptionIfDoneAndResumptionNotPossible(); 1361 try { 1362 queue.put(element); 1363 } 1364 catch (InterruptedException e) { 1365 // put() may throw an InterruptedException for two reasons: 1366 // 1. If the queue was shut down 1367 // 2. If the thread was interrupted 1368 // so we have to check which is the case 1369 throwNotConnectedExceptionIfDoneAndResumptionNotPossible(); 1370 // If the method above did not throw, then the sending thread was interrupted 1371 throw e; 1372 } 1373 } 1374 1375 /** 1376 * Shuts down the stanza writer. Once this method has been called, no further 1377 * packets will be written to the server. 1378 * @throws InterruptedException 1379 */ 1380 void shutdown(boolean instant) { 1381 instantShutdown = instant; 1382 queue.shutdown(); 1383 shutdownTimestamp = System.currentTimeMillis(); 1384 if (shutdownDone.isNotInInitialState()) { 1385 try { 1386 shutdownDone.checkIfSuccessOrWait(); 1387 } catch (NoResponseException | InterruptedException e) { 1388 LOGGER.log(Level.WARNING, "shutdownDone was not marked as successful by the writer thread", e); 1389 } 1390 } 1391 } 1392 1393 /** 1394 * Maybe return the next available element from the queue for writing. If the queue is shut down <b>or</b> a 1395 * spurious interrupt occurs, <code>null</code> is returned. So it is important to check the 'done' condition in 1396 * that case. 1397 * 1398 * @return the next element for writing or null. 1399 */ 1400 private Element nextStreamElement() { 1401 // It is important the we check if the queue is empty before removing an element from it 1402 if (queue.isEmpty()) { 1403 shouldBundleAndDefer = true; 1404 } 1405 Element packet = null; 1406 try { 1407 packet = queue.take(); 1408 } 1409 catch (InterruptedException e) { 1410 if (!queue.isShutdown()) { 1411 // Users shouldn't try to interrupt the packet writer thread 1412 LOGGER.log(Level.WARNING, "Writer thread was interrupted. Don't do that. Use disconnect() instead.", e); 1413 } 1414 } 1415 return packet; 1416 } 1417 1418 private void writePackets() { 1419 Exception writerException = null; 1420 try { 1421 openStream(); 1422 initialOpenStreamSend.reportSuccess(); 1423 // Write out packets from the queue. 1424 while (!done()) { 1425 Element element = nextStreamElement(); 1426 if (element == null) { 1427 continue; 1428 } 1429 1430 // Get a local version of the bundle and defer callback, in case it's unset 1431 // between the null check and the method invocation 1432 final BundleAndDeferCallback localBundleAndDeferCallback = bundleAndDeferCallback; 1433 // If the preconditions are given (e.g. bundleAndDefer callback is set, queue is 1434 // empty), then we could wait a bit for further stanzas attempting to decrease 1435 // our energy consumption 1436 if (localBundleAndDeferCallback != null && isAuthenticated() && shouldBundleAndDefer) { 1437 // Reset shouldBundleAndDefer to false, nextStreamElement() will set it to true once the 1438 // queue is empty again. 1439 shouldBundleAndDefer = false; 1440 final AtomicBoolean bundlingAndDeferringStopped = new AtomicBoolean(); 1441 final int bundleAndDeferMillis = localBundleAndDeferCallback.getBundleAndDeferMillis(new BundleAndDefer( 1442 bundlingAndDeferringStopped)); 1443 if (bundleAndDeferMillis > 0) { 1444 long remainingWait = bundleAndDeferMillis; 1445 final long waitStart = System.currentTimeMillis(); 1446 synchronized (bundlingAndDeferringStopped) { 1447 while (!bundlingAndDeferringStopped.get() && remainingWait > 0) { 1448 bundlingAndDeferringStopped.wait(remainingWait); 1449 remainingWait = bundleAndDeferMillis 1450 - (System.currentTimeMillis() - waitStart); 1451 } 1452 } 1453 } 1454 } 1455 1456 Stanza packet = null; 1457 if (element instanceof Stanza) { 1458 packet = (Stanza) element; 1459 } 1460 else if (element instanceof Enable) { 1461 // The client needs to add messages to the unacknowledged stanzas queue 1462 // right after it sent 'enabled'. Stanza will be added once 1463 // unacknowledgedStanzas is not null. 1464 unacknowledgedStanzas = new ArrayBlockingQueue<>(QUEUE_SIZE); 1465 } 1466 maybeAddToUnacknowledgedStanzas(packet); 1467 1468 CharSequence elementXml = element.toXML(StreamOpen.CLIENT_NAMESPACE); 1469 if (elementXml instanceof XmlStringBuilder) { 1470 ((XmlStringBuilder) elementXml).write(writer, StreamOpen.CLIENT_NAMESPACE); 1471 } 1472 else { 1473 writer.write(elementXml.toString()); 1474 } 1475 1476 if (queue.isEmpty()) { 1477 writer.flush(); 1478 } 1479 if (packet != null) { 1480 firePacketSendingListeners(packet); 1481 } 1482 } 1483 if (!instantShutdown) { 1484 // Flush out the rest of the queue. 1485 try { 1486 while (!queue.isEmpty()) { 1487 Element packet = queue.remove(); 1488 if (packet instanceof Stanza) { 1489 Stanza stanza = (Stanza) packet; 1490 maybeAddToUnacknowledgedStanzas(stanza); 1491 } 1492 writer.write(packet.toXML(null).toString()); 1493 } 1494 writer.flush(); 1495 } 1496 catch (Exception e) { 1497 LOGGER.log(Level.WARNING, 1498 "Exception flushing queue during shutdown, ignore and continue", 1499 e); 1500 } 1501 1502 // Close the stream. 1503 try { 1504 writer.write("</stream:stream>"); 1505 writer.flush(); 1506 } 1507 catch (Exception e) { 1508 LOGGER.log(Level.WARNING, "Exception writing closing stream element", e); 1509 } 1510 1511 // Delete the queue contents (hopefully nothing is left). 1512 queue.clear(); 1513 } else if (instantShutdown && isSmEnabled()) { 1514 // This was an instantShutdown and SM is enabled, drain all remaining stanzas 1515 // into the unacknowledgedStanzas queue 1516 drainWriterQueueToUnacknowledgedStanzas(); 1517 } 1518 // Do *not* close the writer here, as it will cause the socket 1519 // to get closed. But we may want to receive further stanzas 1520 // until the closing stream tag is received. The socket will be 1521 // closed in shutdown(). 1522 } 1523 catch (Exception e) { 1524 // The exception can be ignored if the the connection is 'done' 1525 // or if the it was caused because the socket got closed 1526 if (!(done() || queue.isShutdown())) { 1527 writerException = e; 1528 } else { 1529 LOGGER.log(Level.FINE, "Ignoring Exception in writePackets()", e); 1530 } 1531 } finally { 1532 LOGGER.fine("Reporting shutdownDone success in writer thread"); 1533 shutdownDone.reportSuccess(); 1534 } 1535 // Delay notifyConnectionError after shutdownDone has been reported in the finally block. 1536 if (writerException != null) { 1537 notifyConnectionError(writerException); 1538 } 1539 } 1540 1541 private void drainWriterQueueToUnacknowledgedStanzas() { 1542 List<Element> elements = new ArrayList<>(queue.size()); 1543 queue.drainTo(elements); 1544 for (int i = 0; i < elements.size(); i++) { 1545 Element element = elements.get(i); 1546 // If the unacknowledgedStanza queue is full, then bail out with a warning message. See SMACK-844. 1547 if (unacknowledgedStanzas.remainingCapacity() == 0) { 1548 StreamManagementException.UnacknowledgedQueueFullException exception = StreamManagementException.UnacknowledgedQueueFullException 1549 .newWith(i, elements, unacknowledgedStanzas); 1550 LOGGER.log(Level.WARNING, 1551 "Some stanzas may be lost as not all could be drained to the unacknowledged stanzas queue", exception); 1552 return; 1553 } 1554 if (element instanceof Stanza) { 1555 unacknowledgedStanzas.add((Stanza) element); 1556 } 1557 } 1558 } 1559 1560 private void maybeAddToUnacknowledgedStanzas(Stanza stanza) throws IOException { 1561 // Check if the stream element should be put to the unacknowledgedStanza 1562 // queue. Note that we can not do the put() in sendStanzaInternal() and the 1563 // packet order is not stable at this point (sendStanzaInternal() can be 1564 // called concurrently). 1565 if (unacknowledgedStanzas != null && stanza != null) { 1566 // If the unacknowledgedStanza queue is nearly full, request an new ack 1567 // from the server in order to drain it 1568 if (unacknowledgedStanzas.size() == 0.8 * XMPPTCPConnection.QUEUE_SIZE) { 1569 writer.write(AckRequest.INSTANCE.toXML(null).toString()); 1570 writer.flush(); 1571 } 1572 try { 1573 // It is important the we put the stanza in the unacknowledged stanza 1574 // queue before we put it on the wire 1575 unacknowledgedStanzas.put(stanza); 1576 } 1577 catch (InterruptedException e) { 1578 throw new IllegalStateException(e); 1579 } 1580 } 1581 } 1582 } 1583 1584 /** 1585 * Set if Stream Management should be used by default for new connections. 1586 * 1587 * @param useSmDefault true to use Stream Management for new connections. 1588 */ 1589 public static void setUseStreamManagementDefault(boolean useSmDefault) { 1590 XMPPTCPConnection.useSmDefault = useSmDefault; 1591 } 1592 1593 /** 1594 * Set if Stream Management resumption should be used by default for new connections. 1595 * 1596 * @param useSmResumptionDefault true to use Stream Management resumption for new connections. 1597 * @deprecated use {@link #setUseStreamManagementResumptionDefault(boolean)} instead. 1598 */ 1599 @Deprecated 1600 public static void setUseStreamManagementResumptiodDefault(boolean useSmResumptionDefault) { 1601 setUseStreamManagementResumptionDefault(useSmResumptionDefault); 1602 } 1603 1604 /** 1605 * Set if Stream Management resumption should be used by default for new connections. 1606 * 1607 * @param useSmResumptionDefault true to use Stream Management resumption for new connections. 1608 */ 1609 public static void setUseStreamManagementResumptionDefault(boolean useSmResumptionDefault) { 1610 if (useSmResumptionDefault) { 1611 // Also enable SM is resumption is enabled 1612 setUseStreamManagementDefault(useSmResumptionDefault); 1613 } 1614 XMPPTCPConnection.useSmResumptionDefault = useSmResumptionDefault; 1615 } 1616 1617 /** 1618 * Set if Stream Management should be used if supported by the server. 1619 * 1620 * @param useSm true to use Stream Management. 1621 */ 1622 public void setUseStreamManagement(boolean useSm) { 1623 this.useSm = useSm; 1624 } 1625 1626 /** 1627 * Set if Stream Management resumption should be used if supported by the server. 1628 * 1629 * @param useSmResumption true to use Stream Management resumption. 1630 */ 1631 public void setUseStreamManagementResumption(boolean useSmResumption) { 1632 if (useSmResumption) { 1633 // Also enable SM is resumption is enabled 1634 setUseStreamManagement(useSmResumption); 1635 } 1636 this.useSmResumption = useSmResumption; 1637 } 1638 1639 /** 1640 * Set the preferred resumption time in seconds. 1641 * @param resumptionTime the preferred resumption time in seconds 1642 */ 1643 public void setPreferredResumptionTime(int resumptionTime) { 1644 smClientMaxResumptionTime = resumptionTime; 1645 } 1646 1647 /** 1648 * Add a predicate for Stream Management acknowledgment requests. 1649 * <p> 1650 * Those predicates are used to determine when a Stream Management acknowledgement request is send to the server. 1651 * Some pre-defined predicates are found in the <code>org.jivesoftware.smack.sm.predicates</code> package. 1652 * </p> 1653 * <p> 1654 * If not predicate is configured, the {@link Predicate#forMessagesOrAfter5Stanzas()} will be used. 1655 * </p> 1656 * 1657 * @param predicate the predicate to add. 1658 * @return if the predicate was not already active. 1659 */ 1660 public boolean addRequestAckPredicate(StanzaFilter predicate) { 1661 synchronized (requestAckPredicates) { 1662 return requestAckPredicates.add(predicate); 1663 } 1664 } 1665 1666 /** 1667 * Remove the given predicate for Stream Management acknowledgment request. 1668 * @param predicate the predicate to remove. 1669 * @return true if the predicate was removed. 1670 */ 1671 public boolean removeRequestAckPredicate(StanzaFilter predicate) { 1672 synchronized (requestAckPredicates) { 1673 return requestAckPredicates.remove(predicate); 1674 } 1675 } 1676 1677 /** 1678 * Remove all predicates for Stream Management acknowledgment requests. 1679 */ 1680 public void removeAllRequestAckPredicates() { 1681 synchronized (requestAckPredicates) { 1682 requestAckPredicates.clear(); 1683 } 1684 } 1685 1686 /** 1687 * Send an unconditional Stream Management acknowledgement request to the server. 1688 * 1689 * @throws StreamManagementNotEnabledException if Stream Management is not enabled. 1690 * @throws NotConnectedException if the connection is not connected. 1691 * @throws InterruptedException 1692 */ 1693 public void requestSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException { 1694 if (!isSmEnabled()) { 1695 throw new StreamManagementException.StreamManagementNotEnabledException(); 1696 } 1697 requestSmAcknowledgementInternal(); 1698 } 1699 1700 private void requestSmAcknowledgementInternal() throws NotConnectedException, InterruptedException { 1701 packetWriter.sendStreamElement(AckRequest.INSTANCE); 1702 } 1703 1704 /** 1705 * Send a unconditional Stream Management acknowledgment to the server. 1706 * <p> 1707 * See <a href="http://xmpp.org/extensions/xep-0198.html#acking">XEP-198: Stream Management ยง 4. Acks</a>: 1708 * "Either party MAY send an <a/> element at any time (e.g., after it has received a certain number of stanzas, 1709 * or after a certain period of time), even if it has not received an <r/> element from the other party." 1710 * </p> 1711 * 1712 * @throws StreamManagementNotEnabledException if Stream Management is not enabled. 1713 * @throws NotConnectedException if the connection is not connected. 1714 * @throws InterruptedException 1715 */ 1716 public void sendSmAcknowledgement() throws StreamManagementNotEnabledException, NotConnectedException, InterruptedException { 1717 if (!isSmEnabled()) { 1718 throw new StreamManagementException.StreamManagementNotEnabledException(); 1719 } 1720 sendSmAcknowledgementInternal(); 1721 } 1722 1723 private void sendSmAcknowledgementInternal() throws NotConnectedException, InterruptedException { 1724 packetWriter.sendStreamElement(new AckAnswer(clientHandledStanzasCount)); 1725 } 1726 1727 /** 1728 * Add a Stanza acknowledged listener. 1729 * <p> 1730 * Those listeners will be invoked every time a Stanza has been acknowledged by the server. The will not get 1731 * automatically removed. Consider using {@link #addStanzaIdAcknowledgedListener(String, StanzaListener)} when 1732 * possible. 1733 * </p> 1734 * 1735 * @param listener the listener to add. 1736 */ 1737 public void addStanzaAcknowledgedListener(StanzaListener listener) { 1738 stanzaAcknowledgedListeners.add(listener); 1739 } 1740 1741 /** 1742 * Remove the given Stanza acknowledged listener. 1743 * 1744 * @param listener the listener. 1745 * @return true if the listener was removed. 1746 */ 1747 public boolean removeStanzaAcknowledgedListener(StanzaListener listener) { 1748 return stanzaAcknowledgedListeners.remove(listener); 1749 } 1750 1751 /** 1752 * Remove all stanza acknowledged listeners. 1753 */ 1754 public void removeAllStanzaAcknowledgedListeners() { 1755 stanzaAcknowledgedListeners.clear(); 1756 } 1757 1758 /** 1759 * Add a new Stanza ID acknowledged listener for the given ID. 1760 * <p> 1761 * The listener will be invoked if the stanza with the given ID was acknowledged by the server. It will 1762 * automatically be removed after the listener was run. 1763 * </p> 1764 * 1765 * @param id the stanza ID. 1766 * @param listener the listener to invoke. 1767 * @return the previous listener for this stanza ID or null. 1768 * @throws StreamManagementNotEnabledException if Stream Management is not enabled. 1769 */ 1770 @SuppressWarnings("FutureReturnValueIgnored") 1771 public StanzaListener addStanzaIdAcknowledgedListener(final String id, StanzaListener listener) throws StreamManagementNotEnabledException { 1772 // Prevent users from adding callbacks that will never get removed 1773 if (!smWasEnabledAtLeastOnce) { 1774 throw new StreamManagementException.StreamManagementNotEnabledException(); 1775 } 1776 // Remove the listener after max. 3 hours 1777 final int removeAfterSeconds = Math.min(getMaxSmResumptionTime(), 3 * 60 * 60); 1778 schedule(new Runnable() { 1779 @Override 1780 public void run() { 1781 stanzaIdAcknowledgedListeners.remove(id); 1782 } 1783 }, removeAfterSeconds, TimeUnit.SECONDS); 1784 return stanzaIdAcknowledgedListeners.put(id, listener); 1785 } 1786 1787 /** 1788 * Remove the Stanza ID acknowledged listener for the given ID. 1789 * 1790 * @param id the stanza ID. 1791 * @return true if the listener was found and removed, false otherwise. 1792 */ 1793 public StanzaListener removeStanzaIdAcknowledgedListener(String id) { 1794 return stanzaIdAcknowledgedListeners.remove(id); 1795 } 1796 1797 /** 1798 * Removes all Stanza ID acknowledged listeners. 1799 */ 1800 public void removeAllStanzaIdAcknowledgedListeners() { 1801 stanzaIdAcknowledgedListeners.clear(); 1802 } 1803 1804 /** 1805 * Returns true if Stream Management is supported by the server. 1806 * 1807 * @return true if Stream Management is supported by the server. 1808 */ 1809 public boolean isSmAvailable() { 1810 return hasFeature(StreamManagementFeature.ELEMENT, StreamManagement.NAMESPACE); 1811 } 1812 1813 /** 1814 * Returns true if Stream Management was successfully negotiated with the server. 1815 * 1816 * @return true if Stream Management was negotiated. 1817 */ 1818 public boolean isSmEnabled() { 1819 return smEnabledSyncPoint.wasSuccessful(); 1820 } 1821 1822 /** 1823 * Returns true if the stream was successfully resumed with help of Stream Management. 1824 * 1825 * @return true if the stream was resumed. 1826 */ 1827 public boolean streamWasResumed() { 1828 return smResumedSyncPoint.wasSuccessful(); 1829 } 1830 1831 /** 1832 * Returns true if the connection is disconnected by a Stream resumption via Stream Management is possible. 1833 * 1834 * @return true if disconnected but resumption possible. 1835 */ 1836 public boolean isDisconnectedButSmResumptionPossible() { 1837 return disconnectedButResumeable && isSmResumptionPossible(); 1838 } 1839 1840 /** 1841 * Returns true if the stream is resumable. 1842 * 1843 * @return true if the stream is resumable. 1844 */ 1845 public boolean isSmResumptionPossible() { 1846 // There is no resumable stream available 1847 if (smSessionId == null) 1848 return false; 1849 1850 final Long shutdownTimestamp = packetWriter.shutdownTimestamp; 1851 // Seems like we are already reconnected, report true 1852 if (shutdownTimestamp == null) { 1853 return true; 1854 } 1855 1856 // See if resumption time is over 1857 long current = System.currentTimeMillis(); 1858 long maxResumptionMillies = ((long) getMaxSmResumptionTime()) * 1000; 1859 if (current > shutdownTimestamp + maxResumptionMillies) { 1860 // Stream resumption is *not* possible if the current timestamp is greater then the greatest timestamp where 1861 // resumption is possible 1862 return false; 1863 } else { 1864 return true; 1865 } 1866 } 1867 1868 /** 1869 * Drop the stream management state. Sets {@link #smSessionId} and 1870 * {@link #unacknowledgedStanzas} to <code>null</code>. 1871 */ 1872 private void dropSmState() { 1873 // clientHandledCount and serverHandledCount will be reset on <enable/> and <enabled/> 1874 // respective. No need to reset them here. 1875 smSessionId = null; 1876 unacknowledgedStanzas = null; 1877 } 1878 1879 /** 1880 * Get the maximum resumption time in seconds after which a managed stream can be resumed. 1881 * <p> 1882 * This method will return {@link Integer#MAX_VALUE} if neither the client nor the server specify a maximum 1883 * resumption time. Be aware of integer overflows when using this value, e.g. do not add arbitrary values to it 1884 * without checking for overflows before. 1885 * </p> 1886 * 1887 * @return the maximum resumption time in seconds or {@link Integer#MAX_VALUE} if none set. 1888 */ 1889 public int getMaxSmResumptionTime() { 1890 int clientResumptionTime = smClientMaxResumptionTime > 0 ? smClientMaxResumptionTime : Integer.MAX_VALUE; 1891 int serverResumptionTime = smServerMaxResumptionTime > 0 ? smServerMaxResumptionTime : Integer.MAX_VALUE; 1892 return Math.min(clientResumptionTime, serverResumptionTime); 1893 } 1894 1895 private void processHandledCount(long handledCount) throws StreamManagementCounterError { 1896 long ackedStanzasCount = SMUtils.calculateDelta(handledCount, serverHandledStanzasCount); 1897 final List<Stanza> ackedStanzas = new ArrayList<>( 1898 ackedStanzasCount <= Integer.MAX_VALUE ? (int) ackedStanzasCount 1899 : Integer.MAX_VALUE); 1900 for (long i = 0; i < ackedStanzasCount; i++) { 1901 Stanza ackedStanza = unacknowledgedStanzas.poll(); 1902 // If the server ack'ed a stanza, then it must be in the 1903 // unacknowledged stanza queue. There can be no exception. 1904 if (ackedStanza == null) { 1905 throw new StreamManagementCounterError(handledCount, serverHandledStanzasCount, 1906 ackedStanzasCount, ackedStanzas); 1907 } 1908 ackedStanzas.add(ackedStanza); 1909 } 1910 1911 boolean atLeastOneStanzaAcknowledgedListener = false; 1912 if (!stanzaAcknowledgedListeners.isEmpty()) { 1913 // If stanzaAcknowledgedListeners is not empty, the we have at least one 1914 atLeastOneStanzaAcknowledgedListener = true; 1915 } 1916 else { 1917 // Otherwise we look for a matching id in the stanza *id* acknowledged listeners 1918 for (Stanza ackedStanza : ackedStanzas) { 1919 String id = ackedStanza.getStanzaId(); 1920 if (id != null && stanzaIdAcknowledgedListeners.containsKey(id)) { 1921 atLeastOneStanzaAcknowledgedListener = true; 1922 break; 1923 } 1924 } 1925 } 1926 1927 // Only spawn a new thread if there is a chance that some listener is invoked 1928 if (atLeastOneStanzaAcknowledgedListener) { 1929 asyncGo(new Runnable() { 1930 @Override 1931 public void run() { 1932 for (Stanza ackedStanza : ackedStanzas) { 1933 for (StanzaListener listener : stanzaAcknowledgedListeners) { 1934 try { 1935 listener.processStanza(ackedStanza); 1936 } 1937 catch (InterruptedException | NotConnectedException | NotLoggedInException e) { 1938 LOGGER.log(Level.FINER, "Received exception", e); 1939 } 1940 } 1941 String id = ackedStanza.getStanzaId(); 1942 if (StringUtils.isNullOrEmpty(id)) { 1943 continue; 1944 } 1945 StanzaListener listener = stanzaIdAcknowledgedListeners.remove(id); 1946 if (listener != null) { 1947 try { 1948 listener.processStanza(ackedStanza); 1949 } 1950 catch (InterruptedException | NotConnectedException | NotLoggedInException e) { 1951 LOGGER.log(Level.FINER, "Received exception", e); 1952 } 1953 } 1954 } 1955 } 1956 }); 1957 } 1958 1959 serverHandledStanzasCount = handledCount; 1960 } 1961 1962 /** 1963 * Set the default bundle and defer callback used for new connections. 1964 * 1965 * @param defaultBundleAndDeferCallback 1966 * @see BundleAndDeferCallback 1967 * @since 4.1 1968 */ 1969 public static void setDefaultBundleAndDeferCallback(BundleAndDeferCallback defaultBundleAndDeferCallback) { 1970 XMPPTCPConnection.defaultBundleAndDeferCallback = defaultBundleAndDeferCallback; 1971 } 1972 1973 /** 1974 * Set the bundle and defer callback used for this connection. 1975 * <p> 1976 * You can use <code>null</code> as argument to reset the callback. Outgoing stanzas will then 1977 * no longer get deferred. 1978 * </p> 1979 * 1980 * @param bundleAndDeferCallback the callback or <code>null</code>. 1981 * @see BundleAndDeferCallback 1982 * @since 4.1 1983 */ 1984 public void setBundleandDeferCallback(BundleAndDeferCallback bundleAndDeferCallback) { 1985 this.bundleAndDeferCallback = bundleAndDeferCallback; 1986 } 1987 1988}