001/** 002 * 003 * Copyright 2009 Jive Software. 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.jivesoftware.smack; 018 019import java.io.IOException; 020import java.io.Reader; 021import java.io.Writer; 022import java.util.ArrayList; 023import java.util.Collection; 024import java.util.HashMap; 025import java.util.LinkedHashMap; 026import java.util.LinkedList; 027import java.util.List; 028import java.util.Map; 029import java.util.Set; 030import java.util.concurrent.ConcurrentLinkedQueue; 031import java.util.concurrent.CopyOnWriteArraySet; 032import java.util.concurrent.ExecutorService; 033import java.util.concurrent.Executors; 034import java.util.concurrent.ScheduledExecutorService; 035import java.util.concurrent.ScheduledFuture; 036import java.util.concurrent.TimeUnit; 037import java.util.concurrent.atomic.AtomicInteger; 038import java.util.concurrent.locks.Lock; 039import java.util.concurrent.locks.ReentrantLock; 040import java.util.logging.Level; 041import java.util.logging.Logger; 042 043import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode; 044import org.jivesoftware.smack.SmackConfiguration.UnknownIqRequestReplyMode; 045import org.jivesoftware.smack.SmackException.AlreadyConnectedException; 046import org.jivesoftware.smack.SmackException.AlreadyLoggedInException; 047import org.jivesoftware.smack.SmackException.NoResponseException; 048import org.jivesoftware.smack.SmackException.NotConnectedException; 049import org.jivesoftware.smack.SmackException.ResourceBindingNotOfferedException; 050import org.jivesoftware.smack.SmackException.SecurityRequiredByClientException; 051import org.jivesoftware.smack.SmackException.SecurityRequiredException; 052import org.jivesoftware.smack.XMPPException.StreamErrorException; 053import org.jivesoftware.smack.XMPPException.XMPPErrorException; 054import org.jivesoftware.smack.compress.packet.Compress; 055import org.jivesoftware.smack.compression.XMPPInputOutputStream; 056import org.jivesoftware.smack.debugger.SmackDebugger; 057import org.jivesoftware.smack.filter.IQReplyFilter; 058import org.jivesoftware.smack.filter.StanzaFilter; 059import org.jivesoftware.smack.filter.StanzaIdFilter; 060import org.jivesoftware.smack.iqrequest.IQRequestHandler; 061import org.jivesoftware.smack.packet.Bind; 062import org.jivesoftware.smack.packet.ErrorIQ; 063import org.jivesoftware.smack.packet.ExtensionElement; 064import org.jivesoftware.smack.packet.IQ; 065import org.jivesoftware.smack.packet.Mechanisms; 066import org.jivesoftware.smack.packet.Message; 067import org.jivesoftware.smack.packet.Nonza; 068import org.jivesoftware.smack.packet.Presence; 069import org.jivesoftware.smack.packet.Session; 070import org.jivesoftware.smack.packet.Stanza; 071import org.jivesoftware.smack.packet.StartTls; 072import org.jivesoftware.smack.packet.StreamError; 073import org.jivesoftware.smack.packet.XMPPError; 074import org.jivesoftware.smack.parsing.ParsingExceptionCallback; 075import org.jivesoftware.smack.provider.ExtensionElementProvider; 076import org.jivesoftware.smack.provider.ProviderManager; 077import org.jivesoftware.smack.sasl.core.SASLAnonymous; 078import org.jivesoftware.smack.util.BoundedThreadPoolExecutor; 079import org.jivesoftware.smack.util.DNSUtil; 080import org.jivesoftware.smack.util.Objects; 081import org.jivesoftware.smack.util.PacketParserUtils; 082import org.jivesoftware.smack.util.ParserUtils; 083import org.jivesoftware.smack.util.SmackExecutorThreadFactory; 084import org.jivesoftware.smack.util.StringUtils; 085import org.jivesoftware.smack.util.dns.HostAddress; 086 087import org.jxmpp.jid.DomainBareJid; 088import org.jxmpp.jid.EntityFullJid; 089import org.jxmpp.jid.Jid; 090import org.jxmpp.jid.parts.Resourcepart; 091import org.jxmpp.util.XmppStringUtils; 092import org.xmlpull.v1.XmlPullParser; 093 094 095public abstract class AbstractXMPPConnection implements XMPPConnection { 096 private static final Logger LOGGER = Logger.getLogger(AbstractXMPPConnection.class.getName()); 097 098 /** 099 * Counter to uniquely identify connections that are created. 100 */ 101 private final static AtomicInteger connectionCounter = new AtomicInteger(0); 102 103 static { 104 // Ensure the SmackConfiguration class is loaded by calling a method in it. 105 SmackConfiguration.getVersion(); 106 } 107 108 /** 109 * A collection of ConnectionListeners which listen for connection closing 110 * and reconnection events. 111 */ 112 protected final Set<ConnectionListener> connectionListeners = 113 new CopyOnWriteArraySet<ConnectionListener>(); 114 115 /** 116 * A collection of StanzaCollectors which collects packets for a specified filter 117 * and perform blocking and polling operations on the result queue. 118 * <p> 119 * We use a ConcurrentLinkedQueue here, because its Iterator is weakly 120 * consistent and we want {@link #invokeStanzaCollectorsAndNotifyRecvListeners(Stanza)} for-each 121 * loop to be lock free. As drawback, removing a StanzaCollector is O(n). 122 * The alternative would be a synchronized HashSet, but this would mean a 123 * synchronized block around every usage of <code>collectors</code>. 124 * </p> 125 */ 126 private final Collection<StanzaCollector> collectors = new ConcurrentLinkedQueue<>(); 127 128 /** 129 * List of PacketListeners that will be notified synchronously when a new stanza(/packet) was received. 130 */ 131 private final Map<StanzaListener, ListenerWrapper> syncRecvListeners = new LinkedHashMap<>(); 132 133 /** 134 * List of PacketListeners that will be notified asynchronously when a new stanza(/packet) was received. 135 */ 136 private final Map<StanzaListener, ListenerWrapper> asyncRecvListeners = new LinkedHashMap<>(); 137 138 /** 139 * List of PacketListeners that will be notified when a new stanza(/packet) was sent. 140 */ 141 private final Map<StanzaListener, ListenerWrapper> sendListeners = 142 new HashMap<StanzaListener, ListenerWrapper>(); 143 144 /** 145 * List of PacketListeners that will be notified when a new stanza(/packet) is about to be 146 * sent to the server. These interceptors may modify the stanza(/packet) before it is being 147 * actually sent to the server. 148 */ 149 private final Map<StanzaListener, InterceptorWrapper> interceptors = 150 new HashMap<StanzaListener, InterceptorWrapper>(); 151 152 protected final Lock connectionLock = new ReentrantLock(); 153 154 protected final Map<String, ExtensionElement> streamFeatures = new HashMap<String, ExtensionElement>(); 155 156 /** 157 * The full JID of the authenticated user, as returned by the resource binding response of the server. 158 * <p> 159 * It is important that we don't infer the user from the login() arguments and the configurations service name, as, 160 * for example, when SASL External is used, the username is not given to login but taken from the 'external' 161 * certificate. 162 * </p> 163 */ 164 protected EntityFullJid user; 165 166 protected boolean connected = false; 167 168 /** 169 * The stream ID, see RFC 6120 § 4.7.3 170 */ 171 protected String streamId; 172 173 /** 174 * The timeout to wait for a reply in milliseconds. 175 */ 176 private long replyTimeout = SmackConfiguration.getDefaultReplyTimeout(); 177 178 /** 179 * The SmackDebugger allows to log and debug XML traffic. 180 */ 181 protected SmackDebugger debugger = null; 182 183 /** 184 * The Reader which is used for the debugger. 185 */ 186 protected Reader reader; 187 188 /** 189 * The Writer which is used for the debugger. 190 */ 191 protected Writer writer; 192 193 protected final SynchronizationPoint<SmackException> tlsHandled = new SynchronizationPoint<>(this, "establishing TLS"); 194 195 /** 196 * Set to success if the last features stanza from the server has been parsed. A XMPP connection 197 * handshake can invoke multiple features stanzas, e.g. when TLS is activated a second feature 198 * stanza is send by the server. This is set to true once the last feature stanza has been 199 * parsed. 200 */ 201 protected final SynchronizationPoint<Exception> lastFeaturesReceived = new SynchronizationPoint<Exception>( 202 AbstractXMPPConnection.this, "last stream features received from server"); 203 204 /** 205 * Set to success if the SASL feature has been received. 206 */ 207 protected final SynchronizationPoint<XMPPException> saslFeatureReceived = new SynchronizationPoint<>( 208 AbstractXMPPConnection.this, "SASL mechanisms stream feature from server"); 209 210 /** 211 * The SASLAuthentication manager that is responsible for authenticating with the server. 212 */ 213 protected final SASLAuthentication saslAuthentication; 214 215 /** 216 * A number to uniquely identify connections that are created. This is distinct from the 217 * connection ID, which is a value sent by the server once a connection is made. 218 */ 219 protected final int connectionCounterValue = connectionCounter.getAndIncrement(); 220 221 /** 222 * Holds the initial configuration used while creating the connection. 223 */ 224 protected final ConnectionConfiguration config; 225 226 /** 227 * Defines how the from attribute of outgoing stanzas should be handled. 228 */ 229 private FromMode fromMode = FromMode.OMITTED; 230 231 protected XMPPInputOutputStream compressionHandler; 232 233 private ParsingExceptionCallback parsingExceptionCallback = SmackConfiguration.getDefaultParsingExceptionCallback(); 234 235 /** 236 * ExecutorService used to invoke the PacketListeners on newly arrived and parsed stanzas. It is 237 * important that we use a <b>single threaded ExecutorService</b> in order to guarantee that the 238 * PacketListeners are invoked in the same order the stanzas arrived. 239 */ 240 private final BoundedThreadPoolExecutor executorService = new BoundedThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, 241 100, new SmackExecutorThreadFactory(this, "Incoming Processor")); 242 243 /** 244 * This scheduled thread pool executor is used to remove pending callbacks. 245 */ 246 private final ScheduledExecutorService removeCallbacksService = Executors.newSingleThreadScheduledExecutor( 247 new SmackExecutorThreadFactory(this, "Remove Callbacks")); 248 249 /** 250 * A cached thread pool executor service with custom thread factory to set meaningful names on the threads and set 251 * them 'daemon'. 252 */ 253 private final ExecutorService cachedExecutorService = Executors.newCachedThreadPool( 254 // @formatter:off 255 // CHECKSTYLE:OFF 256 new SmackExecutorThreadFactory( // threadFactory 257 this, 258 "Cached Executor" 259 ) 260 // @formatter:on 261 // CHECKSTYLE:ON 262 ); 263 264 /** 265 * A executor service used to invoke the callbacks of synchronous stanza(/packet) listeners. We use a executor service to 266 * decouple incoming stanza processing from callback invocation. It is important that order of callback invocation 267 * is the same as the order of the incoming stanzas. Therefore we use a <i>single</i> threaded executor service. 268 */ 269 private final ExecutorService singleThreadedExecutorService = Executors.newSingleThreadExecutor(new SmackExecutorThreadFactory( 270 this, "Single Threaded Executor")); 271 272 /** 273 * The used host to establish the connection to 274 */ 275 protected String host; 276 277 /** 278 * The used port to establish the connection to 279 */ 280 protected int port; 281 282 /** 283 * Flag that indicates if the user is currently authenticated with the server. 284 */ 285 protected boolean authenticated = false; 286 287 /** 288 * Flag that indicates if the user was authenticated with the server when the connection 289 * to the server was closed (abruptly or not). 290 */ 291 protected boolean wasAuthenticated = false; 292 293 private final Map<String, IQRequestHandler> setIqRequestHandler = new HashMap<>(); 294 private final Map<String, IQRequestHandler> getIqRequestHandler = new HashMap<>(); 295 296 /** 297 * Create a new XMPPConnection to an XMPP server. 298 * 299 * @param configuration The configuration which is used to establish the connection. 300 */ 301 protected AbstractXMPPConnection(ConnectionConfiguration configuration) { 302 saslAuthentication = new SASLAuthentication(this, configuration); 303 config = configuration; 304 // Notify listeners that a new connection has been established 305 for (ConnectionCreationListener listener : XMPPConnectionRegistry.getConnectionCreationListeners()) { 306 listener.connectionCreated(this); 307 } 308 } 309 310 /** 311 * Get the connection configuration used by this connection. 312 * 313 * @return the connection configuration. 314 */ 315 public ConnectionConfiguration getConfiguration() { 316 return config; 317 } 318 319 @SuppressWarnings("deprecation") 320 @Override 321 public DomainBareJid getServiceName() { 322 return getXMPPServiceDomain(); 323 } 324 325 @Override 326 public DomainBareJid getXMPPServiceDomain() { 327 if (xmppServiceDomain != null) { 328 return xmppServiceDomain; 329 } 330 return config.getXMPPServiceDomain(); 331 } 332 333 @Override 334 public String getHost() { 335 return host; 336 } 337 338 @Override 339 public int getPort() { 340 return port; 341 } 342 343 @Override 344 public abstract boolean isSecureConnection(); 345 346 protected abstract void sendStanzaInternal(Stanza packet) throws NotConnectedException, InterruptedException; 347 348 @Override 349 public abstract void sendNonza(Nonza element) throws NotConnectedException, InterruptedException; 350 351 @Override 352 public abstract boolean isUsingCompression(); 353 354 /** 355 * Establishes a connection to the XMPP server. It basically 356 * creates and maintains a connection to the server. 357 * <p> 358 * Listeners will be preserved from a previous connection. 359 * </p> 360 * 361 * @throws XMPPException if an error occurs on the XMPP protocol level. 362 * @throws SmackException if an error occurs somewhere else besides XMPP protocol level. 363 * @throws IOException 364 * @return a reference to this object, to chain <code>connect()</code> with <code>login()</code>. 365 * @throws InterruptedException 366 */ 367 public synchronized AbstractXMPPConnection connect() throws SmackException, IOException, XMPPException, InterruptedException { 368 // Check if not already connected 369 throwAlreadyConnectedExceptionIfAppropriate(); 370 371 // Reset the connection state 372 saslAuthentication.init(); 373 saslFeatureReceived.init(); 374 lastFeaturesReceived.init(); 375 tlsHandled.init(); 376 streamId = null; 377 378 // Perform the actual connection to the XMPP service 379 connectInternal(); 380 381 // TLS handled will be successful either if TLS was established, or if it was not mandatory. 382 tlsHandled.checkIfSuccessOrWaitOrThrow(); 383 384 // Wait with SASL auth until the SASL mechanisms have been received 385 saslFeatureReceived.checkIfSuccessOrWaitOrThrow(); 386 387 // If TLS is required but the server doesn't offer it, disconnect 388 // from the server and throw an error. First check if we've already negotiated TLS 389 // and are secure, however (features get parsed a second time after TLS is established). 390 if (!isSecureConnection() && getConfiguration().getSecurityMode() == SecurityMode.required) { 391 shutdown(); 392 throw new SecurityRequiredByClientException(); 393 } 394 395 // Make note of the fact that we're now connected. 396 connected = true; 397 callConnectionConnectedListener(); 398 399 return this; 400 } 401 402 /** 403 * Abstract method that concrete subclasses of XMPPConnection need to implement to perform their 404 * way of XMPP connection establishment. Implementations are required to perform an automatic 405 * login if the previous connection state was logged (authenticated). 406 * 407 * @throws SmackException 408 * @throws IOException 409 * @throws XMPPException 410 * @throws InterruptedException 411 */ 412 protected abstract void connectInternal() throws SmackException, IOException, XMPPException, InterruptedException; 413 414 private String usedUsername, usedPassword; 415 416 /** 417 * The resourcepart used for this connection. May not be the resulting resourcepart if it's null or overridden by the XMPP service. 418 */ 419 private Resourcepart usedResource; 420 421 /** 422 * Logs in to the server using the strongest SASL mechanism supported by 423 * the server. If more than the connection's default stanza(/packet) timeout elapses in each step of the 424 * authentication process without a response from the server, a 425 * {@link SmackException.NoResponseException} will be thrown. 426 * <p> 427 * Before logging in (i.e. authenticate) to the server the connection must be connected 428 * by calling {@link #connect}. 429 * </p> 430 * <p> 431 * It is possible to log in without sending an initial available presence by using 432 * {@link ConnectionConfiguration.Builder#setSendPresence(boolean)}. 433 * Finally, if you want to not pass a password and instead use a more advanced mechanism 434 * while using SASL then you may be interested in using 435 * {@link ConnectionConfiguration.Builder#setCallbackHandler(javax.security.auth.callback.CallbackHandler)}. 436 * For more advanced login settings see {@link ConnectionConfiguration}. 437 * </p> 438 * 439 * @throws XMPPException if an error occurs on the XMPP protocol level. 440 * @throws SmackException if an error occurs somewhere else besides XMPP protocol level. 441 * @throws IOException if an I/O error occurs during login. 442 * @throws InterruptedException 443 */ 444 public synchronized void login() throws XMPPException, SmackException, IOException, InterruptedException { 445 // The previously used username, password and resource take over precedence over the 446 // ones from the connection configuration 447 CharSequence username = usedUsername != null ? usedUsername : config.getUsername(); 448 String password = usedPassword != null ? usedPassword : config.getPassword(); 449 Resourcepart resource = usedResource != null ? usedResource : config.getResource(); 450 login(username, password, resource); 451 } 452 453 /** 454 * Same as {@link #login(CharSequence, String, Resourcepart)}, but takes the resource from the connection 455 * configuration. 456 * 457 * @param username 458 * @param password 459 * @throws XMPPException 460 * @throws SmackException 461 * @throws IOException 462 * @throws InterruptedException 463 * @see #login 464 */ 465 public synchronized void login(CharSequence username, String password) throws XMPPException, SmackException, 466 IOException, InterruptedException { 467 login(username, password, config.getResource()); 468 } 469 470 /** 471 * Login with the given username (authorization identity). You may omit the password if a callback handler is used. 472 * If resource is null, then the server will generate one. 473 * 474 * @param username 475 * @param password 476 * @param resource 477 * @throws XMPPException 478 * @throws SmackException 479 * @throws IOException 480 * @throws InterruptedException 481 * @see #login 482 */ 483 public synchronized void login(CharSequence username, String password, Resourcepart resource) throws XMPPException, 484 SmackException, IOException, InterruptedException { 485 if (!config.allowNullOrEmptyUsername) { 486 StringUtils.requireNotNullOrEmpty(username, "Username must not be null or empty"); 487 } 488 throwNotConnectedExceptionIfAppropriate("Did you call connect() before login()?"); 489 throwAlreadyLoggedInExceptionIfAppropriate(); 490 usedUsername = username != null ? username.toString() : null; 491 usedPassword = password; 492 usedResource = resource; 493 loginInternal(usedUsername, usedPassword, usedResource); 494 } 495 496 protected abstract void loginInternal(String username, String password, Resourcepart resource) 497 throws XMPPException, SmackException, IOException, InterruptedException; 498 499 @Override 500 public final boolean isConnected() { 501 return connected; 502 } 503 504 @Override 505 public final boolean isAuthenticated() { 506 return authenticated; 507 } 508 509 @Override 510 public final EntityFullJid getUser() { 511 return user; 512 } 513 514 @Override 515 public String getStreamId() { 516 if (!isConnected()) { 517 return null; 518 } 519 return streamId; 520 } 521 522 protected void bindResourceAndEstablishSession(Resourcepart resource) throws XMPPErrorException, 523 SmackException, InterruptedException { 524 525 // Wait until either: 526 // - the servers last features stanza has been parsed 527 // - the timeout occurs 528 LOGGER.finer("Waiting for last features to be received before continuing with resource binding"); 529 lastFeaturesReceived.checkIfSuccessOrWait(); 530 531 532 if (!hasFeature(Bind.ELEMENT, Bind.NAMESPACE)) { 533 // Server never offered resource binding, which is REQURIED in XMPP client and 534 // server implementations as per RFC6120 7.2 535 throw new ResourceBindingNotOfferedException(); 536 } 537 538 // Resource binding, see RFC6120 7. 539 // Note that we can not use IQReplyFilter here, since the users full JID is not yet 540 // available. It will become available right after the resource has been successfully bound. 541 Bind bindResource = Bind.newSet(resource); 542 StanzaCollector packetCollector = createStanzaCollectorAndSend(new StanzaIdFilter(bindResource), bindResource); 543 Bind response = packetCollector.nextResultOrThrow(); 544 // Set the connections user to the result of resource binding. It is important that we don't infer the user 545 // from the login() arguments and the configurations service name, as, for example, when SASL External is used, 546 // the username is not given to login but taken from the 'external' certificate. 547 user = response.getJid(); 548 xmppServiceDomain = user.asDomainBareJid(); 549 550 Session.Feature sessionFeature = getFeature(Session.ELEMENT, Session.NAMESPACE); 551 // Only bind the session if it's announced as stream feature by the server, is not optional and not disabled 552 // For more information see http://tools.ietf.org/html/draft-cridland-xmpp-session-01 553 // TODO remove this suppression once "disable legacy session" code has been removed from Smack 554 @SuppressWarnings("deprecation") 555 boolean legacySessionDisabled = getConfiguration().isLegacySessionDisabled(); 556 if (sessionFeature != null && !sessionFeature.isOptional() && !legacySessionDisabled) { 557 Session session = new Session(); 558 packetCollector = createStanzaCollectorAndSend(new StanzaIdFilter(session), session); 559 packetCollector.nextResultOrThrow(); 560 } 561 } 562 563 protected void afterSuccessfulLogin(final boolean resumed) throws NotConnectedException, InterruptedException { 564 // Indicate that we're now authenticated. 565 this.authenticated = true; 566 567 // If debugging is enabled, change the the debug window title to include the 568 // name we are now logged-in as. 569 // If DEBUG was set to true AFTER the connection was created the debugger 570 // will be null 571 if (config.isDebuggerEnabled() && debugger != null) { 572 debugger.userHasLogged(user); 573 } 574 callConnectionAuthenticatedListener(resumed); 575 576 // Set presence to online. It is important that this is done after 577 // callConnectionAuthenticatedListener(), as this call will also 578 // eventually load the roster. And we should load the roster before we 579 // send the initial presence. 580 if (config.isSendPresence() && !resumed) { 581 sendStanza(new Presence(Presence.Type.available)); 582 } 583 } 584 585 @Override 586 public final boolean isAnonymous() { 587 return isAuthenticated() && SASLAnonymous.NAME.equals(getUsedSaslMechansism()); 588 } 589 590 /** 591 * Get the name of the SASL mechanism that was used to authenticate this connection. This returns the name of 592 * mechanism which was used the last time this conneciton was authenticated, and will return <code>null</code> if 593 * this connection was not authenticated before. 594 * 595 * @return the name of the used SASL mechanism. 596 * @since 4.2 597 */ 598 public final String getUsedSaslMechansism() { 599 return saslAuthentication.getNameOfLastUsedSaslMechansism(); 600 } 601 602 private DomainBareJid xmppServiceDomain; 603 604 protected List<HostAddress> hostAddresses; 605 606 /** 607 * Populates {@link #hostAddresses} with the resolved addresses or with the configured host address. If no host 608 * address was configured and all lookups failed, for example with NX_DOMAIN, then {@link #hostAddresses} will be 609 * populated with the empty list. 610 * 611 * @return a list of host addresses where DNS (SRV) RR resolution failed. 612 */ 613 protected List<HostAddress> populateHostAddresses() { 614 List<HostAddress> failedAddresses = new LinkedList<>(); 615 if (config.hostAddress != null) { 616 hostAddresses = new ArrayList<>(1); 617 HostAddress hostAddress = new HostAddress(config.port, config.hostAddress); 618 hostAddresses.add(hostAddress); 619 } 620 else if (config.host != null) { 621 hostAddresses = new ArrayList<HostAddress>(1); 622 HostAddress hostAddress = DNSUtil.getDNSResolver().lookupHostAddress(config.host, config.port, failedAddresses, config.getDnssecMode()); 623 if (hostAddress != null) { 624 hostAddresses.add(hostAddress); 625 } 626 } else { 627 // N.B.: Important to use config.serviceName and not AbstractXMPPConnection.serviceName 628 hostAddresses = DNSUtil.resolveXMPPServiceDomain(config.getXMPPServiceDomain().toString(), failedAddresses, config.getDnssecMode()); 629 } 630 // Either the populated host addresses are not empty *or* there must be at least one failed address. 631 assert (!hostAddresses.isEmpty() || !failedAddresses.isEmpty()); 632 return failedAddresses; 633 } 634 635 protected Lock getConnectionLock() { 636 return connectionLock; 637 } 638 639 protected void throwNotConnectedExceptionIfAppropriate() throws NotConnectedException { 640 throwNotConnectedExceptionIfAppropriate(null); 641 } 642 643 protected void throwNotConnectedExceptionIfAppropriate(String optionalHint) throws NotConnectedException { 644 if (!isConnected()) { 645 throw new NotConnectedException(optionalHint); 646 } 647 } 648 649 protected void throwAlreadyConnectedExceptionIfAppropriate() throws AlreadyConnectedException { 650 if (isConnected()) { 651 throw new AlreadyConnectedException(); 652 } 653 } 654 655 protected void throwAlreadyLoggedInExceptionIfAppropriate() throws AlreadyLoggedInException { 656 if (isAuthenticated()) { 657 throw new AlreadyLoggedInException(); 658 } 659 } 660 661 @Deprecated 662 @Override 663 public void sendPacket(Stanza packet) throws NotConnectedException, InterruptedException { 664 sendStanza(packet); 665 } 666 667 @Override 668 public void sendStanza(Stanza stanza) throws NotConnectedException, InterruptedException { 669 Objects.requireNonNull(stanza, "Stanza must not be null"); 670 assert (stanza instanceof Message || stanza instanceof Presence || stanza instanceof IQ); 671 672 throwNotConnectedExceptionIfAppropriate(); 673 switch (fromMode) { 674 case OMITTED: 675 stanza.setFrom((Jid) null); 676 break; 677 case USER: 678 stanza.setFrom(getUser()); 679 break; 680 case UNCHANGED: 681 default: 682 break; 683 } 684 // Invoke interceptors for the new stanza that is about to be sent. Interceptors may modify 685 // the content of the stanza. 686 firePacketInterceptors(stanza); 687 sendStanzaInternal(stanza); 688 } 689 690 /** 691 * Returns the SASLAuthentication manager that is responsible for authenticating with 692 * the server. 693 * 694 * @return the SASLAuthentication manager that is responsible for authenticating with 695 * the server. 696 */ 697 protected SASLAuthentication getSASLAuthentication() { 698 return saslAuthentication; 699 } 700 701 /** 702 * Closes the connection by setting presence to unavailable then closing the connection to 703 * the XMPP server. The XMPPConnection can still be used for connecting to the server 704 * again. 705 * 706 */ 707 public void disconnect() { 708 try { 709 disconnect(new Presence(Presence.Type.unavailable)); 710 } 711 catch (NotConnectedException e) { 712 LOGGER.log(Level.FINEST, "Connection is already disconnected", e); 713 } 714 } 715 716 /** 717 * Closes the connection. A custom unavailable presence is sent to the server, followed 718 * by closing the stream. The XMPPConnection can still be used for connecting to the server 719 * again. A custom unavailable presence is useful for communicating offline presence 720 * information such as "On vacation". Typically, just the status text of the presence 721 * stanza(/packet) is set with online information, but most XMPP servers will deliver the full 722 * presence stanza(/packet) with whatever data is set. 723 * 724 * @param unavailablePresence the presence stanza(/packet) to send during shutdown. 725 * @throws NotConnectedException 726 */ 727 public synchronized void disconnect(Presence unavailablePresence) throws NotConnectedException { 728 try { 729 sendStanza(unavailablePresence); 730 } 731 catch (InterruptedException e) { 732 LOGGER.log(Level.FINE, "Was interrupted while sending unavailable presence. Continuing to disconnect the connection", e); 733 } 734 shutdown(); 735 callConnectionClosedListener(); 736 } 737 738 /** 739 * Shuts the current connection down. 740 */ 741 protected abstract void shutdown(); 742 743 @Override 744 public void addConnectionListener(ConnectionListener connectionListener) { 745 if (connectionListener == null) { 746 return; 747 } 748 connectionListeners.add(connectionListener); 749 } 750 751 @Override 752 public void removeConnectionListener(ConnectionListener connectionListener) { 753 connectionListeners.remove(connectionListener); 754 } 755 756 @Override 757 public StanzaCollector createStanzaCollectorAndSend(IQ packet) throws NotConnectedException, InterruptedException { 758 StanzaFilter packetFilter = new IQReplyFilter(packet, this); 759 // Create the packet collector before sending the packet 760 StanzaCollector packetCollector = createStanzaCollectorAndSend(packetFilter, packet); 761 return packetCollector; 762 } 763 764 @Override 765 public StanzaCollector createStanzaCollectorAndSend(StanzaFilter packetFilter, Stanza packet) 766 throws NotConnectedException, InterruptedException { 767 // Create the packet collector before sending the packet 768 StanzaCollector packetCollector = createStanzaCollector(packetFilter); 769 try { 770 // Now we can send the packet as the collector has been created 771 sendStanza(packet); 772 } 773 catch (InterruptedException | NotConnectedException | RuntimeException e) { 774 packetCollector.cancel(); 775 throw e; 776 } 777 return packetCollector; 778 } 779 780 @Override 781 public StanzaCollector createStanzaCollector(StanzaFilter packetFilter) { 782 StanzaCollector.Configuration configuration = StanzaCollector.newConfiguration().setStanzaFilter(packetFilter); 783 return createStanzaCollector(configuration); 784 } 785 786 @Override 787 public StanzaCollector createStanzaCollector(StanzaCollector.Configuration configuration) { 788 StanzaCollector collector = new StanzaCollector(this, configuration); 789 // Add the collector to the list of active collectors. 790 collectors.add(collector); 791 return collector; 792 } 793 794 @Override 795 public void removeStanzaCollector(StanzaCollector collector) { 796 collectors.remove(collector); 797 } 798 799 @Override 800 @Deprecated 801 public void addPacketListener(StanzaListener packetListener, StanzaFilter packetFilter) { 802 addAsyncStanzaListener(packetListener, packetFilter); 803 } 804 805 @Override 806 @Deprecated 807 public boolean removePacketListener(StanzaListener packetListener) { 808 return removeAsyncStanzaListener(packetListener); 809 } 810 811 @Override 812 public void addSyncStanzaListener(StanzaListener packetListener, StanzaFilter packetFilter) { 813 if (packetListener == null) { 814 throw new NullPointerException("Packet listener is null."); 815 } 816 ListenerWrapper wrapper = new ListenerWrapper(packetListener, packetFilter); 817 synchronized (syncRecvListeners) { 818 syncRecvListeners.put(packetListener, wrapper); 819 } 820 } 821 822 @Override 823 public boolean removeSyncStanzaListener(StanzaListener packetListener) { 824 synchronized (syncRecvListeners) { 825 return syncRecvListeners.remove(packetListener) != null; 826 } 827 } 828 829 @Override 830 public void addAsyncStanzaListener(StanzaListener packetListener, StanzaFilter packetFilter) { 831 if (packetListener == null) { 832 throw new NullPointerException("Packet listener is null."); 833 } 834 ListenerWrapper wrapper = new ListenerWrapper(packetListener, packetFilter); 835 synchronized (asyncRecvListeners) { 836 asyncRecvListeners.put(packetListener, wrapper); 837 } 838 } 839 840 @Override 841 public boolean removeAsyncStanzaListener(StanzaListener packetListener) { 842 synchronized (asyncRecvListeners) { 843 return asyncRecvListeners.remove(packetListener) != null; 844 } 845 } 846 847 @Override 848 public void addPacketSendingListener(StanzaListener packetListener, StanzaFilter packetFilter) { 849 if (packetListener == null) { 850 throw new NullPointerException("Packet listener is null."); 851 } 852 ListenerWrapper wrapper = new ListenerWrapper(packetListener, packetFilter); 853 synchronized (sendListeners) { 854 sendListeners.put(packetListener, wrapper); 855 } 856 } 857 858 @Override 859 public void removePacketSendingListener(StanzaListener packetListener) { 860 synchronized (sendListeners) { 861 sendListeners.remove(packetListener); 862 } 863 } 864 865 /** 866 * Process all stanza(/packet) listeners for sending packets. 867 * <p> 868 * Compared to {@link #firePacketInterceptors(Stanza)}, the listeners will be invoked in a new thread. 869 * </p> 870 * 871 * @param packet the stanza(/packet) to process. 872 */ 873 @SuppressWarnings("javadoc") 874 protected void firePacketSendingListeners(final Stanza packet) { 875 final List<StanzaListener> listenersToNotify = new LinkedList<StanzaListener>(); 876 synchronized (sendListeners) { 877 for (ListenerWrapper listenerWrapper : sendListeners.values()) { 878 if (listenerWrapper.filterMatches(packet)) { 879 listenersToNotify.add(listenerWrapper.getListener()); 880 } 881 } 882 } 883 if (listenersToNotify.isEmpty()) { 884 return; 885 } 886 // Notify in a new thread, because we can 887 asyncGo(new Runnable() { 888 @Override 889 public void run() { 890 for (StanzaListener listener : listenersToNotify) { 891 try { 892 listener.processStanza(packet); 893 } 894 catch (Exception e) { 895 LOGGER.log(Level.WARNING, "Sending listener threw exception", e); 896 continue; 897 } 898 } 899 } 900 }); 901 } 902 903 @Override 904 public void addPacketInterceptor(StanzaListener packetInterceptor, 905 StanzaFilter packetFilter) { 906 if (packetInterceptor == null) { 907 throw new NullPointerException("Packet interceptor is null."); 908 } 909 InterceptorWrapper interceptorWrapper = new InterceptorWrapper(packetInterceptor, packetFilter); 910 synchronized (interceptors) { 911 interceptors.put(packetInterceptor, interceptorWrapper); 912 } 913 } 914 915 @Override 916 public void removePacketInterceptor(StanzaListener packetInterceptor) { 917 synchronized (interceptors) { 918 interceptors.remove(packetInterceptor); 919 } 920 } 921 922 /** 923 * Process interceptors. Interceptors may modify the stanza(/packet) that is about to be sent. 924 * Since the thread that requested to send the stanza(/packet) will invoke all interceptors, it 925 * is important that interceptors perform their work as soon as possible so that the 926 * thread does not remain blocked for a long period. 927 * 928 * @param packet the stanza(/packet) that is going to be sent to the server 929 */ 930 private void firePacketInterceptors(Stanza packet) { 931 List<StanzaListener> interceptorsToInvoke = new LinkedList<StanzaListener>(); 932 synchronized (interceptors) { 933 for (InterceptorWrapper interceptorWrapper : interceptors.values()) { 934 if (interceptorWrapper.filterMatches(packet)) { 935 interceptorsToInvoke.add(interceptorWrapper.getInterceptor()); 936 } 937 } 938 } 939 for (StanzaListener interceptor : interceptorsToInvoke) { 940 try { 941 interceptor.processStanza(packet); 942 } catch (Exception e) { 943 LOGGER.log(Level.SEVERE, "Packet interceptor threw exception", e); 944 } 945 } 946 } 947 948 /** 949 * Initialize the {@link #debugger}. You can specify a customized {@link SmackDebugger} 950 * by setup the system property <code>smack.debuggerClass</code> to the implementation. 951 * 952 * @throws IllegalStateException if the reader or writer isn't yet initialized. 953 * @throws IllegalArgumentException if the SmackDebugger can't be loaded. 954 */ 955 protected void initDebugger() { 956 if (reader == null || writer == null) { 957 throw new NullPointerException("Reader or writer isn't initialized."); 958 } 959 // If debugging is enabled, we open a window and write out all network traffic. 960 if (config.isDebuggerEnabled()) { 961 if (debugger == null) { 962 debugger = SmackConfiguration.createDebugger(this, writer, reader); 963 } 964 965 if (debugger == null) { 966 LOGGER.severe("Debugging enabled but could not find debugger class"); 967 } else { 968 // Obtain new reader and writer from the existing debugger 969 reader = debugger.newConnectionReader(reader); 970 writer = debugger.newConnectionWriter(writer); 971 } 972 } 973 } 974 975 @SuppressWarnings("deprecation") 976 @Override 977 public long getPacketReplyTimeout() { 978 return getReplyTimeout(); 979 } 980 981 @SuppressWarnings("deprecation") 982 @Override 983 public void setPacketReplyTimeout(long timeout) { 984 setReplyTimeout(timeout); 985 } 986 987 @Override 988 public long getReplyTimeout() { 989 return replyTimeout; 990 } 991 992 @Override 993 public void setReplyTimeout(long timeout) { 994 replyTimeout = timeout; 995 } 996 997 /** 998 * Set the default value used to determine if new connection will reply to unknown IQ requests. The pre-configured 999 * default is 'true'. 1000 * 1001 * @param replyToUnkownIqDefault 1002 * @see #setReplyToUnknownIq(boolean) 1003 * @deprecated Use {@link SmackConfiguration#setUnknownIqRequestReplyMode(org.jivesoftware.smack.SmackConfiguration.UnknownIqRequestReplyMode)} instead. 1004 */ 1005 @Deprecated 1006 // TODO Remove in Smack 4.3 1007 public static void setReplyToUnknownIqDefault(boolean replyToUnkownIqDefault) { 1008 SmackConfiguration.UnknownIqRequestReplyMode mode; 1009 if (replyToUnkownIqDefault) { 1010 mode = SmackConfiguration.UnknownIqRequestReplyMode.replyServiceUnavailable; 1011 } else { 1012 mode = SmackConfiguration.UnknownIqRequestReplyMode.doNotReply; 1013 } 1014 SmackConfiguration.setUnknownIqRequestReplyMode(mode); 1015 } 1016 1017 private SmackConfiguration.UnknownIqRequestReplyMode unknownIqRequestReplyMode = SmackConfiguration.getUnknownIqRequestReplyMode(); 1018 1019 public void setUnknownIqRequestReplyMode(UnknownIqRequestReplyMode unknownIqRequestReplyMode) { 1020 this.unknownIqRequestReplyMode = Objects.requireNonNull(unknownIqRequestReplyMode, "Mode must not be null"); 1021 } 1022 1023 /** 1024 * Set if Smack will automatically send 1025 * {@link org.jivesoftware.smack.packet.XMPPError.Condition#feature_not_implemented} when a request IQ without a 1026 * registered {@link IQRequestHandler} is received. 1027 * 1028 * @param replyToUnknownIq 1029 * @deprecated use {@link #setUnknownIqRequestReplyMode(UnknownIqRequestReplyMode)} instead. 1030 */ 1031 @Deprecated 1032 // TODO Remove in Smack 4.3 1033 public void setReplyToUnknownIq(boolean replyToUnknownIq) { 1034 SmackConfiguration.UnknownIqRequestReplyMode mode; 1035 if (replyToUnknownIq) { 1036 mode = SmackConfiguration.UnknownIqRequestReplyMode.replyServiceUnavailable; 1037 } else { 1038 mode = SmackConfiguration.UnknownIqRequestReplyMode.doNotReply; 1039 } 1040 unknownIqRequestReplyMode = mode; 1041 } 1042 1043 protected void parseAndProcessStanza(XmlPullParser parser) throws Exception { 1044 ParserUtils.assertAtStartTag(parser); 1045 int parserDepth = parser.getDepth(); 1046 Stanza stanza = null; 1047 try { 1048 stanza = PacketParserUtils.parseStanza(parser); 1049 } 1050 catch (Exception e) { 1051 CharSequence content = PacketParserUtils.parseContentDepth(parser, 1052 parserDepth); 1053 UnparseableStanza message = new UnparseableStanza(content, e); 1054 ParsingExceptionCallback callback = getParsingExceptionCallback(); 1055 if (callback != null) { 1056 callback.handleUnparsableStanza(message); 1057 } 1058 } 1059 ParserUtils.assertAtEndTag(parser); 1060 if (stanza != null) { 1061 processStanza(stanza); 1062 } 1063 } 1064 1065 /** 1066 * Processes a stanza(/packet) after it's been fully parsed by looping through the installed 1067 * stanza(/packet) collectors and listeners and letting them examine the stanza(/packet) to see if 1068 * they are a match with the filter. 1069 * 1070 * @param stanza the stanza to process. 1071 * @throws InterruptedException 1072 */ 1073 protected void processStanza(final Stanza stanza) throws InterruptedException { 1074 assert (stanza != null); 1075 lastStanzaReceived = System.currentTimeMillis(); 1076 // Deliver the incoming packet to listeners. 1077 executorService.executeBlocking(new Runnable() { 1078 @Override 1079 public void run() { 1080 invokeStanzaCollectorsAndNotifyRecvListeners(stanza); 1081 } 1082 }); 1083 } 1084 1085 /** 1086 * Invoke {@link StanzaCollector#processStanza(Stanza)} for every 1087 * StanzaCollector with the given packet. Also notify the receive listeners with a matching stanza(/packet) filter about the packet. 1088 * 1089 * @param packet the stanza(/packet) to notify the StanzaCollectors and receive listeners about. 1090 */ 1091 protected void invokeStanzaCollectorsAndNotifyRecvListeners(final Stanza packet) { 1092 if (packet instanceof IQ) { 1093 final IQ iq = (IQ) packet; 1094 final IQ.Type type = iq.getType(); 1095 switch (type) { 1096 case set: 1097 case get: 1098 final String key = XmppStringUtils.generateKey(iq.getChildElementName(), iq.getChildElementNamespace()); 1099 IQRequestHandler iqRequestHandler = null; 1100 switch (type) { 1101 case set: 1102 synchronized (setIqRequestHandler) { 1103 iqRequestHandler = setIqRequestHandler.get(key); 1104 } 1105 break; 1106 case get: 1107 synchronized (getIqRequestHandler) { 1108 iqRequestHandler = getIqRequestHandler.get(key); 1109 } 1110 break; 1111 default: 1112 throw new IllegalStateException("Should only encounter IQ type 'get' or 'set'"); 1113 } 1114 if (iqRequestHandler == null) { 1115 XMPPError.Condition replyCondition; 1116 switch (unknownIqRequestReplyMode) { 1117 case doNotReply: 1118 return; 1119 case replyFeatureNotImplemented: 1120 replyCondition = XMPPError.Condition.feature_not_implemented; 1121 break; 1122 case replyServiceUnavailable: 1123 replyCondition = XMPPError.Condition.service_unavailable; 1124 break; 1125 default: 1126 throw new AssertionError(); 1127 } 1128 1129 // If the IQ stanza is of type "get" or "set" with no registered IQ request handler, then answer an 1130 // IQ of type 'error' with condition 'service-unavailable'. 1131 ErrorIQ errorIQ = IQ.createErrorResponse(iq, XMPPError.getBuilder(( 1132 replyCondition))); 1133 try { 1134 sendStanza(errorIQ); 1135 } 1136 catch (InterruptedException | NotConnectedException e) { 1137 LOGGER.log(Level.WARNING, "Exception while sending error IQ to unkown IQ request", e); 1138 } 1139 } else { 1140 ExecutorService executorService = null; 1141 switch (iqRequestHandler.getMode()) { 1142 case sync: 1143 executorService = singleThreadedExecutorService; 1144 break; 1145 case async: 1146 executorService = cachedExecutorService; 1147 break; 1148 } 1149 final IQRequestHandler finalIqRequestHandler = iqRequestHandler; 1150 executorService.execute(new Runnable() { 1151 @Override 1152 public void run() { 1153 IQ response = finalIqRequestHandler.handleIQRequest(iq); 1154 if (response == null) { 1155 // It is not ideal if the IQ request handler does not return an IQ response, because RFC 1156 // 6120 § 8.1.2 does specify that a response is mandatory. But some APIs, mostly the 1157 // file transfer one, does not always return a result, so we need to handle this case. 1158 // Also sometimes a request handler may decide that it's better to not send a response, 1159 // e.g. to avoid presence leaks. 1160 return; 1161 } 1162 try { 1163 sendStanza(response); 1164 } 1165 catch (InterruptedException | NotConnectedException e) { 1166 LOGGER.log(Level.WARNING, "Exception while sending response to IQ request", e); 1167 } 1168 } 1169 }); 1170 // The following returns makes it impossible for packet listeners and collectors to 1171 // filter for IQ request stanzas, i.e. IQs of type 'set' or 'get'. This is the 1172 // desired behavior. 1173 return; 1174 } 1175 break; 1176 default: 1177 break; 1178 } 1179 } 1180 1181 // First handle the async recv listeners. Note that this code is very similar to what follows a few lines below, 1182 // the only difference is that asyncRecvListeners is used here and that the packet listeners are started in 1183 // their own thread. 1184 final Collection<StanzaListener> listenersToNotify = new LinkedList<StanzaListener>(); 1185 synchronized (asyncRecvListeners) { 1186 for (ListenerWrapper listenerWrapper : asyncRecvListeners.values()) { 1187 if (listenerWrapper.filterMatches(packet)) { 1188 listenersToNotify.add(listenerWrapper.getListener()); 1189 } 1190 } 1191 } 1192 1193 for (final StanzaListener listener : listenersToNotify) { 1194 asyncGo(new Runnable() { 1195 @Override 1196 public void run() { 1197 try { 1198 listener.processStanza(packet); 1199 } catch (Exception e) { 1200 LOGGER.log(Level.SEVERE, "Exception in async packet listener", e); 1201 } 1202 } 1203 }); 1204 } 1205 1206 // Loop through all collectors and notify the appropriate ones. 1207 for (StanzaCollector collector : collectors) { 1208 collector.processStanza(packet); 1209 } 1210 1211 // Notify the receive listeners interested in the packet 1212 listenersToNotify.clear(); 1213 synchronized (syncRecvListeners) { 1214 for (ListenerWrapper listenerWrapper : syncRecvListeners.values()) { 1215 if (listenerWrapper.filterMatches(packet)) { 1216 listenersToNotify.add(listenerWrapper.getListener()); 1217 } 1218 } 1219 } 1220 1221 // Decouple incoming stanza processing from listener invocation. Unlike async listeners, this uses a single 1222 // threaded executor service and therefore keeps the order. 1223 singleThreadedExecutorService.execute(new Runnable() { 1224 @Override 1225 public void run() { 1226 for (StanzaListener listener : listenersToNotify) { 1227 try { 1228 listener.processStanza(packet); 1229 } catch (NotConnectedException e) { 1230 LOGGER.log(Level.WARNING, "Got not connected exception, aborting", e); 1231 break; 1232 } catch (Exception e) { 1233 LOGGER.log(Level.SEVERE, "Exception in packet listener", e); 1234 } 1235 } 1236 } 1237 }); 1238 1239 } 1240 1241 /** 1242 * Sets whether the connection has already logged in the server. This method assures that the 1243 * {@link #wasAuthenticated} flag is never reset once it has ever been set. 1244 * 1245 */ 1246 protected void setWasAuthenticated() { 1247 // Never reset the flag if the connection has ever been authenticated 1248 if (!wasAuthenticated) { 1249 wasAuthenticated = authenticated; 1250 } 1251 } 1252 1253 protected void callConnectionConnectedListener() { 1254 for (ConnectionListener listener : connectionListeners) { 1255 listener.connected(this); 1256 } 1257 } 1258 1259 protected void callConnectionAuthenticatedListener(boolean resumed) { 1260 for (ConnectionListener listener : connectionListeners) { 1261 try { 1262 listener.authenticated(this, resumed); 1263 } catch (Exception e) { 1264 // Catch and print any exception so we can recover 1265 // from a faulty listener and finish the shutdown process 1266 LOGGER.log(Level.SEVERE, "Exception in authenticated listener", e); 1267 } 1268 } 1269 } 1270 1271 void callConnectionClosedListener() { 1272 for (ConnectionListener listener : connectionListeners) { 1273 try { 1274 listener.connectionClosed(); 1275 } 1276 catch (Exception e) { 1277 // Catch and print any exception so we can recover 1278 // from a faulty listener and finish the shutdown process 1279 LOGGER.log(Level.SEVERE, "Error in listener while closing connection", e); 1280 } 1281 } 1282 } 1283 1284 protected void callConnectionClosedOnErrorListener(Exception e) { 1285 boolean logWarning = true; 1286 if (e instanceof StreamErrorException) { 1287 StreamErrorException see = (StreamErrorException) e; 1288 if (see.getStreamError().getCondition() == StreamError.Condition.not_authorized 1289 && wasAuthenticated) { 1290 logWarning = false; 1291 LOGGER.log(Level.FINE, 1292 "Connection closed with not-authorized stream error after it was already authenticated. The account was likely deleted/unregistered on the server"); 1293 } 1294 } 1295 if (logWarning) { 1296 LOGGER.log(Level.WARNING, "Connection " + this + " closed with error", e); 1297 } 1298 for (ConnectionListener listener : connectionListeners) { 1299 try { 1300 listener.connectionClosedOnError(e); 1301 } 1302 catch (Exception e2) { 1303 // Catch and print any exception so we can recover 1304 // from a faulty listener 1305 LOGGER.log(Level.SEVERE, "Error in listener while closing connection", e2); 1306 } 1307 } 1308 } 1309 1310 /** 1311 * Sends a notification indicating that the connection was reconnected successfully. 1312 */ 1313 protected void notifyReconnection() { 1314 // Notify connection listeners of the reconnection. 1315 for (ConnectionListener listener : connectionListeners) { 1316 try { 1317 listener.reconnectionSuccessful(); 1318 } 1319 catch (Exception e) { 1320 // Catch and print any exception so we can recover 1321 // from a faulty listener 1322 LOGGER.log(Level.WARNING, "notifyReconnection()", e); 1323 } 1324 } 1325 } 1326 1327 /** 1328 * A wrapper class to associate a stanza(/packet) filter with a listener. 1329 */ 1330 protected static class ListenerWrapper { 1331 1332 private final StanzaListener packetListener; 1333 private final StanzaFilter packetFilter; 1334 1335 /** 1336 * Create a class which associates a stanza(/packet) filter with a listener. 1337 * 1338 * @param packetListener the stanza(/packet) listener. 1339 * @param packetFilter the associated filter or null if it listen for all packets. 1340 */ 1341 public ListenerWrapper(StanzaListener packetListener, StanzaFilter packetFilter) { 1342 this.packetListener = packetListener; 1343 this.packetFilter = packetFilter; 1344 } 1345 1346 public boolean filterMatches(Stanza packet) { 1347 return packetFilter == null || packetFilter.accept(packet); 1348 } 1349 1350 public StanzaListener getListener() { 1351 return packetListener; 1352 } 1353 } 1354 1355 /** 1356 * A wrapper class to associate a stanza(/packet) filter with an interceptor. 1357 */ 1358 protected static class InterceptorWrapper { 1359 1360 private final StanzaListener packetInterceptor; 1361 private final StanzaFilter packetFilter; 1362 1363 /** 1364 * Create a class which associates a stanza(/packet) filter with an interceptor. 1365 * 1366 * @param packetInterceptor the interceptor. 1367 * @param packetFilter the associated filter or null if it intercepts all packets. 1368 */ 1369 public InterceptorWrapper(StanzaListener packetInterceptor, StanzaFilter packetFilter) { 1370 this.packetInterceptor = packetInterceptor; 1371 this.packetFilter = packetFilter; 1372 } 1373 1374 public boolean filterMatches(Stanza packet) { 1375 return packetFilter == null || packetFilter.accept(packet); 1376 } 1377 1378 public StanzaListener getInterceptor() { 1379 return packetInterceptor; 1380 } 1381 } 1382 1383 @Override 1384 public int getConnectionCounter() { 1385 return connectionCounterValue; 1386 } 1387 1388 @Override 1389 public void setFromMode(FromMode fromMode) { 1390 this.fromMode = fromMode; 1391 } 1392 1393 @Override 1394 public FromMode getFromMode() { 1395 return this.fromMode; 1396 } 1397 1398 @Override 1399 protected void finalize() throws Throwable { 1400 LOGGER.fine("finalizing " + this + ": Shutting down executor services"); 1401 try { 1402 // It's usually not a good idea to rely on finalize. But this is the easiest way to 1403 // avoid the "Smack Listener Processor" leaking. The thread(s) of the executor have a 1404 // reference to their ExecutorService which prevents the ExecutorService from being 1405 // gc'ed. It is possible that the XMPPConnection instance is gc'ed while the 1406 // listenerExecutor ExecutorService call not be gc'ed until it got shut down. 1407 executorService.shutdownNow(); 1408 cachedExecutorService.shutdown(); 1409 removeCallbacksService.shutdownNow(); 1410 singleThreadedExecutorService.shutdownNow(); 1411 } catch (Throwable t) { 1412 LOGGER.log(Level.WARNING, "finalize() threw trhowable", t); 1413 } 1414 finally { 1415 super.finalize(); 1416 } 1417 } 1418 1419 protected final void parseFeatures(XmlPullParser parser) throws Exception { 1420 streamFeatures.clear(); 1421 final int initialDepth = parser.getDepth(); 1422 while (true) { 1423 int eventType = parser.next(); 1424 1425 if (eventType == XmlPullParser.START_TAG && parser.getDepth() == initialDepth + 1) { 1426 ExtensionElement streamFeature = null; 1427 String name = parser.getName(); 1428 String namespace = parser.getNamespace(); 1429 switch (name) { 1430 case StartTls.ELEMENT: 1431 streamFeature = PacketParserUtils.parseStartTlsFeature(parser); 1432 break; 1433 case Mechanisms.ELEMENT: 1434 streamFeature = new Mechanisms(PacketParserUtils.parseMechanisms(parser)); 1435 break; 1436 case Bind.ELEMENT: 1437 streamFeature = Bind.Feature.INSTANCE; 1438 break; 1439 case Session.ELEMENT: 1440 streamFeature = PacketParserUtils.parseSessionFeature(parser); 1441 break; 1442 case Compress.Feature.ELEMENT: 1443 streamFeature = PacketParserUtils.parseCompressionFeature(parser); 1444 break; 1445 default: 1446 ExtensionElementProvider<ExtensionElement> provider = ProviderManager.getStreamFeatureProvider(name, namespace); 1447 if (provider != null) { 1448 streamFeature = provider.parse(parser); 1449 } 1450 break; 1451 } 1452 if (streamFeature != null) { 1453 addStreamFeature(streamFeature); 1454 } 1455 } 1456 else if (eventType == XmlPullParser.END_TAG && parser.getDepth() == initialDepth) { 1457 break; 1458 } 1459 } 1460 1461 if (hasFeature(Mechanisms.ELEMENT, Mechanisms.NAMESPACE)) { 1462 // Only proceed with SASL auth if TLS is disabled or if the server doesn't announce it 1463 if (!hasFeature(StartTls.ELEMENT, StartTls.NAMESPACE) 1464 || config.getSecurityMode() == SecurityMode.disabled) { 1465 tlsHandled.reportSuccess(); 1466 saslFeatureReceived.reportSuccess(); 1467 } 1468 } 1469 1470 // If the server reported the bind feature then we are that that we did SASL and maybe 1471 // STARTTLS. We can then report that the last 'stream:features' have been parsed 1472 if (hasFeature(Bind.ELEMENT, Bind.NAMESPACE)) { 1473 if (!hasFeature(Compress.Feature.ELEMENT, Compress.NAMESPACE) 1474 || !config.isCompressionEnabled()) { 1475 // This was was last features from the server is either it did not contain 1476 // compression or if we disabled it 1477 lastFeaturesReceived.reportSuccess(); 1478 } 1479 } 1480 afterFeaturesReceived(); 1481 } 1482 1483 @SuppressWarnings("unused") 1484 protected void afterFeaturesReceived() throws SecurityRequiredException, NotConnectedException, InterruptedException { 1485 // Default implementation does nothing 1486 } 1487 1488 @SuppressWarnings("unchecked") 1489 @Override 1490 public <F extends ExtensionElement> F getFeature(String element, String namespace) { 1491 return (F) streamFeatures.get(XmppStringUtils.generateKey(element, namespace)); 1492 } 1493 1494 @Override 1495 public boolean hasFeature(String element, String namespace) { 1496 return getFeature(element, namespace) != null; 1497 } 1498 1499 protected void addStreamFeature(ExtensionElement feature) { 1500 String key = XmppStringUtils.generateKey(feature.getElementName(), feature.getNamespace()); 1501 streamFeatures.put(key, feature); 1502 } 1503 1504 @Override 1505 public void sendStanzaWithResponseCallback(Stanza stanza, StanzaFilter replyFilter, 1506 StanzaListener callback) throws NotConnectedException, InterruptedException { 1507 sendStanzaWithResponseCallback(stanza, replyFilter, callback, null); 1508 } 1509 1510 @Override 1511 public void sendStanzaWithResponseCallback(Stanza stanza, StanzaFilter replyFilter, 1512 StanzaListener callback, ExceptionCallback exceptionCallback) 1513 throws NotConnectedException, InterruptedException { 1514 sendStanzaWithResponseCallback(stanza, replyFilter, callback, exceptionCallback, 1515 getReplyTimeout()); 1516 } 1517 1518 @Override 1519 public void sendStanzaWithResponseCallback(Stanza stanza, final StanzaFilter replyFilter, 1520 final StanzaListener callback, final ExceptionCallback exceptionCallback, 1521 long timeout) throws NotConnectedException, InterruptedException { 1522 Objects.requireNonNull(stanza, "stanza must not be null"); 1523 // While Smack allows to add PacketListeners with a PacketFilter value of 'null', we 1524 // disallow it here in the async API as it makes no sense 1525 Objects.requireNonNull(replyFilter, "replyFilter must not be null"); 1526 Objects.requireNonNull(callback, "callback must not be null"); 1527 1528 final StanzaListener packetListener = new StanzaListener() { 1529 @Override 1530 public void processStanza(Stanza packet) throws NotConnectedException, InterruptedException { 1531 boolean removed = removeAsyncStanzaListener(this); 1532 if (!removed) { 1533 // We lost a race against the "no response" handling runnable. Avoid calling the callback, as the 1534 // exception callback will be invoked (if any). 1535 return; 1536 } 1537 try { 1538 XMPPErrorException.ifHasErrorThenThrow(packet); 1539 callback.processStanza(packet); 1540 } 1541 catch (XMPPErrorException e) { 1542 if (exceptionCallback != null) { 1543 exceptionCallback.processException(e); 1544 } 1545 } 1546 } 1547 }; 1548 removeCallbacksService.schedule(new Runnable() { 1549 @Override 1550 public void run() { 1551 boolean removed = removeAsyncStanzaListener(packetListener); 1552 // If the packetListener got removed, then it was never run and 1553 // we never received a response, inform the exception callback 1554 if (removed && exceptionCallback != null) { 1555 Exception exception; 1556 if (!isConnected()) { 1557 // If the connection is no longer connected, throw a not connected exception. 1558 exception = new NotConnectedException(AbstractXMPPConnection.this, replyFilter); 1559 } else { 1560 exception = NoResponseException.newWith(AbstractXMPPConnection.this, replyFilter); 1561 } 1562 exceptionCallback.processException(exception); 1563 } 1564 } 1565 }, timeout, TimeUnit.MILLISECONDS); 1566 addAsyncStanzaListener(packetListener, replyFilter); 1567 sendStanza(stanza); 1568 } 1569 1570 @Override 1571 public void sendIqWithResponseCallback(IQ iqRequest, StanzaListener callback) 1572 throws NotConnectedException, InterruptedException { 1573 sendIqWithResponseCallback(iqRequest, callback, null); 1574 } 1575 1576 @Override 1577 public void sendIqWithResponseCallback(IQ iqRequest, StanzaListener callback, 1578 ExceptionCallback exceptionCallback) throws NotConnectedException, InterruptedException { 1579 sendIqWithResponseCallback(iqRequest, callback, exceptionCallback, getReplyTimeout()); 1580 } 1581 1582 @Override 1583 public void sendIqWithResponseCallback(IQ iqRequest, final StanzaListener callback, 1584 final ExceptionCallback exceptionCallback, long timeout) 1585 throws NotConnectedException, InterruptedException { 1586 StanzaFilter replyFilter = new IQReplyFilter(iqRequest, this); 1587 sendStanzaWithResponseCallback(iqRequest, replyFilter, callback, exceptionCallback, timeout); 1588 } 1589 1590 @Override 1591 public void addOneTimeSyncCallback(final StanzaListener callback, final StanzaFilter packetFilter) { 1592 final StanzaListener packetListener = new StanzaListener() { 1593 @Override 1594 public void processStanza(Stanza packet) throws NotConnectedException, InterruptedException { 1595 try { 1596 callback.processStanza(packet); 1597 } finally { 1598 removeSyncStanzaListener(this); 1599 } 1600 } 1601 }; 1602 addSyncStanzaListener(packetListener, packetFilter); 1603 removeCallbacksService.schedule(new Runnable() { 1604 @Override 1605 public void run() { 1606 removeSyncStanzaListener(packetListener); 1607 } 1608 }, getReplyTimeout(), TimeUnit.MILLISECONDS); 1609 } 1610 1611 @Override 1612 public IQRequestHandler registerIQRequestHandler(final IQRequestHandler iqRequestHandler) { 1613 final String key = XmppStringUtils.generateKey(iqRequestHandler.getElement(), iqRequestHandler.getNamespace()); 1614 switch (iqRequestHandler.getType()) { 1615 case set: 1616 synchronized (setIqRequestHandler) { 1617 return setIqRequestHandler.put(key, iqRequestHandler); 1618 } 1619 case get: 1620 synchronized (getIqRequestHandler) { 1621 return getIqRequestHandler.put(key, iqRequestHandler); 1622 } 1623 default: 1624 throw new IllegalArgumentException("Only IQ type of 'get' and 'set' allowed"); 1625 } 1626 } 1627 1628 @Override 1629 public final IQRequestHandler unregisterIQRequestHandler(IQRequestHandler iqRequestHandler) { 1630 return unregisterIQRequestHandler(iqRequestHandler.getElement(), iqRequestHandler.getNamespace(), 1631 iqRequestHandler.getType()); 1632 } 1633 1634 @Override 1635 public IQRequestHandler unregisterIQRequestHandler(String element, String namespace, IQ.Type type) { 1636 final String key = XmppStringUtils.generateKey(element, namespace); 1637 switch (type) { 1638 case set: 1639 synchronized (setIqRequestHandler) { 1640 return setIqRequestHandler.remove(key); 1641 } 1642 case get: 1643 synchronized (getIqRequestHandler) { 1644 return getIqRequestHandler.remove(key); 1645 } 1646 default: 1647 throw new IllegalArgumentException("Only IQ type of 'get' and 'set' allowed"); 1648 } 1649 } 1650 1651 private long lastStanzaReceived; 1652 1653 @Override 1654 public long getLastStanzaReceived() { 1655 return lastStanzaReceived; 1656 } 1657 1658 /** 1659 * Install a parsing exception callback, which will be invoked once an exception is encountered while parsing a 1660 * stanza. 1661 * 1662 * @param callback the callback to install 1663 */ 1664 public void setParsingExceptionCallback(ParsingExceptionCallback callback) { 1665 parsingExceptionCallback = callback; 1666 } 1667 1668 /** 1669 * Get the current active parsing exception callback. 1670 * 1671 * @return the active exception callback or null if there is none 1672 */ 1673 public ParsingExceptionCallback getParsingExceptionCallback() { 1674 return parsingExceptionCallback; 1675 } 1676 1677 @Override 1678 public final String toString() { 1679 EntityFullJid localEndpoint = getUser(); 1680 String localEndpointString = (localEndpoint == null ? "not-authenticated" : localEndpoint.toString()); 1681 return getClass().getSimpleName() + '[' + localEndpointString + "] (" + getConnectionCounter() + ')'; 1682 } 1683 1684 protected final void asyncGo(Runnable runnable) { 1685 cachedExecutorService.execute(runnable); 1686 } 1687 1688 protected final ScheduledFuture<?> schedule(Runnable runnable, long delay, TimeUnit unit) { 1689 return removeCallbacksService.schedule(runnable, delay, unit); 1690 } 1691}