001/** 002 * 003 * Copyright 2009 Jive Software. 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.jivesoftware.smack; 018 019import java.io.IOException; 020import java.io.Reader; 021import java.io.Writer; 022import java.util.ArrayList; 023import java.util.Collection; 024import java.util.HashMap; 025import java.util.LinkedHashMap; 026import java.util.LinkedList; 027import java.util.List; 028import java.util.Map; 029import java.util.Set; 030import java.util.concurrent.ArrayBlockingQueue; 031import java.util.concurrent.ConcurrentLinkedQueue; 032import java.util.concurrent.CopyOnWriteArraySet; 033import java.util.concurrent.ExecutorService; 034import java.util.concurrent.Executors; 035import java.util.concurrent.ScheduledExecutorService; 036import java.util.concurrent.ScheduledFuture; 037import java.util.concurrent.ThreadPoolExecutor; 038import java.util.concurrent.TimeUnit; 039import java.util.concurrent.atomic.AtomicInteger; 040import java.util.concurrent.locks.Lock; 041import java.util.concurrent.locks.ReentrantLock; 042import java.util.logging.Level; 043import java.util.logging.Logger; 044 045import org.jivesoftware.smack.ConnectionConfiguration.SecurityMode; 046import org.jivesoftware.smack.SmackException.AlreadyConnectedException; 047import org.jivesoftware.smack.SmackException.AlreadyLoggedInException; 048import org.jivesoftware.smack.SmackException.NoResponseException; 049import org.jivesoftware.smack.SmackException.NotConnectedException; 050import org.jivesoftware.smack.SmackException.ConnectionException; 051import org.jivesoftware.smack.SmackException.ResourceBindingNotOfferedException; 052import org.jivesoftware.smack.SmackException.SecurityRequiredException; 053import org.jivesoftware.smack.XMPPException.XMPPErrorException; 054import org.jivesoftware.smack.compress.packet.Compress; 055import org.jivesoftware.smack.compression.XMPPInputOutputStream; 056import org.jivesoftware.smack.debugger.SmackDebugger; 057import org.jivesoftware.smack.filter.IQReplyFilter; 058import org.jivesoftware.smack.filter.StanzaFilter; 059import org.jivesoftware.smack.filter.StanzaIdFilter; 060import org.jivesoftware.smack.iqrequest.IQRequestHandler; 061import org.jivesoftware.smack.packet.Bind; 062import org.jivesoftware.smack.packet.ErrorIQ; 063import org.jivesoftware.smack.packet.IQ; 064import org.jivesoftware.smack.packet.Mechanisms; 065import org.jivesoftware.smack.packet.Stanza; 066import org.jivesoftware.smack.packet.ExtensionElement; 067import org.jivesoftware.smack.packet.Presence; 068import org.jivesoftware.smack.packet.Session; 069import org.jivesoftware.smack.packet.StartTls; 070import org.jivesoftware.smack.packet.PlainStreamElement; 071import org.jivesoftware.smack.packet.XMPPError; 072import org.jivesoftware.smack.parsing.ParsingExceptionCallback; 073import org.jivesoftware.smack.parsing.UnparsablePacket; 074import org.jivesoftware.smack.provider.ExtensionElementProvider; 075import org.jivesoftware.smack.provider.ProviderManager; 076import org.jivesoftware.smack.util.DNSUtil; 077import org.jivesoftware.smack.util.Objects; 078import org.jivesoftware.smack.util.PacketParserUtils; 079import org.jivesoftware.smack.util.ParserUtils; 080import org.jivesoftware.smack.util.SmackExecutorThreadFactory; 081import org.jivesoftware.smack.util.StringUtils; 082import org.jivesoftware.smack.util.dns.HostAddress; 083import org.jxmpp.util.XmppStringUtils; 084import org.xmlpull.v1.XmlPullParser; 085import org.xmlpull.v1.XmlPullParserException; 086 087 088public abstract class AbstractXMPPConnection implements XMPPConnection { 089 private static final Logger LOGGER = Logger.getLogger(AbstractXMPPConnection.class.getName()); 090 091 /** 092 * Counter to uniquely identify connections that are created. 093 */ 094 private final static AtomicInteger connectionCounter = new AtomicInteger(0); 095 096 static { 097 // Ensure the SmackConfiguration class is loaded by calling a method in it. 098 SmackConfiguration.getVersion(); 099 } 100 101 /** 102 * Get the collection of listeners that are interested in connection creation events. 103 * 104 * @return a collection of listeners interested on new connections. 105 */ 106 protected static Collection<ConnectionCreationListener> getConnectionCreationListeners() { 107 return XMPPConnectionRegistry.getConnectionCreationListeners(); 108 } 109 110 /** 111 * A collection of ConnectionListeners which listen for connection closing 112 * and reconnection events. 113 */ 114 protected final Set<ConnectionListener> connectionListeners = 115 new CopyOnWriteArraySet<ConnectionListener>(); 116 117 /** 118 * A collection of PacketCollectors which collects packets for a specified filter 119 * and perform blocking and polling operations on the result queue. 120 * <p> 121 * We use a ConcurrentLinkedQueue here, because its Iterator is weakly 122 * consistent and we want {@link #invokePacketCollectors(Stanza)} for-each 123 * loop to be lock free. As drawback, removing a PacketCollector is O(n). 124 * The alternative would be a synchronized HashSet, but this would mean a 125 * synchronized block around every usage of <code>collectors</code>. 126 * </p> 127 */ 128 private final Collection<PacketCollector> collectors = new ConcurrentLinkedQueue<PacketCollector>(); 129 130 /** 131 * List of PacketListeners that will be notified synchronously when a new stanza(/packet) was received. 132 */ 133 private final Map<StanzaListener, ListenerWrapper> syncRecvListeners = new LinkedHashMap<>(); 134 135 /** 136 * List of PacketListeners that will be notified asynchronously when a new stanza(/packet) was received. 137 */ 138 private final Map<StanzaListener, ListenerWrapper> asyncRecvListeners = new LinkedHashMap<>(); 139 140 /** 141 * List of PacketListeners that will be notified when a new stanza(/packet) was sent. 142 */ 143 private final Map<StanzaListener, ListenerWrapper> sendListeners = 144 new HashMap<StanzaListener, ListenerWrapper>(); 145 146 /** 147 * List of PacketListeners that will be notified when a new stanza(/packet) is about to be 148 * sent to the server. These interceptors may modify the stanza(/packet) before it is being 149 * actually sent to the server. 150 */ 151 private final Map<StanzaListener, InterceptorWrapper> interceptors = 152 new HashMap<StanzaListener, InterceptorWrapper>(); 153 154 protected final Lock connectionLock = new ReentrantLock(); 155 156 protected final Map<String, ExtensionElement> streamFeatures = new HashMap<String, ExtensionElement>(); 157 158 /** 159 * The full JID of the authenticated user, as returned by the resource binding response of the server. 160 * <p> 161 * It is important that we don't infer the user from the login() arguments and the configurations service name, as, 162 * for example, when SASL External is used, the username is not given to login but taken from the 'external' 163 * certificate. 164 * </p> 165 */ 166 protected String user; 167 168 protected boolean connected = false; 169 170 /** 171 * The stream ID, see RFC 6120 § 4.7.3 172 */ 173 protected String streamId; 174 175 /** 176 * 177 */ 178 private long packetReplyTimeout = SmackConfiguration.getDefaultPacketReplyTimeout(); 179 180 /** 181 * The SmackDebugger allows to log and debug XML traffic. 182 */ 183 protected SmackDebugger debugger = null; 184 185 /** 186 * The Reader which is used for the debugger. 187 */ 188 protected Reader reader; 189 190 /** 191 * The Writer which is used for the debugger. 192 */ 193 protected Writer writer; 194 195 /** 196 * Set to success if the last features stanza from the server has been parsed. A XMPP connection 197 * handshake can invoke multiple features stanzas, e.g. when TLS is activated a second feature 198 * stanza is send by the server. This is set to true once the last feature stanza has been 199 * parsed. 200 */ 201 protected final SynchronizationPoint<Exception> lastFeaturesReceived = new SynchronizationPoint<Exception>( 202 AbstractXMPPConnection.this); 203 204 /** 205 * Set to success if the sasl feature has been received. 206 */ 207 protected final SynchronizationPoint<SmackException> saslFeatureReceived = new SynchronizationPoint<SmackException>( 208 AbstractXMPPConnection.this); 209 210 /** 211 * The SASLAuthentication manager that is responsible for authenticating with the server. 212 */ 213 protected SASLAuthentication saslAuthentication = new SASLAuthentication(this); 214 215 /** 216 * A number to uniquely identify connections that are created. This is distinct from the 217 * connection ID, which is a value sent by the server once a connection is made. 218 */ 219 protected final int connectionCounterValue = connectionCounter.getAndIncrement(); 220 221 /** 222 * Holds the initial configuration used while creating the connection. 223 */ 224 protected final ConnectionConfiguration config; 225 226 /** 227 * Defines how the from attribute of outgoing stanzas should be handled. 228 */ 229 private FromMode fromMode = FromMode.OMITTED; 230 231 protected XMPPInputOutputStream compressionHandler; 232 233 private ParsingExceptionCallback parsingExceptionCallback = SmackConfiguration.getDefaultParsingExceptionCallback(); 234 235 /** 236 * ExecutorService used to invoke the PacketListeners on newly arrived and parsed stanzas. It is 237 * important that we use a <b>single threaded ExecutorService</b> in order to guarantee that the 238 * PacketListeners are invoked in the same order the stanzas arrived. 239 */ 240 private final ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, 241 new ArrayBlockingQueue<Runnable>(100), new SmackExecutorThreadFactory(connectionCounterValue, "Incoming Processor")); 242 243 /** 244 * This scheduled thread pool executor is used to remove pending callbacks. 245 */ 246 private final ScheduledExecutorService removeCallbacksService = Executors.newSingleThreadScheduledExecutor( 247 new SmackExecutorThreadFactory(connectionCounterValue, "Remove Callbacks")); 248 249 /** 250 * A cached thread pool executor service with custom thread factory to set meaningful names on the threads and set 251 * them 'daemon'. 252 */ 253 private final ExecutorService cachedExecutorService = Executors.newCachedThreadPool( 254 // @formatter:off 255 new SmackExecutorThreadFactory( // threadFactory 256 connectionCounterValue, 257 "Cached Executor" 258 ) 259 // @formatter:on 260 ); 261 262 /** 263 * A executor service used to invoke the callbacks of synchronous stanza(/packet) listeners. We use a executor service to 264 * decouple incoming stanza processing from callback invocation. It is important that order of callback invocation 265 * is the same as the order of the incoming stanzas. Therefore we use a <i>single</i> threaded executor service. 266 */ 267 private final ExecutorService singleThreadedExecutorService = Executors.newSingleThreadExecutor(new SmackExecutorThreadFactory( 268 getConnectionCounter(), "Single Threaded Executor")); 269 270 /** 271 * The used host to establish the connection to 272 */ 273 protected String host; 274 275 /** 276 * The used port to establish the connection to 277 */ 278 protected int port; 279 280 /** 281 * Flag that indicates if the user is currently authenticated with the server. 282 */ 283 protected boolean authenticated = false; 284 285 /** 286 * Flag that indicates if the user was authenticated with the server when the connection 287 * to the server was closed (abruptly or not). 288 */ 289 protected boolean wasAuthenticated = false; 290 291 private final Map<String, IQRequestHandler> setIqRequestHandler = new HashMap<>(); 292 private final Map<String, IQRequestHandler> getIqRequestHandler = new HashMap<>(); 293 294 /** 295 * Create a new XMPPConnection to an XMPP server. 296 * 297 * @param configuration The configuration which is used to establish the connection. 298 */ 299 protected AbstractXMPPConnection(ConnectionConfiguration configuration) { 300 config = configuration; 301 } 302 303 protected ConnectionConfiguration getConfiguration() { 304 return config; 305 } 306 307 @Override 308 public String getServiceName() { 309 if (serviceName != null) { 310 return serviceName; 311 } 312 return config.getServiceName(); 313 } 314 315 @Override 316 public String getHost() { 317 return host; 318 } 319 320 @Override 321 public int getPort() { 322 return port; 323 } 324 325 @Override 326 public abstract boolean isSecureConnection(); 327 328 protected abstract void sendStanzaInternal(Stanza packet) throws NotConnectedException; 329 330 @Override 331 public abstract void send(PlainStreamElement element) throws NotConnectedException; 332 333 @Override 334 public abstract boolean isUsingCompression(); 335 336 /** 337 * Establishes a connection to the XMPP server and performs an automatic login 338 * only if the previous connection state was logged (authenticated). It basically 339 * creates and maintains a connection to the server. 340 * <p> 341 * Listeners will be preserved from a previous connection. 342 * 343 * @throws XMPPException if an error occurs on the XMPP protocol level. 344 * @throws SmackException if an error occurs somewhere else besides XMPP protocol level. 345 * @throws IOException 346 * @throws ConnectionException with detailed information about the failed connection. 347 * @return a reference to this object, to chain <code>connect()</code> with <code>login()</code>. 348 */ 349 public synchronized AbstractXMPPConnection connect() throws SmackException, IOException, XMPPException { 350 // Check if not already connected 351 throwAlreadyConnectedExceptionIfAppropriate(); 352 353 // Reset the connection state 354 saslAuthentication.init(); 355 saslFeatureReceived.init(); 356 lastFeaturesReceived.init(); 357 streamId = null; 358 359 // Perform the actual connection to the XMPP service 360 connectInternal(); 361 return this; 362 } 363 364 /** 365 * Abstract method that concrete subclasses of XMPPConnection need to implement to perform their 366 * way of XMPP connection establishment. Implementations are required to perform an automatic 367 * login if the previous connection state was logged (authenticated). 368 * 369 * @throws SmackException 370 * @throws IOException 371 * @throws XMPPException 372 */ 373 protected abstract void connectInternal() throws SmackException, IOException, XMPPException; 374 375 private String usedUsername, usedPassword, usedResource; 376 377 /** 378 * Logs in to the server using the strongest SASL mechanism supported by 379 * the server. If more than the connection's default stanza(/packet) timeout elapses in each step of the 380 * authentication process without a response from the server, a 381 * {@link SmackException.NoResponseException} will be thrown. 382 * <p> 383 * Before logging in (i.e. authenticate) to the server the connection must be connected 384 * by calling {@link #connect}. 385 * </p> 386 * <p> 387 * It is possible to log in without sending an initial available presence by using 388 * {@link ConnectionConfiguration.Builder#setSendPresence(boolean)}. 389 * Finally, if you want to not pass a password and instead use a more advanced mechanism 390 * while using SASL then you may be interested in using 391 * {@link ConnectionConfiguration.Builder#setCallbackHandler(javax.security.auth.callback.CallbackHandler)}. 392 * For more advanced login settings see {@link ConnectionConfiguration}. 393 * </p> 394 * 395 * @throws XMPPException if an error occurs on the XMPP protocol level. 396 * @throws SmackException if an error occurs somewhere else besides XMPP protocol level. 397 * @throws IOException if an I/O error occurs during login. 398 */ 399 public synchronized void login() throws XMPPException, SmackException, IOException { 400 if (isAnonymous()) { 401 throwNotConnectedExceptionIfAppropriate(); 402 throwAlreadyLoggedInExceptionIfAppropriate(); 403 loginAnonymously(); 404 } else { 405 // The previously used username, password and resource take over precedence over the 406 // ones from the connection configuration 407 CharSequence username = usedUsername != null ? usedUsername : config.getUsername(); 408 String password = usedPassword != null ? usedPassword : config.getPassword(); 409 String resource = usedResource != null ? usedResource : config.getResource(); 410 login(username, password, resource); 411 } 412 } 413 414 /** 415 * Same as {@link #login(CharSequence, String, String)}, but takes the resource from the connection 416 * configuration. 417 * 418 * @param username 419 * @param password 420 * @throws XMPPException 421 * @throws SmackException 422 * @throws IOException 423 * @see #login 424 */ 425 public synchronized void login(CharSequence username, String password) throws XMPPException, SmackException, 426 IOException { 427 login(username, password, config.getResource()); 428 } 429 430 /** 431 * Login with the given username (authorization identity). You may omit the password if a callback handler is used. 432 * If resource is null, then the server will generate one. 433 * 434 * @param username 435 * @param password 436 * @param resource 437 * @throws XMPPException 438 * @throws SmackException 439 * @throws IOException 440 * @see #login 441 */ 442 public synchronized void login(CharSequence username, String password, String resource) throws XMPPException, 443 SmackException, IOException { 444 if (!config.allowNullOrEmptyUsername) { 445 StringUtils.requireNotNullOrEmpty(username, "Username must not be null or empty"); 446 } 447 throwNotConnectedExceptionIfAppropriate(); 448 throwAlreadyLoggedInExceptionIfAppropriate(); 449 usedUsername = username != null ? username.toString() : null; 450 usedPassword = password; 451 usedResource = resource; 452 loginNonAnonymously(usedUsername, usedPassword, usedResource); 453 } 454 455 protected abstract void loginNonAnonymously(String username, String password, String resource) 456 throws XMPPException, SmackException, IOException; 457 458 protected abstract void loginAnonymously() throws XMPPException, SmackException, IOException; 459 460 @Override 461 public final boolean isConnected() { 462 return connected; 463 } 464 465 @Override 466 public final boolean isAuthenticated() { 467 return authenticated; 468 } 469 470 @Override 471 public final String getUser() { 472 return user; 473 } 474 475 @Override 476 public String getStreamId() { 477 if (!isConnected()) { 478 return null; 479 } 480 return streamId; 481 } 482 483 // TODO remove this suppression once "disable legacy session" code has been removed from Smack 484 @SuppressWarnings("deprecation") 485 protected void bindResourceAndEstablishSession(String resource) throws XMPPErrorException, 486 IOException, SmackException { 487 488 // Wait until either: 489 // - the servers last features stanza has been parsed 490 // - the timeout occurs 491 LOGGER.finer("Waiting for last features to be received before continuing with resource binding"); 492 lastFeaturesReceived.checkIfSuccessOrWait(); 493 494 495 if (!hasFeature(Bind.ELEMENT, Bind.NAMESPACE)) { 496 // Server never offered resource binding, which is REQURIED in XMPP client and 497 // server implementations as per RFC6120 7.2 498 throw new ResourceBindingNotOfferedException(); 499 } 500 501 // Resource binding, see RFC6120 7. 502 // Note that we can not use IQReplyFilter here, since the users full JID is not yet 503 // available. It will become available right after the resource has been successfully bound. 504 Bind bindResource = Bind.newSet(resource); 505 PacketCollector packetCollector = createPacketCollectorAndSend(new StanzaIdFilter(bindResource), bindResource); 506 Bind response = packetCollector.nextResultOrThrow(); 507 // Set the connections user to the result of resource binding. It is important that we don't infer the user 508 // from the login() arguments and the configurations service name, as, for example, when SASL External is used, 509 // the username is not given to login but taken from the 'external' certificate. 510 user = response.getJid(); 511 serviceName = XmppStringUtils.parseDomain(user); 512 513 Session.Feature sessionFeature = getFeature(Session.ELEMENT, Session.NAMESPACE); 514 // Only bind the session if it's announced as stream feature by the server, is not optional and not disabled 515 // For more information see http://tools.ietf.org/html/draft-cridland-xmpp-session-01 516 if (sessionFeature != null && !sessionFeature.isOptional() && !getConfiguration().isLegacySessionDisabled()) { 517 Session session = new Session(); 518 packetCollector = createPacketCollectorAndSend(new StanzaIdFilter(session), session); 519 packetCollector.nextResultOrThrow(); 520 } 521 } 522 523 protected void afterSuccessfulLogin(final boolean resumed) throws NotConnectedException { 524 // Indicate that we're now authenticated. 525 this.authenticated = true; 526 527 // If debugging is enabled, change the the debug window title to include the 528 // name we are now logged-in as. 529 // If DEBUG was set to true AFTER the connection was created the debugger 530 // will be null 531 if (config.isDebuggerEnabled() && debugger != null) { 532 debugger.userHasLogged(user); 533 } 534 callConnectionAuthenticatedListener(resumed); 535 536 // Set presence to online. It is important that this is done after 537 // callConnectionAuthenticatedListener(), as this call will also 538 // eventually load the roster. And we should load the roster before we 539 // send the initial presence. 540 if (config.isSendPresence() && !resumed) { 541 sendStanza(new Presence(Presence.Type.available)); 542 } 543 } 544 545 @Override 546 public final boolean isAnonymous() { 547 return config.getUsername() == null && usedUsername == null 548 && !config.allowNullOrEmptyUsername; 549 } 550 551 private String serviceName; 552 553 protected List<HostAddress> hostAddresses; 554 555 /** 556 * Populates {@link #hostAddresses} with at least one host address. 557 * 558 * @return a list of host addresses where DNS (SRV) RR resolution failed. 559 */ 560 protected List<HostAddress> populateHostAddresses() { 561 List<HostAddress> failedAddresses = new LinkedList<>(); 562 // N.B.: Important to use config.serviceName and not AbstractXMPPConnection.serviceName 563 if (config.host != null) { 564 hostAddresses = new ArrayList<HostAddress>(1); 565 HostAddress hostAddress; 566 hostAddress = new HostAddress(config.host, config.port); 567 hostAddresses.add(hostAddress); 568 } else { 569 hostAddresses = DNSUtil.resolveXMPPDomain(config.serviceName, failedAddresses); 570 } 571 // If we reach this, then hostAddresses *must not* be empty, i.e. there is at least one host added, either the 572 // config.host one or the host representing the service name by DNSUtil 573 assert(!hostAddresses.isEmpty()); 574 return failedAddresses; 575 } 576 577 protected Lock getConnectionLock() { 578 return connectionLock; 579 } 580 581 protected void throwNotConnectedExceptionIfAppropriate() throws NotConnectedException { 582 if (!isConnected()) { 583 throw new NotConnectedException(); 584 } 585 } 586 587 protected void throwAlreadyConnectedExceptionIfAppropriate() throws AlreadyConnectedException { 588 if (isConnected()) { 589 throw new AlreadyConnectedException(); 590 } 591 } 592 593 protected void throwAlreadyLoggedInExceptionIfAppropriate() throws AlreadyLoggedInException { 594 if (isAuthenticated()) { 595 throw new AlreadyLoggedInException(); 596 } 597 } 598 599 @Deprecated 600 @Override 601 public void sendPacket(Stanza packet) throws NotConnectedException { 602 sendStanza(packet); 603 } 604 605 @Override 606 public void sendStanza(Stanza packet) throws NotConnectedException { 607 Objects.requireNonNull(packet, "Packet must not be null"); 608 609 throwNotConnectedExceptionIfAppropriate(); 610 switch (fromMode) { 611 case OMITTED: 612 packet.setFrom(null); 613 break; 614 case USER: 615 packet.setFrom(getUser()); 616 break; 617 case UNCHANGED: 618 default: 619 break; 620 } 621 // Invoke interceptors for the new packet that is about to be sent. Interceptors may modify 622 // the content of the packet. 623 firePacketInterceptors(packet); 624 sendStanzaInternal(packet); 625 } 626 627 /** 628 * Returns the SASLAuthentication manager that is responsible for authenticating with 629 * the server. 630 * 631 * @return the SASLAuthentication manager that is responsible for authenticating with 632 * the server. 633 */ 634 protected SASLAuthentication getSASLAuthentication() { 635 return saslAuthentication; 636 } 637 638 /** 639 * Closes the connection by setting presence to unavailable then closing the connection to 640 * the XMPP server. The XMPPConnection can still be used for connecting to the server 641 * again. 642 * 643 */ 644 public void disconnect() { 645 try { 646 disconnect(new Presence(Presence.Type.unavailable)); 647 } 648 catch (NotConnectedException e) { 649 LOGGER.log(Level.FINEST, "Connection is already disconnected", e); 650 } 651 } 652 653 /** 654 * Closes the connection. A custom unavailable presence is sent to the server, followed 655 * by closing the stream. The XMPPConnection can still be used for connecting to the server 656 * again. A custom unavailable presence is useful for communicating offline presence 657 * information such as "On vacation". Typically, just the status text of the presence 658 * stanza(/packet) is set with online information, but most XMPP servers will deliver the full 659 * presence stanza(/packet) with whatever data is set. 660 * 661 * @param unavailablePresence the presence stanza(/packet) to send during shutdown. 662 * @throws NotConnectedException 663 */ 664 public synchronized void disconnect(Presence unavailablePresence) throws NotConnectedException { 665 sendStanza(unavailablePresence); 666 shutdown(); 667 callConnectionClosedListener(); 668 } 669 670 /** 671 * Shuts the current connection down. 672 */ 673 protected abstract void shutdown(); 674 675 @Override 676 public void addConnectionListener(ConnectionListener connectionListener) { 677 if (connectionListener == null) { 678 return; 679 } 680 connectionListeners.add(connectionListener); 681 } 682 683 @Override 684 public void removeConnectionListener(ConnectionListener connectionListener) { 685 connectionListeners.remove(connectionListener); 686 } 687 688 @Override 689 public PacketCollector createPacketCollectorAndSend(IQ packet) throws NotConnectedException { 690 StanzaFilter packetFilter = new IQReplyFilter(packet, this); 691 // Create the packet collector before sending the packet 692 PacketCollector packetCollector = createPacketCollectorAndSend(packetFilter, packet); 693 return packetCollector; 694 } 695 696 @Override 697 public PacketCollector createPacketCollectorAndSend(StanzaFilter packetFilter, Stanza packet) 698 throws NotConnectedException { 699 // Create the packet collector before sending the packet 700 PacketCollector packetCollector = createPacketCollector(packetFilter); 701 try { 702 // Now we can send the packet as the collector has been created 703 sendStanza(packet); 704 } 705 catch (NotConnectedException | RuntimeException e) { 706 packetCollector.cancel(); 707 throw e; 708 } 709 return packetCollector; 710 } 711 712 @Override 713 public PacketCollector createPacketCollector(StanzaFilter packetFilter) { 714 PacketCollector.Configuration configuration = PacketCollector.newConfiguration().setStanzaFilter(packetFilter); 715 return createPacketCollector(configuration); 716 } 717 718 @Override 719 public PacketCollector createPacketCollector(PacketCollector.Configuration configuration) { 720 PacketCollector collector = new PacketCollector(this, configuration); 721 // Add the collector to the list of active collectors. 722 collectors.add(collector); 723 return collector; 724 } 725 726 @Override 727 public void removePacketCollector(PacketCollector collector) { 728 collectors.remove(collector); 729 } 730 731 @Override 732 @Deprecated 733 public void addPacketListener(StanzaListener packetListener, StanzaFilter packetFilter) { 734 addAsyncStanzaListener(packetListener, packetFilter); 735 } 736 737 @Override 738 @Deprecated 739 public boolean removePacketListener(StanzaListener packetListener) { 740 return removeAsyncStanzaListener(packetListener); 741 } 742 743 @Override 744 public void addSyncStanzaListener(StanzaListener packetListener, StanzaFilter packetFilter) { 745 if (packetListener == null) { 746 throw new NullPointerException("Packet listener is null."); 747 } 748 ListenerWrapper wrapper = new ListenerWrapper(packetListener, packetFilter); 749 synchronized (syncRecvListeners) { 750 syncRecvListeners.put(packetListener, wrapper); 751 } 752 } 753 754 @Override 755 public boolean removeSyncStanzaListener(StanzaListener packetListener) { 756 synchronized (syncRecvListeners) { 757 return syncRecvListeners.remove(packetListener) != null; 758 } 759 } 760 761 @Override 762 public void addAsyncStanzaListener(StanzaListener packetListener, StanzaFilter packetFilter) { 763 if (packetListener == null) { 764 throw new NullPointerException("Packet listener is null."); 765 } 766 ListenerWrapper wrapper = new ListenerWrapper(packetListener, packetFilter); 767 synchronized (asyncRecvListeners) { 768 asyncRecvListeners.put(packetListener, wrapper); 769 } 770 } 771 772 @Override 773 public boolean removeAsyncStanzaListener(StanzaListener packetListener) { 774 synchronized (asyncRecvListeners) { 775 return asyncRecvListeners.remove(packetListener) != null; 776 } 777 } 778 779 @Override 780 public void addPacketSendingListener(StanzaListener packetListener, StanzaFilter packetFilter) { 781 if (packetListener == null) { 782 throw new NullPointerException("Packet listener is null."); 783 } 784 ListenerWrapper wrapper = new ListenerWrapper(packetListener, packetFilter); 785 synchronized (sendListeners) { 786 sendListeners.put(packetListener, wrapper); 787 } 788 } 789 790 @Override 791 public void removePacketSendingListener(StanzaListener packetListener) { 792 synchronized (sendListeners) { 793 sendListeners.remove(packetListener); 794 } 795 } 796 797 /** 798 * Process all stanza(/packet) listeners for sending packets. 799 * <p> 800 * Compared to {@link #firePacketInterceptors(Stanza)}, the listeners will be invoked in a new thread. 801 * </p> 802 * 803 * @param packet the stanza(/packet) to process. 804 */ 805 @SuppressWarnings("javadoc") 806 protected void firePacketSendingListeners(final Stanza packet) { 807 final List<StanzaListener> listenersToNotify = new LinkedList<StanzaListener>(); 808 synchronized (sendListeners) { 809 for (ListenerWrapper listenerWrapper : sendListeners.values()) { 810 if (listenerWrapper.filterMatches(packet)) { 811 listenersToNotify.add(listenerWrapper.getListener()); 812 } 813 } 814 } 815 if (listenersToNotify.isEmpty()) { 816 return; 817 } 818 // Notify in a new thread, because we can 819 asyncGo(new Runnable() { 820 @Override 821 public void run() { 822 for (StanzaListener listener : listenersToNotify) { 823 try { 824 listener.processPacket(packet); 825 } 826 catch (Exception e) { 827 LOGGER.log(Level.WARNING, "Sending listener threw exception", e); 828 continue; 829 } 830 } 831 }}); 832 } 833 834 @Override 835 public void addPacketInterceptor(StanzaListener packetInterceptor, 836 StanzaFilter packetFilter) { 837 if (packetInterceptor == null) { 838 throw new NullPointerException("Packet interceptor is null."); 839 } 840 InterceptorWrapper interceptorWrapper = new InterceptorWrapper(packetInterceptor, packetFilter); 841 synchronized (interceptors) { 842 interceptors.put(packetInterceptor, interceptorWrapper); 843 } 844 } 845 846 @Override 847 public void removePacketInterceptor(StanzaListener packetInterceptor) { 848 synchronized (interceptors) { 849 interceptors.remove(packetInterceptor); 850 } 851 } 852 853 /** 854 * Process interceptors. Interceptors may modify the stanza(/packet) that is about to be sent. 855 * Since the thread that requested to send the stanza(/packet) will invoke all interceptors, it 856 * is important that interceptors perform their work as soon as possible so that the 857 * thread does not remain blocked for a long period. 858 * 859 * @param packet the stanza(/packet) that is going to be sent to the server 860 */ 861 private void firePacketInterceptors(Stanza packet) { 862 List<StanzaListener> interceptorsToInvoke = new LinkedList<StanzaListener>(); 863 synchronized (interceptors) { 864 for (InterceptorWrapper interceptorWrapper : interceptors.values()) { 865 if (interceptorWrapper.filterMatches(packet)) { 866 interceptorsToInvoke.add(interceptorWrapper.getInterceptor()); 867 } 868 } 869 } 870 for (StanzaListener interceptor : interceptorsToInvoke) { 871 try { 872 interceptor.processPacket(packet); 873 } catch (Exception e) { 874 LOGGER.log(Level.SEVERE, "Packet interceptor threw exception", e); 875 } 876 } 877 } 878 879 /** 880 * Initialize the {@link #debugger}. You can specify a customized {@link SmackDebugger} 881 * by setup the system property <code>smack.debuggerClass</code> to the implementation. 882 * 883 * @throws IllegalStateException if the reader or writer isn't yet initialized. 884 * @throws IllegalArgumentException if the SmackDebugger can't be loaded. 885 */ 886 protected void initDebugger() { 887 if (reader == null || writer == null) { 888 throw new NullPointerException("Reader or writer isn't initialized."); 889 } 890 // If debugging is enabled, we open a window and write out all network traffic. 891 if (config.isDebuggerEnabled()) { 892 if (debugger == null) { 893 debugger = SmackConfiguration.createDebugger(this, writer, reader); 894 } 895 896 if (debugger == null) { 897 LOGGER.severe("Debugging enabled but could not find debugger class"); 898 } else { 899 // Obtain new reader and writer from the existing debugger 900 reader = debugger.newConnectionReader(reader); 901 writer = debugger.newConnectionWriter(writer); 902 } 903 } 904 } 905 906 @Override 907 public long getPacketReplyTimeout() { 908 return packetReplyTimeout; 909 } 910 911 @Override 912 public void setPacketReplyTimeout(long timeout) { 913 packetReplyTimeout = timeout; 914 } 915 916 private static boolean replyToUnknownIqDefault = true; 917 918 /** 919 * Set the default value used to determine if new connection will reply to unknown IQ requests. The pre-configured 920 * default is 'true'. 921 * 922 * @param replyToUnkownIqDefault 923 * @see #setReplyToUnknownIq(boolean) 924 */ 925 public static void setReplyToUnknownIqDefault(boolean replyToUnkownIqDefault) { 926 AbstractXMPPConnection.replyToUnknownIqDefault = replyToUnkownIqDefault; 927 } 928 929 private boolean replyToUnkownIq = replyToUnknownIqDefault; 930 931 /** 932 * Set if Smack will automatically send 933 * {@link org.jivesoftware.smack.packet.XMPPError.Condition#feature_not_implemented} when a request IQ without a 934 * registered {@link IQRequestHandler} is received. 935 * 936 * @param replyToUnknownIq 937 */ 938 public void setReplyToUnknownIq(boolean replyToUnknownIq) { 939 this.replyToUnkownIq = replyToUnknownIq; 940 } 941 942 protected void parseAndProcessStanza(XmlPullParser parser) throws Exception { 943 ParserUtils.assertAtStartTag(parser); 944 int parserDepth = parser.getDepth(); 945 Stanza stanza = null; 946 try { 947 stanza = PacketParserUtils.parseStanza(parser); 948 } 949 catch (Exception e) { 950 CharSequence content = PacketParserUtils.parseContentDepth(parser, 951 parserDepth); 952 UnparsablePacket message = new UnparsablePacket(content, e); 953 ParsingExceptionCallback callback = getParsingExceptionCallback(); 954 if (callback != null) { 955 callback.handleUnparsablePacket(message); 956 } 957 } 958 ParserUtils.assertAtEndTag(parser); 959 if (stanza != null) { 960 processPacket(stanza); 961 } 962 } 963 964 /** 965 * Processes a stanza(/packet) after it's been fully parsed by looping through the installed 966 * stanza(/packet) collectors and listeners and letting them examine the stanza(/packet) to see if 967 * they are a match with the filter. 968 * 969 * @param packet the stanza(/packet) to process. 970 */ 971 protected void processPacket(Stanza packet) { 972 assert(packet != null); 973 lastStanzaReceived = System.currentTimeMillis(); 974 // Deliver the incoming packet to listeners. 975 executorService.submit(new ListenerNotification(packet)); 976 } 977 978 /** 979 * A runnable to notify all listeners and stanza(/packet) collectors of a packet. 980 */ 981 private class ListenerNotification implements Runnable { 982 983 private final Stanza packet; 984 985 public ListenerNotification(Stanza packet) { 986 this.packet = packet; 987 } 988 989 public void run() { 990 invokePacketCollectorsAndNotifyRecvListeners(packet); 991 } 992 } 993 994 /** 995 * Invoke {@link PacketCollector#processPacket(Stanza)} for every 996 * PacketCollector with the given packet. Also notify the receive listeners with a matching stanza(/packet) filter about the packet. 997 * 998 * @param packet the stanza(/packet) to notify the PacketCollectors and receive listeners about. 999 */ 1000 protected void invokePacketCollectorsAndNotifyRecvListeners(final Stanza packet) { 1001 if (packet instanceof IQ) { 1002 final IQ iq = (IQ) packet; 1003 final IQ.Type type = iq.getType(); 1004 switch (type) { 1005 case set: 1006 case get: 1007 final String key = XmppStringUtils.generateKey(iq.getChildElementName(), iq.getChildElementNamespace()); 1008 IQRequestHandler iqRequestHandler = null; 1009 switch (type) { 1010 case set: 1011 synchronized (setIqRequestHandler) { 1012 iqRequestHandler = setIqRequestHandler.get(key); 1013 } 1014 break; 1015 case get: 1016 synchronized (getIqRequestHandler) { 1017 iqRequestHandler = getIqRequestHandler.get(key); 1018 } 1019 break; 1020 default: 1021 throw new IllegalStateException("Should only encounter IQ type 'get' or 'set'"); 1022 } 1023 if (iqRequestHandler == null) { 1024 if (!replyToUnkownIq) { 1025 return; 1026 } 1027 // If the IQ stanza is of type "get" or "set" with no registered IQ request handler, then answer an 1028 // IQ of type "error" with code 501 ("feature-not-implemented") 1029 ErrorIQ errorIQ = IQ.createErrorResponse(iq, new XMPPError( 1030 XMPPError.Condition.feature_not_implemented)); 1031 try { 1032 sendStanza(errorIQ); 1033 } 1034 catch (NotConnectedException e) { 1035 LOGGER.log(Level.WARNING, "NotConnectedException while sending error IQ to unkown IQ request", e); 1036 } 1037 } else { 1038 ExecutorService executorService = null; 1039 switch (iqRequestHandler.getMode()) { 1040 case sync: 1041 executorService = singleThreadedExecutorService; 1042 break; 1043 case async: 1044 executorService = cachedExecutorService; 1045 break; 1046 } 1047 final IQRequestHandler finalIqRequestHandler = iqRequestHandler; 1048 executorService.execute(new Runnable() { 1049 @Override 1050 public void run() { 1051 IQ response = finalIqRequestHandler.handleIQRequest(iq); 1052 if (response == null) { 1053 // It is not ideal if the IQ request handler does not return an IQ response, because RFC 1054 // 6120 § 8.1.2 does specify that a response is mandatory. But some APIs, mostly the 1055 // file transfer one, does not always return a result, so we need to handle this case. 1056 // Also sometimes a request handler may decide that it's better to not send a response, 1057 // e.g. to avoid presence leaks. 1058 return; 1059 } 1060 try { 1061 sendStanza(response); 1062 } 1063 catch (NotConnectedException e) { 1064 LOGGER.log(Level.WARNING, "NotConnectedException while sending response to IQ request", e); 1065 } 1066 } 1067 }); 1068 // The following returns makes it impossible for packet listeners and collectors to 1069 // filter for IQ request stanzas, i.e. IQs of type 'set' or 'get'. This is the 1070 // desired behavior. 1071 return; 1072 } 1073 break; 1074 default: 1075 break; 1076 } 1077 } 1078 1079 // First handle the async recv listeners. Note that this code is very similar to what follows a few lines below, 1080 // the only difference is that asyncRecvListeners is used here and that the packet listeners are started in 1081 // their own thread. 1082 final Collection<StanzaListener> listenersToNotify = new LinkedList<StanzaListener>(); 1083 synchronized (asyncRecvListeners) { 1084 for (ListenerWrapper listenerWrapper : asyncRecvListeners.values()) { 1085 if (listenerWrapper.filterMatches(packet)) { 1086 listenersToNotify.add(listenerWrapper.getListener()); 1087 } 1088 } 1089 } 1090 1091 for (final StanzaListener listener : listenersToNotify) { 1092 asyncGo(new Runnable() { 1093 @Override 1094 public void run() { 1095 try { 1096 listener.processPacket(packet); 1097 } catch (Exception e) { 1098 LOGGER.log(Level.SEVERE, "Exception in async packet listener", e); 1099 } 1100 } 1101 }); 1102 } 1103 1104 // Loop through all collectors and notify the appropriate ones. 1105 for (PacketCollector collector: collectors) { 1106 collector.processPacket(packet); 1107 } 1108 1109 // Notify the receive listeners interested in the packet 1110 listenersToNotify.clear(); 1111 synchronized (syncRecvListeners) { 1112 for (ListenerWrapper listenerWrapper : syncRecvListeners.values()) { 1113 if (listenerWrapper.filterMatches(packet)) { 1114 listenersToNotify.add(listenerWrapper.getListener()); 1115 } 1116 } 1117 } 1118 1119 // Decouple incoming stanza processing from listener invocation. Unlike async listeners, this uses a single 1120 // threaded executor service and therefore keeps the order. 1121 singleThreadedExecutorService.execute(new Runnable() { 1122 @Override 1123 public void run() { 1124 for (StanzaListener listener : listenersToNotify) { 1125 try { 1126 listener.processPacket(packet); 1127 } catch(NotConnectedException e) { 1128 LOGGER.log(Level.WARNING, "Got not connected exception, aborting", e); 1129 break; 1130 } catch (Exception e) { 1131 LOGGER.log(Level.SEVERE, "Exception in packet listener", e); 1132 } 1133 } 1134 } 1135 }); 1136 1137 } 1138 1139 /** 1140 * Sets whether the connection has already logged in the server. This method assures that the 1141 * {@link #wasAuthenticated} flag is never reset once it has ever been set. 1142 * 1143 */ 1144 protected void setWasAuthenticated() { 1145 // Never reset the flag if the connection has ever been authenticated 1146 if (!wasAuthenticated) { 1147 wasAuthenticated = authenticated; 1148 } 1149 } 1150 1151 protected void callConnectionConnectedListener() { 1152 for (ConnectionListener listener : connectionListeners) { 1153 listener.connected(this); 1154 } 1155 } 1156 1157 protected void callConnectionAuthenticatedListener(boolean resumed) { 1158 for (ConnectionListener listener : connectionListeners) { 1159 try { 1160 listener.authenticated(this, resumed); 1161 } catch (Exception e) { 1162 // Catch and print any exception so we can recover 1163 // from a faulty listener and finish the shutdown process 1164 LOGGER.log(Level.SEVERE, "Exception in authenticated listener", e); 1165 } 1166 } 1167 } 1168 1169 void callConnectionClosedListener() { 1170 for (ConnectionListener listener : connectionListeners) { 1171 try { 1172 listener.connectionClosed(); 1173 } 1174 catch (Exception e) { 1175 // Catch and print any exception so we can recover 1176 // from a faulty listener and finish the shutdown process 1177 LOGGER.log(Level.SEVERE, "Error in listener while closing connection", e); 1178 } 1179 } 1180 } 1181 1182 protected void callConnectionClosedOnErrorListener(Exception e) { 1183 LOGGER.log(Level.WARNING, "Connection closed with error", e); 1184 for (ConnectionListener listener : connectionListeners) { 1185 try { 1186 listener.connectionClosedOnError(e); 1187 } 1188 catch (Exception e2) { 1189 // Catch and print any exception so we can recover 1190 // from a faulty listener 1191 LOGGER.log(Level.SEVERE, "Error in listener while closing connection", e2); 1192 } 1193 } 1194 } 1195 1196 /** 1197 * Sends a notification indicating that the connection was reconnected successfully. 1198 */ 1199 protected void notifyReconnection() { 1200 // Notify connection listeners of the reconnection. 1201 for (ConnectionListener listener : connectionListeners) { 1202 try { 1203 listener.reconnectionSuccessful(); 1204 } 1205 catch (Exception e) { 1206 // Catch and print any exception so we can recover 1207 // from a faulty listener 1208 LOGGER.log(Level.WARNING, "notifyReconnection()", e); 1209 } 1210 } 1211 } 1212 1213 /** 1214 * A wrapper class to associate a stanza(/packet) filter with a listener. 1215 */ 1216 protected static class ListenerWrapper { 1217 1218 private final StanzaListener packetListener; 1219 private final StanzaFilter packetFilter; 1220 1221 /** 1222 * Create a class which associates a stanza(/packet) filter with a listener. 1223 * 1224 * @param packetListener the stanza(/packet) listener. 1225 * @param packetFilter the associated filter or null if it listen for all packets. 1226 */ 1227 public ListenerWrapper(StanzaListener packetListener, StanzaFilter packetFilter) { 1228 this.packetListener = packetListener; 1229 this.packetFilter = packetFilter; 1230 } 1231 1232 public boolean filterMatches(Stanza packet) { 1233 return packetFilter == null || packetFilter.accept(packet); 1234 } 1235 1236 public StanzaListener getListener() { 1237 return packetListener; 1238 } 1239 } 1240 1241 /** 1242 * A wrapper class to associate a stanza(/packet) filter with an interceptor. 1243 */ 1244 protected static class InterceptorWrapper { 1245 1246 private final StanzaListener packetInterceptor; 1247 private final StanzaFilter packetFilter; 1248 1249 /** 1250 * Create a class which associates a stanza(/packet) filter with an interceptor. 1251 * 1252 * @param packetInterceptor the interceptor. 1253 * @param packetFilter the associated filter or null if it intercepts all packets. 1254 */ 1255 public InterceptorWrapper(StanzaListener packetInterceptor, StanzaFilter packetFilter) { 1256 this.packetInterceptor = packetInterceptor; 1257 this.packetFilter = packetFilter; 1258 } 1259 1260 public boolean filterMatches(Stanza packet) { 1261 return packetFilter == null || packetFilter.accept(packet); 1262 } 1263 1264 public StanzaListener getInterceptor() { 1265 return packetInterceptor; 1266 } 1267 } 1268 1269 @Override 1270 public int getConnectionCounter() { 1271 return connectionCounterValue; 1272 } 1273 1274 @Override 1275 public void setFromMode(FromMode fromMode) { 1276 this.fromMode = fromMode; 1277 } 1278 1279 @Override 1280 public FromMode getFromMode() { 1281 return this.fromMode; 1282 } 1283 1284 @Override 1285 protected void finalize() throws Throwable { 1286 LOGGER.fine("finalizing XMPPConnection ( " + getConnectionCounter() 1287 + "): Shutting down executor services"); 1288 try { 1289 // It's usually not a good idea to rely on finalize. But this is the easiest way to 1290 // avoid the "Smack Listener Processor" leaking. The thread(s) of the executor have a 1291 // reference to their ExecutorService which prevents the ExecutorService from being 1292 // gc'ed. It is possible that the XMPPConnection instance is gc'ed while the 1293 // listenerExecutor ExecutorService call not be gc'ed until it got shut down. 1294 executorService.shutdownNow(); 1295 cachedExecutorService.shutdown(); 1296 removeCallbacksService.shutdownNow(); 1297 singleThreadedExecutorService.shutdownNow(); 1298 } catch (Throwable t) { 1299 LOGGER.log(Level.WARNING, "finalize() threw trhowable", t); 1300 } 1301 finally { 1302 super.finalize(); 1303 } 1304 } 1305 1306 protected final void parseFeatures(XmlPullParser parser) throws XmlPullParserException, 1307 IOException, SmackException { 1308 streamFeatures.clear(); 1309 final int initialDepth = parser.getDepth(); 1310 while (true) { 1311 int eventType = parser.next(); 1312 1313 if (eventType == XmlPullParser.START_TAG && parser.getDepth() == initialDepth + 1) { 1314 ExtensionElement streamFeature = null; 1315 String name = parser.getName(); 1316 String namespace = parser.getNamespace(); 1317 switch (name) { 1318 case StartTls.ELEMENT: 1319 streamFeature = PacketParserUtils.parseStartTlsFeature(parser); 1320 break; 1321 case Mechanisms.ELEMENT: 1322 streamFeature = new Mechanisms(PacketParserUtils.parseMechanisms(parser)); 1323 break; 1324 case Bind.ELEMENT: 1325 streamFeature = Bind.Feature.INSTANCE; 1326 break; 1327 case Session.ELEMENT: 1328 streamFeature = PacketParserUtils.parseSessionFeature(parser); 1329 break; 1330 case Compress.Feature.ELEMENT: 1331 streamFeature = PacketParserUtils.parseCompressionFeature(parser); 1332 break; 1333 default: 1334 ExtensionElementProvider<ExtensionElement> provider = ProviderManager.getStreamFeatureProvider(name, namespace); 1335 if (provider != null) { 1336 streamFeature = provider.parse(parser); 1337 } 1338 break; 1339 } 1340 if (streamFeature != null) { 1341 addStreamFeature(streamFeature); 1342 } 1343 } 1344 else if (eventType == XmlPullParser.END_TAG && parser.getDepth() == initialDepth) { 1345 break; 1346 } 1347 } 1348 1349 if (hasFeature(Mechanisms.ELEMENT, Mechanisms.NAMESPACE)) { 1350 // Only proceed with SASL auth if TLS is disabled or if the server doesn't announce it 1351 if (!hasFeature(StartTls.ELEMENT, StartTls.NAMESPACE) 1352 || config.getSecurityMode() == SecurityMode.disabled) { 1353 saslFeatureReceived.reportSuccess(); 1354 } 1355 } 1356 1357 // If the server reported the bind feature then we are that that we did SASL and maybe 1358 // STARTTLS. We can then report that the last 'stream:features' have been parsed 1359 if (hasFeature(Bind.ELEMENT, Bind.NAMESPACE)) { 1360 if (!hasFeature(Compress.Feature.ELEMENT, Compress.NAMESPACE) 1361 || !config.isCompressionEnabled()) { 1362 // This was was last features from the server is either it did not contain 1363 // compression or if we disabled it 1364 lastFeaturesReceived.reportSuccess(); 1365 } 1366 } 1367 afterFeaturesReceived(); 1368 } 1369 1370 protected void afterFeaturesReceived() throws SecurityRequiredException, NotConnectedException { 1371 // Default implementation does nothing 1372 } 1373 1374 @SuppressWarnings("unchecked") 1375 @Override 1376 public <F extends ExtensionElement> F getFeature(String element, String namespace) { 1377 return (F) streamFeatures.get(XmppStringUtils.generateKey(element, namespace)); 1378 } 1379 1380 @Override 1381 public boolean hasFeature(String element, String namespace) { 1382 return getFeature(element, namespace) != null; 1383 } 1384 1385 private void addStreamFeature(ExtensionElement feature) { 1386 String key = XmppStringUtils.generateKey(feature.getElementName(), feature.getNamespace()); 1387 streamFeatures.put(key, feature); 1388 } 1389 1390 @Override 1391 public void sendStanzaWithResponseCallback(Stanza stanza, StanzaFilter replyFilter, 1392 StanzaListener callback) throws NotConnectedException { 1393 sendStanzaWithResponseCallback(stanza, replyFilter, callback, null); 1394 } 1395 1396 @Override 1397 public void sendStanzaWithResponseCallback(Stanza stanza, StanzaFilter replyFilter, 1398 StanzaListener callback, ExceptionCallback exceptionCallback) 1399 throws NotConnectedException { 1400 sendStanzaWithResponseCallback(stanza, replyFilter, callback, exceptionCallback, 1401 getPacketReplyTimeout()); 1402 } 1403 1404 @Override 1405 public void sendStanzaWithResponseCallback(Stanza stanza, final StanzaFilter replyFilter, 1406 final StanzaListener callback, final ExceptionCallback exceptionCallback, 1407 long timeout) throws NotConnectedException { 1408 Objects.requireNonNull(stanza, "stanza must not be null"); 1409 // While Smack allows to add PacketListeners with a PacketFilter value of 'null', we 1410 // disallow it here in the async API as it makes no sense 1411 Objects.requireNonNull(replyFilter, "replyFilter must not be null"); 1412 Objects.requireNonNull(callback, "callback must not be null"); 1413 1414 final StanzaListener packetListener = new StanzaListener() { 1415 @Override 1416 public void processPacket(Stanza packet) throws NotConnectedException { 1417 try { 1418 XMPPErrorException.ifHasErrorThenThrow(packet); 1419 callback.processPacket(packet); 1420 } 1421 catch (XMPPErrorException e) { 1422 if (exceptionCallback != null) { 1423 exceptionCallback.processException(e); 1424 } 1425 } 1426 finally { 1427 removeAsyncStanzaListener(this); 1428 } 1429 } 1430 }; 1431 removeCallbacksService.schedule(new Runnable() { 1432 @Override 1433 public void run() { 1434 boolean removed = removeAsyncStanzaListener(packetListener); 1435 // If the packetListener got removed, then it was never run and 1436 // we never received a response, inform the exception callback 1437 if (removed && exceptionCallback != null) { 1438 exceptionCallback.processException(NoResponseException.newWith(AbstractXMPPConnection.this, replyFilter)); 1439 } 1440 } 1441 }, timeout, TimeUnit.MILLISECONDS); 1442 addAsyncStanzaListener(packetListener, replyFilter); 1443 sendStanza(stanza); 1444 } 1445 1446 @Override 1447 public void sendIqWithResponseCallback(IQ iqRequest, StanzaListener callback) 1448 throws NotConnectedException { 1449 sendIqWithResponseCallback(iqRequest, callback, null); 1450 } 1451 1452 @Override 1453 public void sendIqWithResponseCallback(IQ iqRequest, StanzaListener callback, 1454 ExceptionCallback exceptionCallback) throws NotConnectedException { 1455 sendIqWithResponseCallback(iqRequest, callback, exceptionCallback, getPacketReplyTimeout()); 1456 } 1457 1458 @Override 1459 public void sendIqWithResponseCallback(IQ iqRequest, final StanzaListener callback, 1460 final ExceptionCallback exceptionCallback, long timeout) 1461 throws NotConnectedException { 1462 StanzaFilter replyFilter = new IQReplyFilter(iqRequest, this); 1463 sendStanzaWithResponseCallback(iqRequest, replyFilter, callback, exceptionCallback, timeout); 1464 } 1465 1466 @Override 1467 public void addOneTimeSyncCallback(final StanzaListener callback, final StanzaFilter packetFilter) { 1468 final StanzaListener packetListener = new StanzaListener() { 1469 @Override 1470 public void processPacket(Stanza packet) throws NotConnectedException { 1471 try { 1472 callback.processPacket(packet); 1473 } finally { 1474 removeSyncStanzaListener(this); 1475 } 1476 } 1477 }; 1478 addSyncStanzaListener(packetListener, packetFilter); 1479 removeCallbacksService.schedule(new Runnable() { 1480 @Override 1481 public void run() { 1482 removeSyncStanzaListener(packetListener); 1483 } 1484 }, getPacketReplyTimeout(), TimeUnit.MILLISECONDS); 1485 } 1486 1487 @Override 1488 public IQRequestHandler registerIQRequestHandler(final IQRequestHandler iqRequestHandler) { 1489 final String key = XmppStringUtils.generateKey(iqRequestHandler.getElement(), iqRequestHandler.getNamespace()); 1490 switch (iqRequestHandler.getType()) { 1491 case set: 1492 synchronized (setIqRequestHandler) { 1493 return setIqRequestHandler.put(key, iqRequestHandler); 1494 } 1495 case get: 1496 synchronized (getIqRequestHandler) { 1497 return getIqRequestHandler.put(key, iqRequestHandler); 1498 } 1499 default: 1500 throw new IllegalArgumentException("Only IQ type of 'get' and 'set' allowed"); 1501 } 1502 } 1503 1504 @Override 1505 public final IQRequestHandler unregisterIQRequestHandler(IQRequestHandler iqRequestHandler) { 1506 return unregisterIQRequestHandler(iqRequestHandler.getElement(), iqRequestHandler.getNamespace(), 1507 iqRequestHandler.getType()); 1508 } 1509 1510 @Override 1511 public IQRequestHandler unregisterIQRequestHandler(String element, String namespace, IQ.Type type) { 1512 final String key = XmppStringUtils.generateKey(element, namespace); 1513 switch (type) { 1514 case set: 1515 synchronized (setIqRequestHandler) { 1516 return setIqRequestHandler.remove(key); 1517 } 1518 case get: 1519 synchronized (getIqRequestHandler) { 1520 return getIqRequestHandler.remove(key); 1521 } 1522 default: 1523 throw new IllegalArgumentException("Only IQ type of 'get' and 'set' allowed"); 1524 } 1525 } 1526 1527 private long lastStanzaReceived; 1528 1529 public long getLastStanzaReceived() { 1530 return lastStanzaReceived; 1531 } 1532 1533 /** 1534 * Install a parsing exception callback, which will be invoked once an exception is encountered while parsing a 1535 * stanza 1536 * 1537 * @param callback the callback to install 1538 */ 1539 public void setParsingExceptionCallback(ParsingExceptionCallback callback) { 1540 parsingExceptionCallback = callback; 1541 } 1542 1543 /** 1544 * Get the current active parsing exception callback. 1545 * 1546 * @return the active exception callback or null if there is none 1547 */ 1548 public ParsingExceptionCallback getParsingExceptionCallback() { 1549 return parsingExceptionCallback; 1550 } 1551 1552 protected final void asyncGo(Runnable runnable) { 1553 cachedExecutorService.execute(runnable); 1554 } 1555 1556 protected final ScheduledFuture<?> schedule(Runnable runnable, long delay, TimeUnit unit) { 1557 return removeCallbacksService.schedule(runnable, delay, unit); 1558 } 1559}