View Javadoc
1   /*
2    * Copyright (c) 1999, 2013, Oracle and/or its affiliates. All rights reserved.
3    * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4    *
5    * This code is free software; you can redistribute it and/or modify it
6    * under the terms of the GNU General Public License version 2 only, as
7    * published by the Free Software Foundation.  Oracle designates this
8    * particular file as subject to the "Classpath" exception as provided
9    * by Oracle in the LICENSE file that accompanied this code.
10   *
11   * This code is distributed in the hope that it will be useful, but WITHOUT
12   * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13   * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14   * version 2 for more details (a copy is included in the LICENSE file that
15   * accompanied this code).
16   *
17   * You should have received a copy of the GNU General Public License version
18   * 2 along with this work; if not, write to the Free Software Foundation,
19   * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20   *
21   * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22   * or visit www.oracle.com if you need additional information or have any
23   * questions.
24   */
25  package sun.net.www.http;
26  
27  import java.io.*;
28  import java.util.*;
29  
30  import sun.net.*;
31  import sun.net.www.*;
32  
33  /**
34   * A <code>ChunkedInputStream</code> provides a stream for reading a body of
35   * a http message that can be sent as a series of chunks, each with its own
36   * size indicator. Optionally the last chunk can be followed by trailers
37   * containing entity-header fields.
38   * <p>
39   * A <code>ChunkedInputStream</code> is also <code>Hurryable</code> so it
40   * can be hurried to the end of the stream if the bytes are available on
41   * the underlying stream.
42   */
public
class ChunkedInputStream extends InputStream implements Hurryable {

    /**
     * The underlying stream. Set to {@code null} once the chunked stream
     * has been closed or handed back to the keep-alive cache.
     */
    private InputStream in;

    /**
     * The <code>HttpClient</code> that should be notified when the chunked stream has
     * completed.
     */
    private HttpClient hc;

    /**
     * The <code>MessageHeader</code> that is populated with any optional trailer
     * that appear after the last chunk.
     */
    private MessageHeader responses;

    /**
     * The size, in bytes, of the chunk that is currently being read.
     * This size is only valid if the current position in the underlying
     * input stream is inside a chunk (ie: state == STATE_READING_CHUNK).
     */
    private int chunkSize;

    /**
     * The number of bytes read from the underlying stream for the current
     * chunk. This value is always in the range <code>0</code> through to
     * <code>chunkSize</code>
     */
    private int chunkRead;

    /**
     * The internal buffer array where chunk data is available for the
     * application to read.
     */
    private byte chunkData[] = new byte[4096];

    /**
     * The current position in the buffer. It contains the index
     * of the next byte to read from <code>chunkData</code>
     */
    private int chunkPos;

    /**
     * The index one greater than the index of the last valid byte in the
     * buffer. This value is always in the range <code>0</code> through
     * <code>chunkData.length</code>.
     */
    private int chunkCount;

    /**
     * The internal buffer where bytes from the underlying stream can be
     * read. It may contain bytes representing chunk-size, chunk-data, or
     * trailer fields.
     */
    private byte rawData[] = new byte[32];

    /**
     * The current position in the buffer. It contains the index
     * of the next byte to read from <code>rawData</code>
     */
    private int rawPos;

    /**
     * The index one greater than the index of the last valid byte in the
     * buffer. This value is always in the range <code>0</code> through
     * <code>rawData.length</code>.
     */
    private int rawCount;

    /**
     * Indicates if an error was encountered when processing the chunked
     * stream. Once set, the connection is never returned to the
     * keep-alive cache.
     */
    private boolean error;

    /**
     * Indicates if the chunked stream has been closed using the
     * <code>close</code> method.
     */
    private boolean closed;

    /*
     * Maximum chunk header size of 2KB + 2 bytes for CRLF.
     * Bounds the chunk-size line (including any chunk-extensions) so a
     * hostile server cannot force unbounded buffering.
     */
    private final static int MAX_CHUNK_HEADER_SIZE = 2050;

    /**
     * State to indicate that next field should be :-
     *  chunk-size [ chunk-extension ] CRLF
     */
    static final int STATE_AWAITING_CHUNK_HEADER    = 1;

    /**
     * State to indicate that we are currently reading the chunk-data.
     */
    static final int STATE_READING_CHUNK            = 2;

    /**
     * Indicates that a chunk has been completely read and the next
     * fields to be examine should be CRLF
     */
    static final int STATE_AWAITING_CHUNK_EOL       = 3;

    /**
     * Indicates that all chunks have been read and the next field
     * should be optional trailers or an indication that the chunked
     * stream is complete.
     */
    static final int STATE_AWAITING_TRAILERS        = 4;

    /**
     * State to indicate that the chunked stream is complete and
     * no further bytes should be read from the underlying stream.
     */
    static final int STATE_DONE                     = 5;

    /**
     * Indicates the current state of the chunk-parsing state machine
     * (one of the STATE_* constants above).
     */
    private int state;

168 
169     /**
170      * Check to make sure that this stream has not been closed.
171      */
172     private void ensureOpen() throws IOException {
173         if (closed) {
174             throw new IOException("stream is closed");
175         }
176     }
177 
178 
179     /**
180      * Ensures there is <code>size</code> bytes available in
181      * <code>rawData</code>. This requires that we either
182      * shift the bytes in use to the begining of the buffer
183      * or allocate a large buffer with sufficient space available.
184      */
185     private void ensureRawAvailable(int size) {
186         if (rawCount + size > rawData.length) {
187             int used = rawCount - rawPos;
188             if (used + size > rawData.length) {
189                 byte tmp[] = new byte[used + size];
190                 if (used > 0) {
191                     System.arraycopy(rawData, rawPos, tmp, 0, used);
192                 }
193                 rawData = tmp;
194             } else {
195                 if (used > 0) {
196                     System.arraycopy(rawData, rawPos, rawData, 0, used);
197                 }
198             }
199             rawCount = used;
200             rawPos = 0;
201         }
202     }
203 
204 
205     /**
206      * Close the underlying input stream by either returning it to the
207      * keep alive cache or closing the stream.
208      * <p>
209      * As a chunked stream is inheritly persistent (see HTTP 1.1 RFC) the
210      * underlying stream can be returned to the keep alive cache if the
211      * stream can be completely read without error.
212      */
213     private void closeUnderlying() throws IOException {
214         if (in == null) {
215             return;
216         }
217 
218         if (!error && state == STATE_DONE) {
219             hc.finished();
220         } else {
221             if (!hurry()) {
222                 hc.closeServer();
223             }
224         }
225 
226         in = null;
227     }
228 
229     /**
230      * Attempt to read the remainder of a chunk directly into the
231      * caller's buffer.
232      * <p>
233      * Return the number of bytes read.
234      */
235     private int fastRead(byte[] b, int off, int len) throws IOException {
236 
237         // assert state == STATE_READING_CHUNKS;
238 
239         int remaining = chunkSize - chunkRead;
240         int cnt = (remaining < len) ? remaining : len;
241         if (cnt > 0) {
242             int nread;
243             try {
244                 nread = in.read(b, off, cnt);
245             } catch (IOException e) {
246                 error = true;
247                 throw e;
248             }
249             if (nread > 0) {
250                 chunkRead += nread;
251                 if (chunkRead >= chunkSize) {
252                     state = STATE_AWAITING_CHUNK_EOL;
253                 }
254                 return nread;
255             }
256             error = true;
257             throw new IOException("Premature EOF");
258         } else {
259             return 0;
260         }
261     }
262 
263     /**
264      * Process any outstanding bytes that have already been read into
265      * <code>rawData</code>.
266      * <p>
267      * The parsing of the chunked stream is performed as a state machine with
268      * <code>state</code> representing the current state of the processing.
269      * <p>
270      * Returns when either all the outstanding bytes in rawData have been
271      * processed or there is insufficient bytes available to continue
272      * processing. When the latter occurs <code>rawPos</code> will not have
273      * been updated and thus the processing can be restarted once further
274      * bytes have been read into <code>rawData</code>.
275      */
276     private void processRaw() throws IOException {
277         int pos;
278         int i;
279 
280         while (state != STATE_DONE) {
281 
282             switch (state) {
283 
284                 /**
285                  * We are awaiting a line with a chunk header
286                  */
287                 case STATE_AWAITING_CHUNK_HEADER:
288                     /*
289                      * Find \n to indicate end of chunk header. If not found when there is
290                      * insufficient bytes in the raw buffer to parse a chunk header.
291                      */
292                     pos = rawPos;
293                     while (pos < rawCount) {
294                         if (rawData[pos] == '\n') {
295                             break;
296                         }
297                         pos++;
298                         if ((pos - rawPos) >= MAX_CHUNK_HEADER_SIZE) {
299                             error = true;
300                             throw new IOException("Chunk header too long");
301                         }
302                     }
303                     if (pos >= rawCount) {
304                         return;
305                     }
306 
307                     /*
308                      * Extract the chunk size from the header (ignoring extensions).
309                      */
310                     String header = new String(rawData, rawPos, pos-rawPos+1, "US-ASCII");
311                     for (i=0; i < header.length(); i++) {
312                         if (Character.digit(header.charAt(i), 16) == -1)
313                             break;
314                     }
315                     try {
316                         chunkSize = Integer.parseInt(header.substring(0, i), 16);
317                     } catch (NumberFormatException e) {
318                         error = true;
319                         throw new IOException("Bogus chunk size");
320                     }
321 
322                     /*
323                      * Chunk has been parsed so move rawPos to first byte of chunk
324                      * data.
325                      */
326                     rawPos = pos + 1;
327                     chunkRead = 0;
328 
329                     /*
330                      * A chunk size of 0 means EOF.
331                      */
332                     if (chunkSize > 0) {
333                         state = STATE_READING_CHUNK;
334                     } else {
335                         state = STATE_AWAITING_TRAILERS;
336                     }
337                     break;
338 
339 
340                 /**
341                  * We are awaiting raw entity data (some may have already been
342                  * read). chunkSize is the size of the chunk; chunkRead is the
343                  * total read from the underlying stream to date.
344                  */
345                 case STATE_READING_CHUNK :
346                     /* no data available yet */
347                     if (rawPos >= rawCount) {
348                         return;
349                     }
350 
351                     /*
352                      * Compute the number of bytes of chunk data available in the
353                      * raw buffer.
354                      */
355                     int copyLen = Math.min( chunkSize-chunkRead, rawCount-rawPos );
356 
357                     /*
358                      * Expand or compact chunkData if needed.
359                      */
360                     if (chunkData.length < chunkCount + copyLen) {
361                         int cnt = chunkCount - chunkPos;
362                         if (chunkData.length < cnt + copyLen) {
363                             byte tmp[] = new byte[cnt + copyLen];
364                             System.arraycopy(chunkData, chunkPos, tmp, 0, cnt);
365                             chunkData = tmp;
366                         } else {
367                             System.arraycopy(chunkData, chunkPos, chunkData, 0, cnt);
368                         }
369                         chunkPos = 0;
370                         chunkCount = cnt;
371                     }
372 
373                     /*
374                      * Copy the chunk data into chunkData so that it's available
375                      * to the read methods.
376                      */
377                     System.arraycopy(rawData, rawPos, chunkData, chunkCount, copyLen);
378                     rawPos += copyLen;
379                     chunkCount += copyLen;
380                     chunkRead += copyLen;
381 
382                     /*
383                      * If all the chunk has been copied into chunkData then the next
384                      * token should be CRLF.
385                      */
386                     if (chunkSize - chunkRead <= 0) {
387                         state = STATE_AWAITING_CHUNK_EOL;
388                     } else {
389                         return;
390                     }
391                     break;
392 
393 
394                 /**
395                  * Awaiting CRLF after the chunk
396                  */
397                 case STATE_AWAITING_CHUNK_EOL:
398                     /* not available yet */
399                     if (rawPos + 1 >= rawCount) {
400                         return;
401                     }
402 
403                     if (rawData[rawPos] != '\r') {
404                         error = true;
405                         throw new IOException("missing CR");
406                     }
407                     if (rawData[rawPos+1] != '\n') {
408                         error = true;
409                         throw new IOException("missing LF");
410                     }
411                     rawPos += 2;
412 
413                     /*
414                      * Move onto the next chunk
415                      */
416                     state = STATE_AWAITING_CHUNK_HEADER;
417                     break;
418 
419 
420                 /**
421                  * Last chunk has been read so not we're waiting for optional
422                  * trailers.
423                  */
424                 case STATE_AWAITING_TRAILERS:
425 
426                     /*
427                      * Do we have an entire line in the raw buffer?
428                      */
429                     pos = rawPos;
430                     while (pos < rawCount) {
431                         if (rawData[pos] == '\n') {
432                             break;
433                         }
434                         pos++;
435                     }
436                     if (pos >= rawCount) {
437                         return;
438                     }
439 
440                     if (pos == rawPos) {
441                         error = true;
442                         throw new IOException("LF should be proceeded by CR");
443                     }
444                     if (rawData[pos-1] != '\r') {
445                         error = true;
446                         throw new IOException("LF should be proceeded by CR");
447                     }
448 
449                     /*
450                      * Stream done so close underlying stream.
451                      */
452                     if (pos == (rawPos + 1)) {
453 
454                         state = STATE_DONE;
455                         closeUnderlying();
456 
457                         return;
458                     }
459 
460                     /*
461                      * Extract any tailers and append them to the message
462                      * headers.
463                      */
464                     String trailer = new String(rawData, rawPos, pos-rawPos, "US-ASCII");
465                     i = trailer.indexOf(':');
466                     if (i == -1) {
467                         throw new IOException("Malformed tailer - format should be key:value");
468                     }
469                     String key = (trailer.substring(0, i)).trim();
470                     String value = (trailer.substring(i+1, trailer.length())).trim();
471 
472                     responses.add(key, value);
473 
474                     /*
475                      * Move onto the next trailer.
476                      */
477                     rawPos = pos+1;
478                     break;
479 
480             } /* switch */
481         }
482     }
483 
484 
485     /**
486      * Reads any available bytes from the underlying stream into
487      * <code>rawData</code> and returns the number of bytes of
488      * chunk data available in <code>chunkData</code> that the
489      * application can read.
490      */
491     private int readAheadNonBlocking() throws IOException {
492 
493         /*
494          * If there's anything available on the underlying stream then we read
495          * it into the raw buffer and process it. Processing ensures that any
496          * available chunk data is made available in chunkData.
497          */
498         int avail = in.available();
499         if (avail > 0) {
500 
501             /* ensure that there is space in rawData to read the available */
502             ensureRawAvailable(avail);
503 
504             int nread;
505             try {
506                 nread = in.read(rawData, rawCount, avail);
507             } catch (IOException e) {
508                 error = true;
509                 throw e;
510             }
511             if (nread < 0) {
512                 error = true;   /* premature EOF ? */
513                 return -1;
514             }
515             rawCount += nread;
516 
517             /*
518              * Process the raw bytes that have been read.
519              */
520             processRaw();
521         }
522 
523         /*
524          * Return the number of chunked bytes available to read
525          */
526         return chunkCount - chunkPos;
527     }
528 
529     /**
530      * Reads from the underlying stream until there is chunk data
531      * available in <code>chunkData</code> for the application to
532      * read.
533      */
534     private int readAheadBlocking() throws IOException {
535 
536         do {
537             /*
538              * All of chunked response has been read to return EOF.
539              */
540             if (state == STATE_DONE) {
541                 return -1;
542             }
543 
544             /*
545              * We must read into the raw buffer so make sure there is space
546              * available. We use a size of 32 to avoid too much chunk data
547              * being read into the raw buffer.
548              */
549             ensureRawAvailable(32);
550             int nread;
551             try {
552                 nread = in.read(rawData, rawCount, rawData.length-rawCount);
553             } catch (IOException e) {
554                 error = true;
555                 throw e;
556             }
557 
558             /**
559              * If we hit EOF it means there's a problem as we should never
560              * attempt to read once the last chunk and trailers have been
561              * received.
562              */
563             if (nread < 0) {
564                 error = true;
565                 throw new IOException("Premature EOF");
566             }
567 
568             /**
569              * Process the bytes from the underlying stream
570              */
571             rawCount += nread;
572             processRaw();
573 
574         } while (chunkCount <= 0);
575 
576         /*
577          * Return the number of chunked bytes available to read
578          */
579         return chunkCount - chunkPos;
580     }
581 
582     /**
583      * Read ahead in either blocking or non-blocking mode. This method
584      * is typically used when we run out of available bytes in
585      * <code>chunkData</code> or we need to determine how many bytes
586      * are available on the input stream.
587      */
588     private int readAhead(boolean allowBlocking) throws IOException {
589 
590         /*
591          * Last chunk already received - return EOF
592          */
593         if (state == STATE_DONE) {
594             return -1;
595         }
596 
597         /*
598          * Reset position/count if data in chunkData is exhausted.
599          */
600         if (chunkPos >= chunkCount) {
601             chunkCount = 0;
602             chunkPos = 0;
603         }
604 
605         /*
606          * Read ahead blocking or non-blocking
607          */
608         if (allowBlocking) {
609             return readAheadBlocking();
610         } else {
611             return readAheadNonBlocking();
612         }
613     }
614 
615     /**
616      * Creates a <code>ChunkedInputStream</code> and saves its  arguments, for
617      * later use.
618      *
619      * @param   in   the underlying input stream.
620      * @param   hc   the HttpClient
621      * @param   responses   the MessageHeader that should be populated with optional
622      *                      trailers.
623      */
624     public ChunkedInputStream(InputStream in, HttpClient hc, MessageHeader responses) throws IOException {
625 
626         /* save arguments */
627         this.in = in;
628         this.responses = responses;
629         this.hc = hc;
630 
631         /*
632          * Set our initial state to indicate that we are first starting to
633          * look for a chunk header.
634          */
635         state = STATE_AWAITING_CHUNK_HEADER;
636     }
637 
638     /**
639      * See
640      * the general contract of the <code>read</code>
641      * method of <code>InputStream</code>.
642      *
643      * @return     the next byte of data, or <code>-1</code> if the end of the
644      *             stream is reached.
645      * @exception  IOException  if an I/O error occurs.
646      * @see        java.io.FilterInputStream#in
647      */
648     public synchronized int read() throws IOException {
649         ensureOpen();
650         if (chunkPos >= chunkCount) {
651             if (readAhead(true) <= 0) {
652                 return -1;
653             }
654         }
655         return chunkData[chunkPos++] & 0xff;
656     }
657 
658 
659     /**
660      * Reads bytes from this stream into the specified byte array, starting at
661      * the given offset.
662      *
663      * @param      b     destination buffer.
664      * @param      off   offset at which to start storing bytes.
665      * @param      len   maximum number of bytes to read.
666      * @return     the number of bytes read, or <code>-1</code> if the end of
667      *             the stream has been reached.
668      * @exception  IOException  if an I/O error occurs.
669      */
670     public synchronized int read(byte b[], int off, int len)
671         throws IOException
672     {
673         ensureOpen();
674         if ((off < 0) || (off > b.length) || (len < 0) ||
675             ((off + len) > b.length) || ((off + len) < 0)) {
676             throw new IndexOutOfBoundsException();
677         } else if (len == 0) {
678             return 0;
679         }
680 
681         int avail = chunkCount - chunkPos;
682         if (avail <= 0) {
683             /*
684              * Optimization: if we're in the middle of the chunk read
685              * directly from the underlying stream into the caller's
686              * buffer
687              */
688             if (state == STATE_READING_CHUNK) {
689                 return fastRead( b, off, len );
690             }
691 
692             /*
693              * We're not in the middle of a chunk so we must read ahead
694              * until there is some chunk data available.
695              */
696             avail = readAhead(true);
697             if (avail < 0) {
698                 return -1;      /* EOF */
699             }
700         }
701         int cnt = (avail < len) ? avail : len;
702         System.arraycopy(chunkData, chunkPos, b, off, cnt);
703         chunkPos += cnt;
704 
705         return cnt;
706     }
707 
708     /**
709      * Returns the number of bytes that can be read from this input
710      * stream without blocking.
711      *
712      * @return     the number of bytes that can be read from this input
713      *             stream without blocking.
714      * @exception  IOException  if an I/O error occurs.
715      * @see        java.io.FilterInputStream#in
716      */
717     public synchronized int available() throws IOException {
718         ensureOpen();
719 
720         int avail = chunkCount - chunkPos;
721         if(avail > 0) {
722             return avail;
723         }
724 
725         avail = readAhead(false);
726 
727         if (avail < 0) {
728             return 0;
729         } else  {
730             return avail;
731         }
732     }
733 
734     /**
735      * Close the stream by either returning the connection to the
736      * keep alive cache or closing the underlying stream.
737      * <p>
738      * If the chunked response hasn't been completely read we
739      * try to "hurry" to the end of the response. If this is
740      * possible (without blocking) then the connection can be
741      * returned to the keep alive cache.
742      *
743      * @exception  IOException  if an I/O error occurs.
744      */
745     public synchronized void close() throws IOException {
746         if (closed) {
747             return;
748         }
749         closeUnderlying();
750         closed = true;
751     }
752 
753     /**
754      * Hurry the input stream by reading everything from the underlying
755      * stream. If the last chunk (and optional trailers) can be read without
756      * blocking then the stream is considered hurried.
757      * <p>
758      * Note that if an error has occurred or we can't get to last chunk
759      * without blocking then this stream can't be hurried and should be
760      * closed.
761      */
762     public synchronized boolean hurry() {
763         if (in == null || error) {
764             return false;
765         }
766 
767         try {
768             readAhead(false);
769         } catch (Exception e) {
770             return false;
771         }
772 
773         if (error) {
774             return false;
775         }
776 
777         return (state == STATE_DONE);
778     }
779 
780 }