001/**
002 *
003 * Copyright the original author or authors
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 *     http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.jivesoftware.smackx.bytestreams.ibb;
018
019import java.io.IOException;
020import java.io.InputStream;
021import java.io.OutputStream;
022import java.net.SocketTimeoutException;
023import java.util.concurrent.BlockingQueue;
024import java.util.concurrent.LinkedBlockingQueue;
025import java.util.concurrent.TimeUnit;
026
027import org.jivesoftware.smack.SmackException.NotConnectedException;
028import org.jivesoftware.smack.XMPPConnection;
029import org.jivesoftware.smack.StanzaListener;
030import org.jivesoftware.smack.filter.AndFilter;
031import org.jivesoftware.smack.filter.StanzaFilter;
032import org.jivesoftware.smack.filter.StanzaTypeFilter;
033import org.jivesoftware.smack.packet.IQ;
034import org.jivesoftware.smack.packet.Message;
035import org.jivesoftware.smack.packet.Stanza;
036import org.jivesoftware.smack.packet.XMPPError;
037import org.jivesoftware.smack.util.stringencoder.Base64;
038import org.jivesoftware.smackx.bytestreams.BytestreamSession;
039import org.jivesoftware.smackx.bytestreams.ibb.packet.Close;
040import org.jivesoftware.smackx.bytestreams.ibb.packet.Data;
041import org.jivesoftware.smackx.bytestreams.ibb.packet.DataPacketExtension;
042import org.jivesoftware.smackx.bytestreams.ibb.packet.Open;
043import org.jxmpp.jid.Jid;
044
045/**
046 * InBandBytestreamSession class represents an In-Band Bytestream session.
047 * <p>
048 * In-band bytestreams are bidirectional and this session encapsulates the streams for both
049 * directions.
050 * <p>
051 * Note that closing the In-Band Bytestream session will close both streams. If both streams are
052 * closed individually the session will be closed automatically once the second stream is closed.
053 * Use the {@link #setCloseBothStreamsEnabled(boolean)} method if both streams should be closed
054 * automatically if one of them is closed.
055 * 
056 * @author Henning Staib
057 */
058public class InBandBytestreamSession implements BytestreamSession {
059
060    /* XMPP connection */
061    private final XMPPConnection connection;
062
063    /* the In-Band Bytestream open request for this session */
064    private final Open byteStreamRequest;
065
066    /*
067     * the input stream for this session (either IQIBBInputStream or MessageIBBInputStream)
068     */
069    private IBBInputStream inputStream;
070
071    /*
072     * the output stream for this session (either IQIBBOutputStream or MessageIBBOutputStream)
073     */
074    private IBBOutputStream outputStream;
075
076    /* JID of the remote peer */
077    private Jid remoteJID;
078
079    /* flag to close both streams if one of them is closed */
080    private boolean closeBothStreamsEnabled = false;
081
082    /* flag to indicate if session is closed */
083    private boolean isClosed = false;
084
085    /**
086     * Constructor.
087     * 
088     * @param connection the XMPP connection
089     * @param byteStreamRequest the In-Band Bytestream open request for this session
090     * @param remoteJID JID of the remote peer
091     */
092    protected InBandBytestreamSession(XMPPConnection connection, Open byteStreamRequest,
093                    Jid remoteJID) {
094        this.connection = connection;
095        this.byteStreamRequest = byteStreamRequest;
096        this.remoteJID = remoteJID;
097
098        // initialize streams dependent to the uses stanza type
099        switch (byteStreamRequest.getStanza()) {
100        case IQ:
101            this.inputStream = new IQIBBInputStream();
102            this.outputStream = new IQIBBOutputStream();
103            break;
104        case MESSAGE:
105            this.inputStream = new MessageIBBInputStream();
106            this.outputStream = new MessageIBBOutputStream();
107            break;
108        }
109
110    }
111
112    @Override
113    public InputStream getInputStream() {
114        return this.inputStream;
115    }
116
117    @Override
118    public OutputStream getOutputStream() {
119        return this.outputStream;
120    }
121
122    @Override
123    public int getReadTimeout() {
124        return this.inputStream.readTimeout;
125    }
126
127    @Override
128    public void setReadTimeout(int timeout) {
129        if (timeout < 0) {
130            throw new IllegalArgumentException("Timeout must be >= 0");
131        }
132        this.inputStream.readTimeout = timeout;
133    }
134
135    /**
136     * Returns whether both streams should be closed automatically if one of the streams is closed.
137     * Default is <code>false</code>.
138     * 
139     * @return <code>true</code> if both streams will be closed if one of the streams is closed,
140     *         <code>false</code> if both streams can be closed independently.
141     */
142    public boolean isCloseBothStreamsEnabled() {
143        return closeBothStreamsEnabled;
144    }
145
146    /**
147     * Sets whether both streams should be closed automatically if one of the streams is closed.
148     * Default is <code>false</code>.
149     * 
150     * @param closeBothStreamsEnabled <code>true</code> if both streams should be closed if one of
151     *        the streams is closed, <code>false</code> if both streams should be closed
152     *        independently
153     */
154    public void setCloseBothStreamsEnabled(boolean closeBothStreamsEnabled) {
155        this.closeBothStreamsEnabled = closeBothStreamsEnabled;
156    }
157
158    @Override
159    public void close() throws IOException {
160        closeByLocal(true); // close input stream
161        closeByLocal(false); // close output stream
162    }
163
164    /**
165     * This method is invoked if a request to close the In-Band Bytestream has been received.
166     * 
167     * @param closeRequest the close request from the remote peer
168     * @throws NotConnectedException 
169     * @throws InterruptedException 
170     */
171    protected void closeByPeer(Close closeRequest) throws NotConnectedException, InterruptedException {
172
173        /*
174         * close streams without flushing them, because stream is already considered closed on the
175         * remote peers side
176         */
177        this.inputStream.closeInternal();
178        this.inputStream.cleanup();
179        this.outputStream.closeInternal(false);
180
181        // acknowledge close request
182        IQ confirmClose = IQ.createResultIQ(closeRequest);
183        this.connection.sendStanza(confirmClose);
184
185    }
186
187    /**
188     * This method is invoked if one of the streams has been closed locally, if an error occurred
189     * locally or if the whole session should be closed.
190     * 
191     * @throws IOException if an error occurs while sending the close request
192     */
193    protected synchronized void closeByLocal(boolean in) throws IOException {
194        if (this.isClosed) {
195            return;
196        }
197
198        if (this.closeBothStreamsEnabled) {
199            this.inputStream.closeInternal();
200            this.outputStream.closeInternal(true);
201        }
202        else {
203            if (in) {
204                this.inputStream.closeInternal();
205            }
206            else {
207                // close stream but try to send any data left
208                this.outputStream.closeInternal(true);
209            }
210        }
211
212        if (this.inputStream.isClosed && this.outputStream.isClosed) {
213            this.isClosed = true;
214
215            // send close request
216            Close close = new Close(this.byteStreamRequest.getSessionID());
217            close.setTo(this.remoteJID);
218            try {
219                connection.createStanzaCollectorAndSend(close).nextResultOrThrow();
220            }
221            catch (Exception e) {
222                // Sadly we are unable to use the IOException(Throwable) constructor because this
223                // constructor is only supported from Android API 9 on.
224                IOException ioException = new IOException();
225                ioException.initCause(e);
226                throw ioException;
227            }
228
229            this.inputStream.cleanup();
230
231            // remove session from manager
232            // Thanks Google Error Prone for finding the bug where remove() was called with 'this' as argument. Changed
233            // now to remove(byteStreamRequest.getSessionID).
234            InBandBytestreamManager.getByteStreamManager(this.connection).getSessions().remove(byteStreamRequest.getSessionID());
235        }
236
237    }
238
239    /**
240     * IBBInputStream class is the base implementation of an In-Band Bytestream input stream.
241     * Subclasses of this input stream must provide a stanza(/packet) listener along with a stanza(/packet) filter to
242     * collect the In-Band Bytestream data packets.
243     */
244    private abstract class IBBInputStream extends InputStream {
245
246        /* the data packet listener to fill the data queue */
247        private final StanzaListener dataPacketListener;
248
249        /* queue containing received In-Band Bytestream data packets */
250        protected final BlockingQueue<DataPacketExtension> dataQueue = new LinkedBlockingQueue<DataPacketExtension>();
251
252        /* buffer containing the data from one data packet */
253        private byte[] buffer;
254
255        /* pointer to the next byte to read from buffer */
256        private int bufferPointer = -1;
257
258        /* data packet sequence (range from 0 to 65535) */
259        private long seq = -1;
260
261        /* flag to indicate if input stream is closed */
262        private boolean isClosed = false;
263
264        /* flag to indicate if close method was invoked */
265        private boolean closeInvoked = false;
266
267        /* timeout for read operations */
268        private int readTimeout = 0;
269
270        /**
271         * Constructor.
272         */
273        public IBBInputStream() {
274            // add data packet listener to connection
275            this.dataPacketListener = getDataPacketListener();
276            connection.addSyncStanzaListener(this.dataPacketListener, getDataPacketFilter());
277        }
278
279        /**
280         * Returns the stanza(/packet) listener that processes In-Band Bytestream data packets.
281         * 
282         * @return the data stanza(/packet) listener
283         */
284        protected abstract StanzaListener getDataPacketListener();
285
286        /**
287         * Returns the stanza(/packet) filter that accepts In-Band Bytestream data packets.
288         * 
289         * @return the data stanza(/packet) filter
290         */
291        protected abstract StanzaFilter getDataPacketFilter();
292
293        @Override
294        public synchronized int read() throws IOException {
295            checkClosed();
296
297            // if nothing read yet or whole buffer has been read fill buffer
298            if (bufferPointer == -1 || bufferPointer >= buffer.length) {
299                // if no data available and stream was closed return -1
300                if (!loadBuffer()) {
301                    return -1;
302                }
303            }
304
305            // return byte and increment buffer pointer
306            return buffer[bufferPointer++] & 0xff;
307        }
308
309        @Override
310        public synchronized int read(byte[] b, int off, int len) throws IOException {
311            if (b == null) {
312                throw new NullPointerException();
313            }
314            else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
315                            || ((off + len) < 0)) {
316                throw new IndexOutOfBoundsException();
317            }
318            else if (len == 0) {
319                return 0;
320            }
321
322            checkClosed();
323
324            // if nothing read yet or whole buffer has been read fill buffer
325            if (bufferPointer == -1 || bufferPointer >= buffer.length) {
326                // if no data available and stream was closed return -1
327                if (!loadBuffer()) {
328                    return -1;
329                }
330            }
331
332            // if more bytes wanted than available return all available
333            int bytesAvailable = buffer.length - bufferPointer;
334            if (len > bytesAvailable) {
335                len = bytesAvailable;
336            }
337
338            System.arraycopy(buffer, bufferPointer, b, off, len);
339            bufferPointer += len;
340            return len;
341        }
342
343        @Override
344        public synchronized int read(byte[] b) throws IOException {
345            return read(b, 0, b.length);
346        }
347
348        /**
349         * This method blocks until a data stanza(/packet) is received, the stream is closed or the current
350         * thread is interrupted.
351         * 
352         * @return <code>true</code> if data was received, otherwise <code>false</code>
353         * @throws IOException if data packets are out of sequence
354         */
355        private synchronized boolean loadBuffer() throws IOException {
356
357            // wait until data is available or stream is closed
358            DataPacketExtension data = null;
359            try {
360                if (this.readTimeout == 0) {
361                    while (data == null) {
362                        if (isClosed && this.dataQueue.isEmpty()) {
363                            return false;
364                        }
365                        data = this.dataQueue.poll(1000, TimeUnit.MILLISECONDS);
366                    }
367                }
368                else {
369                    data = this.dataQueue.poll(this.readTimeout, TimeUnit.MILLISECONDS);
370                    if (data == null) {
371                        throw new SocketTimeoutException();
372                    }
373                }
374            }
375            catch (InterruptedException e) {
376                // Restore the interrupted status
377                Thread.currentThread().interrupt();
378                return false;
379            }
380
381            // handle sequence overflow
382            if (this.seq == 65535) {
383                this.seq = -1;
384            }
385
386            // check if data packets sequence is successor of last seen sequence
387            long seq = data.getSeq();
388            if (seq - 1 != this.seq) {
389                // packets out of order; close stream/session
390                InBandBytestreamSession.this.close();
391                throw new IOException("Packets out of sequence");
392            }
393            else {
394                this.seq = seq;
395            }
396
397            // set buffer to decoded data
398            buffer = data.getDecodedData();
399            bufferPointer = 0;
400            return true;
401        }
402
403        /**
404         * Checks if this stream is closed and throws an IOException if necessary
405         * 
406         * @throws IOException if stream is closed and no data should be read anymore
407         */
408        private void checkClosed() throws IOException {
409            // Throw an exception if, and only if, this stream has been already
410            // closed by the user using the close() method
411            if (closeInvoked) {
412                // clear data queue in case additional data was received after stream was closed
413                this.dataQueue.clear();
414                throw new IOException("Stream is closed");
415            }
416        }
417
418        @Override
419        public boolean markSupported() {
420            return false;
421        }
422
423        @Override
424        public void close() throws IOException {
425            if (closeInvoked) {
426                return;
427            }
428
429            this.closeInvoked = true;
430
431            InBandBytestreamSession.this.closeByLocal(true);
432        }
433
434        /**
435         * This method sets the close flag and removes the data stanza(/packet) listener.
436         */
437        private void closeInternal() {
438            if (isClosed) {
439                return;
440            }
441            isClosed = true;
442        }
443
444        /**
445         * Invoked if the session is closed.
446         */
447        private void cleanup() {
448            connection.removeSyncStanzaListener(this.dataPacketListener);
449        }
450
451    }
452
453    /**
454     * IQIBBInputStream class implements IBBInputStream to be used with IQ stanzas encapsulating the
455     * data packets.
456     */
457    private class IQIBBInputStream extends IBBInputStream {
458
459        @Override
460        protected StanzaListener getDataPacketListener() {
461            return new StanzaListener() {
462
463                private long lastSequence = -1;
464
465                @Override
466                public void processStanza(Stanza packet) throws NotConnectedException, InterruptedException {
467                    // get data packet extension
468                    DataPacketExtension data = ((Data) packet).getDataPacketExtension();
469
470                    /*
471                     * check if sequence was not used already (see XEP-0047 Section 2.2)
472                     */
473                    if (data.getSeq() <= this.lastSequence) {
474                        IQ unexpectedRequest = IQ.createErrorResponse((IQ) packet,
475                                        XMPPError.Condition.unexpected_request);
476                        connection.sendStanza(unexpectedRequest);
477                        return;
478
479                    }
480
481                    // check if encoded data is valid (see XEP-0047 Section 2.2)
482                    if (data.getDecodedData() == null) {
483                        // data is invalid; respond with bad-request error
484                        IQ badRequest = IQ.createErrorResponse((IQ) packet,
485                                        XMPPError.Condition.bad_request);
486                        connection.sendStanza(badRequest);
487                        return;
488                    }
489
490                    // data is valid; add to data queue
491                    dataQueue.offer(data);
492
493                    // confirm IQ
494                    IQ confirmData = IQ.createResultIQ((IQ) packet);
495                    connection.sendStanza(confirmData);
496
497                    // set last seen sequence
498                    this.lastSequence = data.getSeq();
499                    if (this.lastSequence == 65535) {
500                        this.lastSequence = -1;
501                    }
502
503                }
504
505            };
506        }
507
508        @Override
509        protected StanzaFilter getDataPacketFilter() {
510            /*
511             * filter all IQ stanzas having type 'SET' (represented by Data class), containing a
512             * data stanza(/packet) extension, matching session ID and recipient
513             */
514            return new AndFilter(new StanzaTypeFilter(Data.class), new IBBDataPacketFilter());
515        }
516
517    }
518
519    /**
520     * MessageIBBInputStream class implements IBBInputStream to be used with message stanzas
521     * encapsulating the data packets.
522     */
523    private class MessageIBBInputStream extends IBBInputStream {
524
525        @Override
526        protected StanzaListener getDataPacketListener() {
527            return new StanzaListener() {
528
529                @Override
530                public void processStanza(Stanza packet) {
531                    // get data packet extension
532                    DataPacketExtension data = (DataPacketExtension) packet.getExtension(
533                                    DataPacketExtension.ELEMENT,
534                                    DataPacketExtension.NAMESPACE);
535
536                    // check if encoded data is valid
537                    if (data.getDecodedData() == null) {
538                        /*
539                         * TODO once a majority of XMPP server implementation support XEP-0079
540                         * Advanced Message Processing the invalid message could be answered with an
541                         * appropriate error. For now we just ignore the packet. Subsequent packets
542                         * with an increased sequence will cause the input stream to close the
543                         * stream/session.
544                         */
545                        return;
546                    }
547
548                    // data is valid; add to data queue
549                    dataQueue.offer(data);
550
551                    // TODO confirm packet once XMPP servers support XEP-0079
552                }
553
554            };
555        }
556
557        @Override
558        protected StanzaFilter getDataPacketFilter() {
559            /*
560             * filter all message stanzas containing a data stanza(/packet) extension, matching session ID
561             * and recipient
562             */
563            return new AndFilter(new StanzaTypeFilter(Message.class), new IBBDataPacketFilter());
564        }
565
566    }
567
568    /**
569     * IBBDataPacketFilter class filters all packets from the remote peer of this session,
570     * containing an In-Band Bytestream data stanza(/packet) extension whose session ID matches this sessions
571     * ID.
572     */
573    private class IBBDataPacketFilter implements StanzaFilter {
574
575        @Override
576        public boolean accept(Stanza packet) {
577            // sender equals remote peer
578            if (!packet.getFrom().equals(remoteJID)) {
579                return false;
580            }
581
582            DataPacketExtension data;
583            if (packet instanceof Data) {
584                data = ((Data) packet).getDataPacketExtension();
585            } else {
586                // stanza contains data packet extension
587                data = packet.getExtension(
588                        DataPacketExtension.ELEMENT,
589                        DataPacketExtension.NAMESPACE);
590                if (data == null) {
591                    return false;
592                }
593            }
594
595            // session ID equals this session ID
596            if (!data.getSessionID().equals(byteStreamRequest.getSessionID())) {
597                return false;
598            }
599
600            return true;
601        }
602
603    }
604
605    /**
606     * IBBOutputStream class is the base implementation of an In-Band Bytestream output stream.
607     * Subclasses of this output stream must provide a method to send data over XMPP stream.
608     */
609    private abstract class IBBOutputStream extends OutputStream {
610
611        /* buffer with the size of this sessions block size */
612        protected final byte[] buffer;
613
614        /* pointer to next byte to write to buffer */
615        protected int bufferPointer = 0;
616
617        /* data packet sequence (range from 0 to 65535) */
618        protected long seq = 0;
619
620        /* flag to indicate if output stream is closed */
621        protected boolean isClosed = false;
622
623        /**
624         * Constructor.
625         */
626        public IBBOutputStream() {
627            this.buffer = new byte[byteStreamRequest.getBlockSize()];
628        }
629
630        /**
631         * Writes the given data stanza(/packet) to the XMPP stream.
632         * 
633         * @param data the data packet
634         * @throws IOException if an I/O error occurred while sending or if the stream is closed
635         * @throws NotConnectedException 
636         * @throws InterruptedException 
637         */
638        protected abstract void writeToXML(DataPacketExtension data) throws IOException, NotConnectedException, InterruptedException;
639
640        @Override
641        public synchronized void write(int b) throws IOException {
642            if (this.isClosed) {
643                throw new IOException("Stream is closed");
644            }
645
646            // if buffer is full flush buffer
647            if (bufferPointer >= buffer.length) {
648                flushBuffer();
649            }
650
651            buffer[bufferPointer++] = (byte) b;
652        }
653
654        @Override
655        public synchronized void write(byte[] b, int off, int len) throws IOException {
656            if (b == null) {
657                throw new NullPointerException();
658            }
659            else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
660                            || ((off + len) < 0)) {
661                throw new IndexOutOfBoundsException();
662            }
663            else if (len == 0) {
664                return;
665            }
666
667            if (this.isClosed) {
668                throw new IOException("Stream is closed");
669            }
670
671            // is data to send greater than buffer size
672            if (len >= buffer.length) {
673
674                // "byte" off the first chunk to write out
675                writeOut(b, off, buffer.length);
676
677                // recursively call this method with the lesser amount
678                write(b, off + buffer.length, len - buffer.length);
679            }
680            else {
681                writeOut(b, off, len);
682            }
683        }
684
685        @Override
686        public synchronized void write(byte[] b) throws IOException {
687            write(b, 0, b.length);
688        }
689
690        /**
691         * Fills the buffer with the given data and sends it over the XMPP stream if the buffers
692         * capacity has been reached. This method is only called from this class so it is assured
693         * that the amount of data to send is <= buffer capacity
694         * 
695         * @param b the data
696         * @param off the data
697         * @param len the number of bytes to write
698         * @throws IOException if an I/O error occurred while sending or if the stream is closed
699         */
700        private synchronized void writeOut(byte[] b, int off, int len) throws IOException {
701            if (this.isClosed) {
702                throw new IOException("Stream is closed");
703            }
704
705            // set to 0 in case the next 'if' block is not executed
706            int available = 0;
707
708            // is data to send greater that buffer space left
709            if (len > buffer.length - bufferPointer) {
710                // fill buffer to capacity and send it
711                available = buffer.length - bufferPointer;
712                System.arraycopy(b, off, buffer, bufferPointer, available);
713                bufferPointer += available;
714                flushBuffer();
715            }
716
717            // copy the data left to buffer
718            System.arraycopy(b, off + available, buffer, bufferPointer, len - available);
719            bufferPointer += len - available;
720        }
721
722        @Override
723        public synchronized void flush() throws IOException {
724            if (this.isClosed) {
725                throw new IOException("Stream is closed");
726            }
727            flushBuffer();
728        }
729
730        private synchronized void flushBuffer() throws IOException {
731
732            // do nothing if no data to send available
733            if (bufferPointer == 0) {
734                return;
735            }
736
737            // create data packet
738            String enc = Base64.encodeToString(buffer, 0, bufferPointer);
739            DataPacketExtension data = new DataPacketExtension(byteStreamRequest.getSessionID(),
740                            this.seq, enc);
741
742            // write to XMPP stream
743            try {
744                writeToXML(data);
745            }
746            catch (InterruptedException | NotConnectedException e) {
747                IOException ioException = new IOException();
748                ioException.initCause(e);
749                throw ioException;
750            }
751
752            // reset buffer pointer
753            bufferPointer = 0;
754
755            // increment sequence, considering sequence overflow
756            this.seq = (this.seq + 1 == 65535 ? 0 : this.seq + 1);
757
758        }
759
760        @Override
761        public void close() throws IOException {
762            if (isClosed) {
763                return;
764            }
765            InBandBytestreamSession.this.closeByLocal(false);
766        }
767
768        /**
769         * Sets the close flag and optionally flushes the stream.
770         * 
771         * @param flush if <code>true</code> flushes the stream
772         */
773        protected void closeInternal(boolean flush) {
774            if (this.isClosed) {
775                return;
776            }
777            this.isClosed = true;
778
779            try {
780                if (flush) {
781                    flushBuffer();
782                }
783            }
784            catch (IOException e) {
785                /*
786                 * ignore, because writeToXML() will not throw an exception if stream is already
787                 * closed
788                 */
789            }
790        }
791
792    }
793
794    /**
795     * IQIBBOutputStream class implements IBBOutputStream to be used with IQ stanzas encapsulating
796     * the data packets.
797     */
798    private class IQIBBOutputStream extends IBBOutputStream {
799
800        @Override
801        protected synchronized void writeToXML(DataPacketExtension data) throws IOException {
802            // create IQ stanza containing data packet
803            IQ iq = new Data(data);
804            iq.setTo(remoteJID);
805
806            try {
807                connection.createStanzaCollectorAndSend(iq).nextResultOrThrow();
808            }
809            catch (Exception e) {
810                // close session unless it is already closed
811                if (!this.isClosed) {
812                    InBandBytestreamSession.this.close();
813                    // Sadly we are unable to use the IOException(Throwable) constructor because this
814                    // constructor is only supported from Android API 9 on.
815                    IOException ioException = new IOException();
816                    ioException.initCause(e);
817                    throw ioException;
818                }
819            }
820
821        }
822
823    }
824
825    /**
826     * MessageIBBOutputStream class implements IBBOutputStream to be used with message stanzas
827     * encapsulating the data packets.
828     */
829    private class MessageIBBOutputStream extends IBBOutputStream {
830
831        @Override
832        protected synchronized void writeToXML(DataPacketExtension data) throws NotConnectedException, InterruptedException {
833            // create message stanza containing data packet
834            Message message = new Message(remoteJID);
835            message.addExtension(data);
836
837            connection.sendStanza(message);
838
839        }
840
841    }
842
843    /**
844     * Process IQ stanza.
845     * @param data
846     * @throws NotConnectedException
847     * @throws InterruptedException 
848     */
849    public void processIQPacket(Data data) throws NotConnectedException, InterruptedException {
850        inputStream.dataPacketListener.processStanza(data);
851    }
852
853}