001/**
002 *
003 * Copyright 2009 Jive Software.
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018package org.jivesoftware.smack.bosh;
019
020import java.io.IOException;
021import java.io.PipedReader;
022import java.io.PipedWriter;
023import java.io.Writer;
024import java.util.Map;
025import java.util.logging.Level;
026import java.util.logging.Logger;
027
028import org.jivesoftware.smack.AbstractXMPPConnection;
029import org.jivesoftware.smack.SmackException;
030import org.jivesoftware.smack.SmackException.GenericConnectionException;
031import org.jivesoftware.smack.SmackException.NotConnectedException;
032import org.jivesoftware.smack.SmackException.OutgoingQueueFullException;
033import org.jivesoftware.smack.SmackException.SmackWrappedException;
034import org.jivesoftware.smack.XMPPConnection;
035import org.jivesoftware.smack.XMPPException;
036import org.jivesoftware.smack.XMPPException.StreamErrorException;
037import org.jivesoftware.smack.packet.IQ;
038import org.jivesoftware.smack.packet.Message;
039import org.jivesoftware.smack.packet.Presence;
040import org.jivesoftware.smack.packet.Stanza;
041import org.jivesoftware.smack.packet.StanzaError;
042import org.jivesoftware.smack.packet.TopLevelStreamElement;
043import org.jivesoftware.smack.util.ArrayBlockingQueueWithShutdown;
044import org.jivesoftware.smack.util.Async;
045import org.jivesoftware.smack.util.CloseableUtil;
046import org.jivesoftware.smack.util.PacketParserUtils;
047import org.jivesoftware.smack.xml.XmlPullParser;
048import org.jivesoftware.smack.xml.XmlPullParserException;
049
050import org.igniterealtime.jbosh.AbstractBody;
051import org.igniterealtime.jbosh.BOSHClient;
052import org.igniterealtime.jbosh.BOSHClientConfig;
053import org.igniterealtime.jbosh.BOSHClientConnEvent;
054import org.igniterealtime.jbosh.BOSHClientConnListener;
055import org.igniterealtime.jbosh.BOSHClientRequestListener;
056import org.igniterealtime.jbosh.BOSHClientResponseListener;
057import org.igniterealtime.jbosh.BOSHException;
058import org.igniterealtime.jbosh.BOSHMessageEvent;
059import org.igniterealtime.jbosh.BodyQName;
060import org.igniterealtime.jbosh.ComposableBody;
061import org.jxmpp.jid.DomainBareJid;
062import org.jxmpp.jid.parts.Resourcepart;
063
064/**
065 * Creates a connection to an XMPP server via HTTP binding.
066 * This is specified in the XEP-0206: XMPP Over BOSH.
067 *
068 * @see XMPPConnection
069 * @author Guenther Niess
070 */
071public class XMPPBOSHConnection extends AbstractXMPPConnection {
072    private static final Logger LOGGER = Logger.getLogger(XMPPBOSHConnection.class.getName());
073
074    /**
075     * The XMPP Over Bosh namespace.
076     */
077    public static final String XMPP_BOSH_NS = "urn:xmpp:xbosh";
078
079    /**
080     * The BOSH namespace from XEP-0124.
081     */
082    public static final String BOSH_URI = "http://jabber.org/protocol/httpbind";
083
084    /**
085     * The used BOSH client from the jbosh library.
086     */
087    private BOSHClient client;
088
089    /**
090     * Holds the initial configuration used while creating the connection.
091     */
092    @SuppressWarnings("HidingField")
093    private final BOSHConfiguration config;
094
095    private final ArrayBlockingQueueWithShutdown<TopLevelStreamElement> outgoingQueue = new ArrayBlockingQueueWithShutdown<>(100, true);
096
097    private Thread writerThread;
098
099    // Some flags which provides some info about the current state.
100    private boolean isFirstInitialization = true;
101    private boolean done = false;
102
103    // The readerPipe and consumer thread are used for the debugger.
104    private PipedWriter readerPipe;
105    private Thread readerConsumer;
106
107    /**
108     * The session ID for the BOSH session with the connection manager.
109     */
110    protected String sessionID = null;
111
112    private boolean notified;
113
114    /**
115     * Create a HTTP Binding connection to an XMPP server.
116     *
117     * @param username the username to use.
118     * @param password the password to use.
119     * @param https true if you want to use SSL
120     *             (e.g. false for http://domain.lt:7070/http-bind).
121     * @param host the hostname or IP address of the connection manager
122     *             (e.g. domain.lt for http://domain.lt:7070/http-bind).
123     * @param port the port of the connection manager
124     *             (e.g. 7070 for http://domain.lt:7070/http-bind).
125     * @param filePath the file which is described by the URL
126     *             (e.g. /http-bind for http://domain.lt:7070/http-bind).
127     * @param xmppServiceDomain the XMPP service name
128     *             (e.g. domain.lt for the user alice@domain.lt)
129     */
130    public XMPPBOSHConnection(String username, String password, boolean https, String host, int port, String filePath, DomainBareJid xmppServiceDomain) {
131        this(BOSHConfiguration.builder().setUseHttps(https).setHost(host)
132                .setPort(port).setFile(filePath).setXmppDomain(xmppServiceDomain)
133                .setUsernameAndPassword(username, password).build());
134    }
135
136    /**
137     * Create a HTTP Binding connection to an XMPP server.
138     *
139     * @param config The configuration which is used for this connection.
140     */
141    public XMPPBOSHConnection(BOSHConfiguration config) {
142        super(config);
143        this.config = config;
144    }
145
146    @SuppressWarnings("deprecation")
147    @Override
148    protected void connectInternal() throws SmackException, InterruptedException {
149        done = false;
150        notified = false;
151        try {
152            // Ensure a clean starting state
153            if (client != null) {
154                client.close();
155                client = null;
156            }
157            sessionID = null;
158
159            // Initialize BOSH client
160            BOSHClientConfig.Builder cfgBuilder = BOSHClientConfig.Builder
161                    .create(config.getURI(), config.getXMPPServiceDomain().toString());
162            if (config.isProxyEnabled()) {
163                cfgBuilder.setProxy(config.getProxyAddress(), config.getProxyPort());
164            }
165
166            cfgBuilder.setCompressionEnabled(config.isCompressionEnabled());
167
168            for (Map.Entry<String, String> h : config.getHttpHeaders().entrySet()) {
169                cfgBuilder.addHttpHeader(h.getKey(), h.getValue());
170            }
171
172            client = BOSHClient.create(cfgBuilder.build());
173
174            client.addBOSHClientConnListener(new BOSHConnectionListener());
175            client.addBOSHClientResponseListener(new BOSHPacketReader());
176
177            // Initialize the debugger
178            if (debugger != null) {
179                initDebugger();
180            }
181
182            // Send the session creation request
183            client.send(ComposableBody.builder()
184                    .setNamespaceDefinition("xmpp", XMPP_BOSH_NS)
185                    .setAttribute(BodyQName.createWithPrefix(XMPP_BOSH_NS, "version", "xmpp"), "1.0")
186                    .build());
187        } catch (Exception e) {
188            throw new GenericConnectionException(e);
189        }
190
191        // Wait for the response from the server
192        synchronized (this) {
193            if (!connected) {
194                final long deadline = System.currentTimeMillis() + getReplyTimeout();
195                while (!notified) {
196                    final long now = System.currentTimeMillis();
197                    if (now >= deadline) break;
198                    wait(deadline - now);
199                }
200            }
201        }
202
203        assert writerThread == null || !writerThread.isAlive();
204        outgoingQueue.start();
205        writerThread = Async.go(this::writeElements, this + " Writer");
206
207        // If there is no feedback, throw an remote server timeout error
208        if (!connected && !done) {
209            done = true;
210            String errorMessage = "Timeout reached for the connection to "
211                    + getHost() + ":" + getPort() + ".";
212            instantShutdown();
213            throw new SmackException.SmackMessageException(errorMessage);
214        }
215
216        try {
217            XmlPullParser parser = PacketParserUtils.getParserFor(
218                            "<stream:stream xmlns='jabber:client' xmlns:stream='http://etherx.jabber.org/streams'/>");
219            onStreamOpen(parser);
220        } catch (XmlPullParserException | IOException e) {
221            instantShutdown();
222            throw new AssertionError("Failed to setup stream environment", e);
223        }
224    }
225
226    @Override
227    public boolean isSecureConnection() {
228        return config.isUsingHTTPS();
229    }
230
231    @Override
232    public boolean isUsingCompression() {
233        // TODO: Implement compression
234        return false;
235    }
236
237    @Override
238    protected void loginInternal(String username, String password, Resourcepart resource) throws XMPPException,
239                    SmackException, IOException, InterruptedException {
240        // Authenticate using SASL
241        authenticate(username, password, config.getAuthzid(), null);
242
243        bindResourceAndEstablishSession(resource);
244
245        afterSuccessfulLogin(false);
246    }
247
248    private volatile boolean writerThreadRunning;
249
250    private void writeElements() {
251        writerThreadRunning = true;
252        try {
253            while (true) {
254                TopLevelStreamElement element;
255                try {
256                    element = outgoingQueue.take();
257                } catch (InterruptedException e) {
258                    LOGGER.log(Level.FINE,
259                                    "Writer thread exiting: Outgoing queue was shutdown as signalled by interrupted exception",
260                                    e);
261                    return;
262                }
263
264                String xmlPayload = element.toXML(BOSH_URI).toString();
265                ComposableBody.Builder composableBodyBuilder = ComposableBody.builder().setPayloadXML(xmlPayload);
266                if (sessionID != null) {
267                    BodyQName qName = BodyQName.create(BOSH_URI, "sid");
268                    composableBodyBuilder.setAttribute(qName, sessionID);
269                }
270
271                ComposableBody composableBody = composableBodyBuilder.build();
272
273                try {
274                    client.send(composableBody);
275                } catch (BOSHException e) {
276                    LOGGER.log(Level.WARNING, this + " received BOSHException in writer thread, connection broke!", e);
277                    // TODO: Signal the user that there was an unexpected exception.
278                    return;
279                }
280
281                if (element instanceof Stanza) {
282                    Stanza stanza = (Stanza) element;
283                    firePacketSendingListeners(stanza);
284                }
285            }
286        } catch (Exception exception) {
287            LOGGER.log(Level.WARNING, "BOSH writer thread threw", exception);
288        } finally {
289            writerThreadRunning = false;
290            notifyWaitingThreads();
291        }
292    }
293
294    @Override
295    protected void sendInternal(TopLevelStreamElement element) throws NotConnectedException, InterruptedException {
296        throwNotConnectedExceptionIfAppropriate();
297        try {
298            outgoingQueue.put(element);
299        } catch (InterruptedException e) {
300            throwNotConnectedExceptionIfAppropriate();
301            // If the method above did not throw, then the sending thread was interrupted
302            throw e;
303        }
304    }
305
306    @Override
307    protected void sendNonBlockingInternal(TopLevelStreamElement element)
308                    throws NotConnectedException, OutgoingQueueFullException {
309        throwNotConnectedExceptionIfAppropriate();
310        boolean enqueued = outgoingQueue.offer(element);
311        if (!enqueued) {
312            throwNotConnectedExceptionIfAppropriate();
313            throw new OutgoingQueueFullException();
314        }
315    }
316
317    @Override
318    protected void shutdown() {
319        instantShutdown();
320    }
321
322    @Override
323    public void instantShutdown() {
324        outgoingQueue.shutdown();
325
326        try {
327            boolean writerThreadTerminated = waitFor(() -> !writerThreadRunning);
328            if (!writerThreadTerminated) {
329                LOGGER.severe("Writer thread of " + this + " did not terminate timely");
330            }
331        } catch (InterruptedException e) {
332            LOGGER.log(Level.FINE, "Interrupted while waiting for writer thread to terminate", e);
333        }
334
335        if (client != null) {
336            try {
337                client.disconnect();
338            } catch (Exception e) {
339                LOGGER.log(Level.WARNING, "shutdown", e);
340            }
341        }
342
343        setWasAuthenticated();
344        sessionID = null;
345        done = true;
346        authenticated = false;
347        connected = false;
348        isFirstInitialization = false;
349        client = null;
350
351        // Close down the readers and writers.
352        CloseableUtil.maybeClose(readerPipe, LOGGER);
353        CloseableUtil.maybeClose(reader, LOGGER);
354        CloseableUtil.maybeClose(writer, LOGGER);
355
356        readerPipe = null;
357        reader = null;
358        writer = null;
359        readerConsumer = null;
360    }
361
362    /**
363     * Send a HTTP request to the connection manager with the provided body element.
364     *
365     * @param body the body which will be sent.
366     * @throws BOSHException if an BOSH (Bidirectional-streams Over Synchronous HTTP, XEP-0124) related error occurs
367     */
368    protected void send(ComposableBody body) throws BOSHException {
369        if (!connected) {
370            throw new IllegalStateException("Not connected to a server!");
371        }
372        if (body == null) {
373            throw new NullPointerException("Body mustn't be null!");
374        }
375        if (sessionID != null) {
376            body = body.rebuild().setAttribute(
377                    BodyQName.create(BOSH_URI, "sid"), sessionID).build();
378        }
379        client.send(body);
380    }
381
382    /**
383     * Initialize the SmackDebugger which allows to log and debug XML traffic.
384     */
385    @Override
386    protected void initDebugger() {
387        // TODO: Maybe we want to extend the SmackDebugger for simplification
388        //       and a performance boost.
389
390        // Initialize a empty writer which discards all data.
391        writer = new Writer() {
392            @Override
393            public void write(char[] cbuf, int off, int len) {
394                /* ignore */ }
395
396            @Override
397            public void close() {
398                /* ignore */ }
399
400            @Override
401            public void flush() {
402                /* ignore */ }
403        };
404
405        // Initialize a pipe for received raw data.
406        try {
407            readerPipe = new PipedWriter();
408            reader = new PipedReader(readerPipe);
409        }
410        catch (IOException e) {
411            // Ignore
412        }
413
414        // Call the method from the parent class which initializes the debugger.
415        super.initDebugger();
416
417        // Add listeners for the received and sent raw data.
418        client.addBOSHClientResponseListener(new BOSHClientResponseListener() {
419            @Override
420            public void responseReceived(BOSHMessageEvent event) {
421                if (event.getBody() != null) {
422                    try {
423                        readerPipe.write(event.getBody().toXML());
424                        readerPipe.flush();
425                    } catch (Exception e) {
426                        // Ignore
427                    }
428                }
429            }
430        });
431        client.addBOSHClientRequestListener(new BOSHClientRequestListener() {
432            @Override
433            public void requestSent(BOSHMessageEvent event) {
434                if (event.getBody() != null) {
435                    try {
436                        writer.write(event.getBody().toXML());
437                    } catch (Exception e) {
438                        // Ignore
439                    }
440                }
441            }
442        });
443
444        // Create and start a thread which discards all read data.
445        readerConsumer = new Thread() {
446            private Thread thread = this;
447            private int bufferLength = 1024;
448
449            @Override
450            public void run() {
451                try {
452                    char[] cbuf = new char[bufferLength];
453                    while (readerConsumer == thread && !done) {
454                        reader.read(cbuf, 0, bufferLength);
455                    }
456                } catch (IOException e) {
457                    // Ignore
458                }
459            }
460        };
461        readerConsumer.setDaemon(true);
462        readerConsumer.start();
463    }
464
465    @Override
466    protected void afterSaslAuthenticationSuccess()
467                    throws NotConnectedException, InterruptedException, SmackWrappedException {
468        // XMPP over BOSH is unusual when it comes to SASL authentication: Instead of sending a new stream open, it
469        // requires a special XML element ot be send after successful SASL authentication.
470        // See XEP-0206 ยง 5., especially the following is example 5 of XEP-0206.
471        ComposableBody composeableBody = ComposableBody.builder()
472                .setNamespaceDefinition("xmpp", XMPPBOSHConnection.XMPP_BOSH_NS)
473                .setAttribute(BodyQName.createWithPrefix(XMPPBOSHConnection.XMPP_BOSH_NS, "restart", "xmpp"), "true")
474                .setAttribute(BodyQName.create(XMPPBOSHConnection.BOSH_URI, "to"), getXMPPServiceDomain().toString())
475                .setAttribute(BodyQName.create(BOSH_URI, "sid"), sessionID)
476                .build();
477
478        try {
479            client.send(composeableBody);
480        } catch (BOSHException e) {
481            // jbosh's exception API does not really match the one of Smack.
482            throw new SmackException.SmackWrappedException(e);
483        }
484    }
485
486    /**
487     * A listener class which listen for a successfully established connection
488     * and connection errors and notifies the BOSHConnection.
489     *
490     * @author Guenther Niess
491     */
492    private class BOSHConnectionListener implements BOSHClientConnListener {
493
494        /**
495         * Notify the BOSHConnection about connection state changes.
496         * Process the connection listeners and try to login if the
497         * connection was formerly authenticated and is now reconnected.
498         */
499        @Override
500        public void connectionEvent(BOSHClientConnEvent connEvent) {
501            try {
502                if (connEvent.isConnected()) {
503                    connected = true;
504                    if (isFirstInitialization) {
505                        isFirstInitialization = false;
506                    }
507                    else {
508                            if (wasAuthenticated) {
509                                try {
510                                    login();
511                                }
512                                catch (Exception e) {
513                                    throw new RuntimeException(e);
514                                }
515                            }
516                    }
517                }
518                else {
519                    if (connEvent.isError()) {
520                        // TODO Check why jbosh's getCause returns Throwable here. This is very
521                        // unusual and should be avoided if possible
522                        Throwable cause = connEvent.getCause();
523                        Exception e;
524                        if (cause instanceof Exception) {
525                            e = (Exception) cause;
526                        } else {
527                            e = new Exception(cause);
528                        }
529                        notifyConnectionError(e);
530                    }
531                    connected = false;
532                }
533            }
534            finally {
535                notified = true;
536                synchronized (XMPPBOSHConnection.this) {
537                    XMPPBOSHConnection.this.notifyAll();
538                }
539            }
540        }
541    }
542
543    /**
544     * Listens for XML traffic from the BOSH connection manager and parses it into
545     * stanza objects.
546     *
547     * @author Guenther Niess
548     */
549    private class BOSHPacketReader implements BOSHClientResponseListener {
550
551        /**
552         * Parse the received packets and notify the corresponding connection.
553         *
554         * @param event the BOSH client response which includes the received packet.
555         */
556        @Override
557        public void responseReceived(BOSHMessageEvent event) {
558            AbstractBody body = event.getBody();
559            if (body != null) {
560                try {
561                    if (sessionID == null) {
562                        sessionID = body.getAttribute(BodyQName.create(XMPPBOSHConnection.BOSH_URI, "sid"));
563                    }
564                    if (streamId == null) {
565                        streamId = body.getAttribute(BodyQName.create(XMPPBOSHConnection.BOSH_URI, "authid"));
566                    }
567                    final XmlPullParser parser = PacketParserUtils.getParserFor(body.toXML());
568
569                    XmlPullParser.Event eventType = parser.getEventType();
570                    do {
571                        eventType = parser.next();
572                        switch (eventType) {
573                        case START_ELEMENT:
574                            String name = parser.getName();
575                            switch (name) {
576                            case Message.ELEMENT:
577                            case IQ.IQ_ELEMENT:
578                            case Presence.ELEMENT:
579                                parseAndProcessStanza(parser);
580                                break;
581                            case "features":
582                                parseFeaturesAndNotify(parser);
583                                break;
584                            case "error":
585                                // Some BOSH error isn't stream error.
586                                if ("urn:ietf:params:xml:ns:xmpp-streams".equals(parser.getNamespace(null))) {
587                                    throw new StreamErrorException(PacketParserUtils.parseStreamError(parser));
588                                } else {
589                                    StanzaError stanzaError = PacketParserUtils.parseError(parser);
590                                    throw new XMPPException.XMPPErrorException(null, stanzaError);
591                                }
592                            default:
593                                parseAndProcessNonza(parser);
594                                break;
595                            }
596                            break;
597                        default:
598                            // Catch all for incomplete switch (MissingCasesInEnumSwitch) statement.
599                            break;
600                        }
601                    }
602                    while (eventType != XmlPullParser.Event.END_DOCUMENT);
603                }
604                catch (Exception e) {
605                    if (isConnected()) {
606                        notifyConnectionError(e);
607                    }
608                }
609            }
610        }
611    }
612}