001/** 002 * 003 * Copyright 2009 Jive Software. 004 * 005 * Licensed under the Apache License, Version 2.0 (the "License"); 006 * you may not use this file except in compliance with the License. 007 * You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 018package org.jivesoftware.smack.bosh; 019 020import java.io.IOException; 021import java.io.PipedReader; 022import java.io.PipedWriter; 023import java.io.Writer; 024import java.util.Map; 025import java.util.logging.Level; 026import java.util.logging.Logger; 027 028import org.jivesoftware.smack.AbstractXMPPConnection; 029import org.jivesoftware.smack.SmackException; 030import org.jivesoftware.smack.SmackException.GenericConnectionException; 031import org.jivesoftware.smack.SmackException.NotConnectedException; 032import org.jivesoftware.smack.SmackException.OutgoingQueueFullException; 033import org.jivesoftware.smack.SmackException.SmackWrappedException; 034import org.jivesoftware.smack.XMPPConnection; 035import org.jivesoftware.smack.XMPPException; 036import org.jivesoftware.smack.XMPPException.StreamErrorException; 037import org.jivesoftware.smack.packet.IQ; 038import org.jivesoftware.smack.packet.Message; 039import org.jivesoftware.smack.packet.Presence; 040import org.jivesoftware.smack.packet.Stanza; 041import org.jivesoftware.smack.packet.StanzaError; 042import org.jivesoftware.smack.packet.TopLevelStreamElement; 043import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown; 044import org.jivesoftware.smack.util.Async; 045import org.jivesoftware.smack.util.CloseableUtil; 046import org.jivesoftware.smack.util.PacketParserUtils; 047import org.jivesoftware.smack.xml.XmlPullParser; 048import org.jivesoftware.smack.xml.XmlPullParserException; 049 050import org.igniterealtime.jbosh.AbstractBody; 051import org.igniterealtime.jbosh.BOSHClient; 052import org.igniterealtime.jbosh.BOSHClientConfig; 053import org.igniterealtime.jbosh.BOSHClientConnEvent; 054import org.igniterealtime.jbosh.BOSHClientConnListener; 055import org.igniterealtime.jbosh.BOSHClientRequestListener; 056import org.igniterealtime.jbosh.BOSHClientResponseListener; 057import org.igniterealtime.jbosh.BOSHException; 058import org.igniterealtime.jbosh.BOSHMessageEvent; 059import org.igniterealtime.jbosh.BodyQName; 060import org.igniterealtime.jbosh.ComposableBody; 061import org.jxmpp.jid.DomainBareJid; 062import org.jxmpp.jid.parts.Resourcepart; 063 064/** 065 * Creates a connection to an XMPP server via HTTP binding. 066 * This is specified in the XEP-0206: XMPP Over BOSH. 067 * 068 * @see XMPPConnection 069 * @author Guenther Niess 070 */ 071public class XMPPBOSHConnection extends AbstractXMPPConnection { 072 private static final Logger LOGGER = Logger.getLogger(XMPPBOSHConnection.class.getName()); 073 074 /** 075 * The XMPP Over Bosh namespace. 076 */ 077 public static final String XMPP_BOSH_NS = "urn:xmpp:xbosh"; 078 079 /** 080 * The BOSH namespace from XEP-0124. 081 */ 082 public static final String BOSH_URI = "http://jabber.org/protocol/httpbind"; 083 084 /** 085 * The used BOSH client from the jbosh library. 086 */ 087 private BOSHClient client; 088 089 /** 090 * Holds the initial configuration used while creating the connection. 091 */ 092 @SuppressWarnings("HidingField") 093 private final BOSHConfiguration config; 094 095 private final ArrayBlockingQueueWithShutdown<TopLevelStreamElement> outgoingQueue = new ArrayBlockingQueueWithShutdown<>(100, true); 096 097 private Thread writerThread; 098 099 // Some flags which provides some info about the current state. 100 private boolean isFirstInitialization = true; 101 private boolean done = false; 102 103 // The readerPipe and consumer thread are used for the debugger. 104 private PipedWriter readerPipe; 105 private Thread readerConsumer; 106 107 /** 108 * The session ID for the BOSH session with the connection manager. 109 */ 110 protected String sessionID = null; 111 112 private boolean notified; 113 114 /** 115 * Create a HTTP Binding connection to an XMPP server. 116 * 117 * @param username the username to use. 118 * @param password the password to use. 119 * @param https true if you want to use SSL 120 * (e.g. false for http://domain.lt:7070/http-bind). 121 * @param host the hostname or IP address of the connection manager 122 * (e.g. domain.lt for http://domain.lt:7070/http-bind). 123 * @param port the port of the connection manager 124 * (e.g. 7070 for http://domain.lt:7070/http-bind). 125 * @param filePath the file which is described by the URL 126 * (e.g. /http-bind for http://domain.lt:7070/http-bind). 127 * @param xmppServiceDomain the XMPP service name 128 * (e.g. domain.lt for the user alice@domain.lt) 129 */ 130 public XMPPBOSHConnection(String username, String password, boolean https, String host, int port, String filePath, DomainBareJid xmppServiceDomain) { 131 this(BOSHConfiguration.builder().setUseHttps(https).setHost(host) 132 .setPort(port).setFile(filePath).setXmppDomain(xmppServiceDomain) 133 .setUsernameAndPassword(username, password).build()); 134 } 135 136 /** 137 * Create a HTTP Binding connection to an XMPP server. 138 * 139 * @param config The configuration which is used for this connection. 140 */ 141 public XMPPBOSHConnection(BOSHConfiguration config) { 142 super(config); 143 this.config = config; 144 } 145 146 @SuppressWarnings("deprecation") 147 @Override 148 protected void connectInternal() throws SmackException, InterruptedException { 149 done = false; 150 notified = false; 151 try { 152 // Ensure a clean starting state 153 if (client != null) { 154 client.close(); 155 client = null; 156 } 157 sessionID = null; 158 159 // Initialize BOSH client 160 BOSHClientConfig.Builder cfgBuilder = BOSHClientConfig.Builder 161 .create(config.getURI(), config.getXMPPServiceDomain().toString()); 162 if (config.isProxyEnabled()) { 163 cfgBuilder.setProxy(config.getProxyAddress(), config.getProxyPort()); 164 } 165 166 cfgBuilder.setCompressionEnabled(config.isCompressionEnabled()); 167 168 for (Map.Entry<String, String> h : config.getHttpHeaders().entrySet()) { 169 cfgBuilder.addHttpHeader(h.getKey(), h.getValue()); 170 } 171 172 client = BOSHClient.create(cfgBuilder.build()); 173 174 client.addBOSHClientConnListener(new BOSHConnectionListener()); 175 client.addBOSHClientResponseListener(new BOSHPacketReader()); 176 177 // Initialize the debugger 178 if (debugger != null) { 179 initDebugger(); 180 } 181 182 // Send the session creation request 183 client.send(ComposableBody.builder() 184 .setNamespaceDefinition("xmpp", XMPP_BOSH_NS) 185 .setAttribute(BodyQName.createWithPrefix(XMPP_BOSH_NS, "version", "xmpp"), "1.0") 186 .build()); 187 } catch (Exception e) { 188 throw new GenericConnectionException(e); 189 } 190 191 // Wait for the response from the server 192 synchronized (this) { 193 if (!connected) { 194 final long deadline = System.currentTimeMillis() + getReplyTimeout(); 195 while (!notified) { 196 final long now = System.currentTimeMillis(); 197 if (now >= deadline) break; 198 wait(deadline - now); 199 } 200 } 201 } 202 203 assert writerThread == null || !writerThread.isAlive(); 204 outgoingQueue.start(); 205 writerThread = Async.go(this::writeElements, this + " Writer"); 206 207 // If there is no feedback, throw an remote server timeout error 208 if (!connected && !done) { 209 done = true; 210 String errorMessage = "Timeout reached for the connection to " 211 + getHost() + ":" + getPort() + "."; 212 instantShutdown(); 213 throw new SmackException.SmackMessageException(errorMessage); 214 } 215 216 try { 217 XmlPullParser parser = PacketParserUtils.getParserFor( 218 "<stream:stream xmlns='jabber:client' xmlns:stream='http://etherx.jabber.org/streams'/>"); 219 onStreamOpen(parser); 220 } catch (XmlPullParserException | IOException e) { 221 instantShutdown(); 222 throw new AssertionError("Failed to setup stream environment", e); 223 } 224 } 225 226 @Override 227 public boolean isSecureConnection() { 228 return config.isUsingHTTPS(); 229 } 230 231 @Override 232 public boolean isUsingCompression() { 233 // TODO: Implement compression 234 return false; 235 } 236 237 @Override 238 protected void loginInternal(String username, String password, Resourcepart resource) throws XMPPException, 239 SmackException, IOException, InterruptedException { 240 // Authenticate using SASL 241 authenticate(username, password, config.getAuthzid(), null); 242 243 bindResourceAndEstablishSession(resource); 244 245 afterSuccessfulLogin(false); 246 } 247 248 private volatile boolean writerThreadRunning; 249 250 private void writeElements() { 251 writerThreadRunning = true; 252 try { 253 while (true) { 254 TopLevelStreamElement element; 255 try { 256 element = outgoingQueue.take(); 257 } catch (InterruptedException e) { 258 LOGGER.log(Level.FINE, 259 "Writer thread exiting: Outgoing queue was shutdown as signalled by interrupted exception", 260 e); 261 return; 262 } 263 264 String xmlPayload = element.toXML(BOSH_URI).toString(); 265 ComposableBody.Builder composableBodyBuilder = ComposableBody.builder().setPayloadXML(xmlPayload); 266 if (sessionID != null) { 267 BodyQName qName = BodyQName.create(BOSH_URI, "sid"); 268 composableBodyBuilder.setAttribute(qName, sessionID); 269 } 270 271 ComposableBody composableBody = composableBodyBuilder.build(); 272 273 try { 274 client.send(composableBody); 275 } catch (BOSHException e) { 276 LOGGER.log(Level.WARNING, this + " received BOSHException in writer thread, connection broke!", e); 277 // TODO: Signal the user that there was an unexpected exception. 278 return; 279 } 280 281 if (element instanceof Stanza) { 282 Stanza stanza = (Stanza) element; 283 firePacketSendingListeners(stanza); 284 } 285 } 286 } catch (Exception exception) { 287 LOGGER.log(Level.WARNING, "BOSH writer thread threw", exception); 288 } finally { 289 writerThreadRunning = false; 290 notifyWaitingThreads(); 291 } 292 } 293 294 @Override 295 protected void sendInternal(TopLevelStreamElement element) throws NotConnectedException, InterruptedException { 296 throwNotConnectedExceptionIfAppropriate(); 297 try { 298 outgoingQueue.put(element); 299 } catch (InterruptedException e) { 300 throwNotConnectedExceptionIfAppropriate(); 301 // If the method above did not throw, then the sending thread was interrupted 302 throw e; 303 } 304 } 305 306 @Override 307 protected void sendNonBlockingInternal(TopLevelStreamElement element) 308 throws NotConnectedException, OutgoingQueueFullException { 309 throwNotConnectedExceptionIfAppropriate(); 310 boolean enqueued = outgoingQueue.offer(element); 311 if (!enqueued) { 312 throwNotConnectedExceptionIfAppropriate(); 313 throw new OutgoingQueueFullException(); 314 } 315 } 316 317 @Override 318 protected void shutdown() { 319 instantShutdown(); 320 } 321 322 @Override 323 public void instantShutdown() { 324 outgoingQueue.shutdown(); 325 326 try { 327 boolean writerThreadTerminated = waitFor(() -> !writerThreadRunning); 328 if (!writerThreadTerminated) { 329 LOGGER.severe("Writer thread of " + this + " did not terminate timely"); 330 } 331 } catch (InterruptedException e) { 332 LOGGER.log(Level.FINE, "Interrupted while waiting for writer thread to terminate", e); 333 } 334 335 if (client != null) { 336 try { 337 client.disconnect(); 338 } catch (Exception e) { 339 LOGGER.log(Level.WARNING, "shutdown", e); 340 } 341 } 342 343 setWasAuthenticated(); 344 sessionID = null; 345 done = true; 346 authenticated = false; 347 connected = false; 348 isFirstInitialization = false; 349 client = null; 350 351 // Close down the readers and writers. 352 CloseableUtil.maybeClose(readerPipe, LOGGER); 353 CloseableUtil.maybeClose(reader, LOGGER); 354 CloseableUtil.maybeClose(writer, LOGGER); 355 356 readerPipe = null; 357 reader = null; 358 writer = null; 359 readerConsumer = null; 360 } 361 362 /** 363 * Send a HTTP request to the connection manager with the provided body element. 364 * 365 * @param body the body which will be sent. 366 * @throws BOSHException if an BOSH (Bidirectional-streams Over Synchronous HTTP, XEP-0124) related error occurs 367 */ 368 protected void send(ComposableBody body) throws BOSHException { 369 if (!connected) { 370 throw new IllegalStateException("Not connected to a server!"); 371 } 372 if (body == null) { 373 throw new NullPointerException("Body mustn't be null!"); 374 } 375 if (sessionID != null) { 376 body = body.rebuild().setAttribute( 377 BodyQName.create(BOSH_URI, "sid"), sessionID).build(); 378 } 379 client.send(body); 380 } 381 382 /** 383 * Initialize the SmackDebugger which allows to log and debug XML traffic. 384 */ 385 @Override 386 protected void initDebugger() { 387 // TODO: Maybe we want to extend the SmackDebugger for simplification 388 // and a performance boost. 389 390 // Initialize a empty writer which discards all data. 391 writer = new Writer() { 392 @Override 393 public void write(char[] cbuf, int off, int len) { 394 /* ignore */ } 395 396 @Override 397 public void close() { 398 /* ignore */ } 399 400 @Override 401 public void flush() { 402 /* ignore */ } 403 }; 404 405 // Initialize a pipe for received raw data. 406 try { 407 readerPipe = new PipedWriter(); 408 reader = new PipedReader(readerPipe); 409 } 410 catch (IOException e) { 411 // Ignore 412 } 413 414 // Call the method from the parent class which initializes the debugger. 415 super.initDebugger(); 416 417 // Add listeners for the received and sent raw data. 418 client.addBOSHClientResponseListener(new BOSHClientResponseListener() { 419 @Override 420 public void responseReceived(BOSHMessageEvent event) { 421 if (event.getBody() != null) { 422 try { 423 readerPipe.write(event.getBody().toXML()); 424 readerPipe.flush(); 425 } catch (Exception e) { 426 // Ignore 427 } 428 } 429 } 430 }); 431 client.addBOSHClientRequestListener(new BOSHClientRequestListener() { 432 @Override 433 public void requestSent(BOSHMessageEvent event) { 434 if (event.getBody() != null) { 435 try { 436 writer.write(event.getBody().toXML()); 437 } catch (Exception e) { 438 // Ignore 439 } 440 } 441 } 442 }); 443 444 // Create and start a thread which discards all read data. 445 readerConsumer = new Thread() { 446 private Thread thread = this; 447 private int bufferLength = 1024; 448 449 @Override 450 public void run() { 451 try { 452 char[] cbuf = new char[bufferLength]; 453 while (readerConsumer == thread && !done) { 454 reader.read(cbuf, 0, bufferLength); 455 } 456 } catch (IOException e) { 457 // Ignore 458 } 459 } 460 }; 461 readerConsumer.setDaemon(true); 462 readerConsumer.start(); 463 } 464 465 @Override 466 protected void afterSaslAuthenticationSuccess() 467 throws NotConnectedException, InterruptedException, SmackWrappedException { 468 // XMPP over BOSH is unusual when it comes to SASL authentication: Instead of sending a new stream open, it 469 // requires a special XML element ot be send after successful SASL authentication. 470 // See XEP-0206 ยง 5., especially the following is example 5 of XEP-0206. 471 ComposableBody composeableBody = ComposableBody.builder() 472 .setNamespaceDefinition("xmpp", XMPPBOSHConnection.XMPP_BOSH_NS) 473 .setAttribute(BodyQName.createWithPrefix(XMPPBOSHConnection.XMPP_BOSH_NS, "restart", "xmpp"), "true") 474 .setAttribute(BodyQName.create(XMPPBOSHConnection.BOSH_URI, "to"), getXMPPServiceDomain().toString()) 475 .setAttribute(BodyQName.create(BOSH_URI, "sid"), sessionID) 476 .build(); 477 478 try { 479 client.send(composeableBody); 480 } catch (BOSHException e) { 481 // jbosh's exception API does not really match the one of Smack. 482 throw new SmackException.SmackWrappedException(e); 483 } 484 } 485 486 /** 487 * A listener class which listen for a successfully established connection 488 * and connection errors and notifies the BOSHConnection. 489 * 490 * @author Guenther Niess 491 */ 492 private class BOSHConnectionListener implements BOSHClientConnListener { 493 494 /** 495 * Notify the BOSHConnection about connection state changes. 496 * Process the connection listeners and try to login if the 497 * connection was formerly authenticated and is now reconnected. 498 */ 499 @Override 500 public void connectionEvent(BOSHClientConnEvent connEvent) { 501 try { 502 if (connEvent.isConnected()) { 503 connected = true; 504 if (isFirstInitialization) { 505 isFirstInitialization = false; 506 } 507 else { 508 if (wasAuthenticated) { 509 try { 510 login(); 511 } 512 catch (Exception e) { 513 throw new RuntimeException(e); 514 } 515 } 516 } 517 } 518 else { 519 if (connEvent.isError()) { 520 // TODO Check why jbosh's getCause returns Throwable here. This is very 521 // unusual and should be avoided if possible 522 Throwable cause = connEvent.getCause(); 523 Exception e; 524 if (cause instanceof Exception) { 525 e = (Exception) cause; 526 } else { 527 e = new Exception(cause); 528 } 529 notifyConnectionError(e); 530 } 531 connected = false; 532 } 533 } 534 finally { 535 notified = true; 536 synchronized (XMPPBOSHConnection.this) { 537 XMPPBOSHConnection.this.notifyAll(); 538 } 539 } 540 } 541 } 542 543 /** 544 * Listens for XML traffic from the BOSH connection manager and parses it into 545 * stanza objects. 546 * 547 * @author Guenther Niess 548 */ 549 private class BOSHPacketReader implements BOSHClientResponseListener { 550 551 /** 552 * Parse the received packets and notify the corresponding connection. 553 * 554 * @param event the BOSH client response which includes the received packet. 555 */ 556 @Override 557 public void responseReceived(BOSHMessageEvent event) { 558 AbstractBody body = event.getBody(); 559 if (body != null) { 560 try { 561 if (sessionID == null) { 562 sessionID = body.getAttribute(BodyQName.create(XMPPBOSHConnection.BOSH_URI, "sid")); 563 } 564 if (streamId == null) { 565 streamId = body.getAttribute(BodyQName.create(XMPPBOSHConnection.BOSH_URI, "authid")); 566 } 567 final XmlPullParser parser = PacketParserUtils.getParserFor(body.toXML()); 568 569 XmlPullParser.Event eventType = parser.getEventType(); 570 do { 571 eventType = parser.next(); 572 switch (eventType) { 573 case START_ELEMENT: 574 String name = parser.getName(); 575 switch (name) { 576 case Message.ELEMENT: 577 case IQ.IQ_ELEMENT: 578 case Presence.ELEMENT: 579 parseAndProcessStanza(parser); 580 break; 581 case "features": 582 parseFeaturesAndNotify(parser); 583 break; 584 case "error": 585 // Some BOSH error isn't stream error. 586 if ("urn:ietf:params:xml:ns:xmpp-streams".equals(parser.getNamespace(null))) { 587 throw new StreamErrorException(PacketParserUtils.parseStreamError(parser)); 588 } else { 589 StanzaError stanzaError = PacketParserUtils.parseError(parser); 590 throw new XMPPException.XMPPErrorException(null, stanzaError); 591 } 592 default: 593 parseAndProcessNonza(parser); 594 break; 595 } 596 break; 597 default: 598 // Catch all for incomplete switch (MissingCasesInEnumSwitch) statement. 599 break; 600 } 601 } 602 while (eventType != XmlPullParser.Event.END_DOCUMENT); 603 } 604 catch (Exception e) { 605 if (isConnected()) { 606 notifyConnectionError(e); 607 } 608 } 609 } 610 } 611 } 612}