tungstenite/protocol/
mod.rs

1//! Generic WebSocket message stream.
2
3pub mod frame;
4
5mod message;
6
7pub use self::{frame::CloseFrame, message::Message};
8
9use self::{
10    frame::{
11        coding::{CloseCode, Control as OpCtl, Data as OpData, OpCode},
12        Frame, FrameCodec,
13    },
14    message::{IncompleteMessage, IncompleteMessageType},
15};
16use crate::error::{Error, ProtocolError, Result};
17use log::*;
18use std::{
19    io::{self, Read, Write},
20    mem::replace,
21};
22
/// Which side of the WebSocket connection this endpoint plays.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Role {
    /// The endpoint acts as the server.
    Server,
    /// The endpoint acts as the client.
    Client,
}
31
/// The configuration for WebSocket connection.
///
/// All fields are public; start from [`WebSocketConfig::default`] and override what you need.
/// The combination of `write_buffer_size` and `max_write_buffer_size` is validated (by
/// panicking) whenever a config is installed into a websocket.
#[derive(Debug, Clone, Copy)]
pub struct WebSocketConfig {
    /// Does nothing, instead use `max_write_buffer_size`.
    #[deprecated]
    pub max_send_queue: Option<usize>,
    /// The target minimum size of the write buffer to reach before writing the data
    /// to the underlying stream.
    /// The default value is 128 KiB.
    ///
    /// If set to `0` each message will be eagerly written to the underlying stream.
    /// It is often more optimal to allow them to buffer a little, hence the default value.
    ///
    /// Note: [`flush`](WebSocket::flush) will always fully write the buffer regardless.
    pub write_buffer_size: usize,
    /// The max size of the write buffer in bytes. Setting this can provide backpressure
    /// in the case the write buffer is filling up due to write errors.
    /// The default value is unlimited.
    ///
    /// Note: The write buffer only builds up past [`write_buffer_size`](Self::write_buffer_size)
    /// when writes to the underlying stream are failing. So the **write buffer can not
    /// fill up if you are not observing write errors even if not flushing**.
    ///
    /// Note: Should always be at least [`write_buffer_size + 1 message`](Self::write_buffer_size)
    /// and probably a little more depending on error handling strategy.
    pub max_write_buffer_size: usize,
    /// The maximum size of an incoming message. `None` means no size limit. The default value is 64 MiB
    /// which should be reasonably big for all normal use-cases but small enough to prevent
    /// memory eating by a malicious user.
    pub max_message_size: Option<usize>,
    /// The maximum size of a single incoming message frame. `None` means no size limit. The limit is for
    /// frame payload NOT including the frame header. The default value is 16 MiB which should
    /// be reasonably big for all normal use-cases but small enough to prevent memory eating
    /// by a malicious user.
    pub max_frame_size: Option<usize>,
    /// When set to `true`, the server will accept and handle unmasked frames
    /// from the client. According to the RFC 6455, the server must close the
    /// connection to the client in such cases, however it seems like there are
    /// some popular libraries that are sending unmasked frames, ignoring the RFC.
    /// By default this option is set to `false`, i.e. according to RFC 6455.
    pub accept_unmasked_frames: bool,
}

impl Default for WebSocketConfig {
    /// Returns the default configuration: a 128 KiB write buffer target, an unlimited
    /// maximum write buffer, a 64 MiB message limit, a 16 MiB frame limit, and unmasked
    /// client frames rejected.
    fn default() -> Self {
        // The deprecated `max_send_queue` field still has to be initialised.
        #[allow(deprecated)]
        WebSocketConfig {
            max_send_queue: None,
            write_buffer_size: 128 * 1024,
            max_write_buffer_size: usize::MAX,
            max_message_size: Some(64 << 20),
            max_frame_size: Some(16 << 20),
            accept_unmasked_frames: false,
        }
    }
}

impl WebSocketConfig {
    /// Panic if values are invalid.
    ///
    /// # Panics
    /// Panics unless `max_write_buffer_size` is strictly greater than `write_buffer_size`.
    pub(crate) fn assert_valid(&self) {
        assert!(
            self.max_write_buffer_size > self.write_buffer_size,
            "WebSocketConfig::max_write_buffer_size must be greater than write_buffer_size, \
            see WebSocketConfig docs"
        );
    }
}
99
100/// WebSocket input-output stream.
101///
102/// This is THE structure you want to create to be able to speak the WebSocket protocol.
103/// It may be created by calling `connect`, `accept` or `client` functions.
104///
105/// Use [`WebSocket::read`], [`WebSocket::send`] to received and send messages.
106#[derive(Debug)]
107pub struct WebSocket<Stream> {
108    /// The underlying socket.
109    socket: Stream,
110    /// The context for managing a WebSocket.
111    context: WebSocketContext,
112}
113
114impl<Stream> WebSocket<Stream> {
115    /// Convert a raw socket into a WebSocket without performing a handshake.
116    ///
117    /// Call this function if you're using Tungstenite as a part of a web framework
118    /// or together with an existing one. If you need an initial handshake, use
119    /// `connect()` or `accept()` functions of the crate to construct a websocket.
120    ///
121    /// # Panics
122    /// Panics if config is invalid e.g. `max_write_buffer_size <= write_buffer_size`.
123    pub fn from_raw_socket(stream: Stream, role: Role, config: Option<WebSocketConfig>) -> Self {
124        WebSocket { socket: stream, context: WebSocketContext::new(role, config) }
125    }
126
127    /// Convert a raw socket into a WebSocket without performing a handshake.
128    ///
129    /// Call this function if you're using Tungstenite as a part of a web framework
130    /// or together with an existing one. If you need an initial handshake, use
131    /// `connect()` or `accept()` functions of the crate to construct a websocket.
132    ///
133    /// # Panics
134    /// Panics if config is invalid e.g. `max_write_buffer_size <= write_buffer_size`.
135    pub fn from_partially_read(
136        stream: Stream,
137        part: Vec<u8>,
138        role: Role,
139        config: Option<WebSocketConfig>,
140    ) -> Self {
141        WebSocket {
142            socket: stream,
143            context: WebSocketContext::from_partially_read(part, role, config),
144        }
145    }
146
147    /// Returns a shared reference to the inner stream.
148    pub fn get_ref(&self) -> &Stream {
149        &self.socket
150    }
151    /// Returns a mutable reference to the inner stream.
152    pub fn get_mut(&mut self) -> &mut Stream {
153        &mut self.socket
154    }
155
156    /// Change the configuration.
157    ///
158    /// # Panics
159    /// Panics if config is invalid e.g. `max_write_buffer_size <= write_buffer_size`.
160    pub fn set_config(&mut self, set_func: impl FnOnce(&mut WebSocketConfig)) {
161        self.context.set_config(set_func)
162    }
163
164    /// Read the configuration.
165    pub fn get_config(&self) -> &WebSocketConfig {
166        self.context.get_config()
167    }
168
169    /// Check if it is possible to read messages.
170    ///
171    /// Reading is impossible after receiving `Message::Close`. It is still possible after
172    /// sending close frame since the peer still may send some data before confirming close.
173    pub fn can_read(&self) -> bool {
174        self.context.can_read()
175    }
176
177    /// Check if it is possible to write messages.
178    ///
179    /// Writing gets impossible immediately after sending or receiving `Message::Close`.
180    pub fn can_write(&self) -> bool {
181        self.context.can_write()
182    }
183}
184
185impl<Stream: Read + Write> WebSocket<Stream> {
186    /// Read a message from stream, if possible.
187    ///
188    /// This will also queue responses to ping and close messages. These responses
189    /// will be written and flushed on the next call to [`read`](Self::read),
190    /// [`write`](Self::write) or [`flush`](Self::flush).
191    ///
192    /// # Closing the connection
193    /// When the remote endpoint decides to close the connection this will return
194    /// the close message with an optional close frame.
195    ///
196    /// You should continue calling [`read`](Self::read), [`write`](Self::write) or
197    /// [`flush`](Self::flush) to drive the reply to the close frame until [`Error::ConnectionClosed`]
198    /// is returned. Once that happens it is safe to drop the underlying connection.
199    pub fn read(&mut self) -> Result<Message> {
200        self.context.read(&mut self.socket)
201    }
202
203    /// Writes and immediately flushes a message.
204    /// Equivalent to calling [`write`](Self::write) then [`flush`](Self::flush).
205    pub fn send(&mut self, message: Message) -> Result<()> {
206        self.write(message)?;
207        self.flush()
208    }
209
210    /// Write a message to the provided stream, if possible.
211    ///
212    /// A subsequent call should be made to [`flush`](Self::flush) to flush writes.
213    ///
214    /// In the event of stream write failure the message frame will be stored
215    /// in the write buffer and will try again on the next call to [`write`](Self::write)
216    /// or [`flush`](Self::flush).
217    ///
218    /// If the write buffer would exceed the configured [`WebSocketConfig::max_write_buffer_size`]
219    /// [`Err(WriteBufferFull(msg_frame))`](Error::WriteBufferFull) is returned.
220    ///
221    /// This call will generally not flush. However, if there are queued automatic messages
222    /// they will be written and eagerly flushed.
223    ///
224    /// For example, upon receiving ping messages tungstenite queues pong replies automatically.
225    /// The next call to [`read`](Self::read), [`write`](Self::write) or [`flush`](Self::flush)
226    /// will write & flush the pong reply. This means you should not respond to ping frames manually.
227    ///
228    /// You can however send pong frames manually in order to indicate a unidirectional heartbeat
229    /// as described in [RFC 6455](https://tools.ietf.org/html/rfc6455#section-5.5.3). Note that
230    /// if [`read`](Self::read) returns a ping, you should [`flush`](Self::flush) before passing
231    /// a custom pong to [`write`](Self::write), otherwise the automatic queued response to the
232    /// ping will not be sent as it will be replaced by your custom pong message.
233    ///
234    /// # Errors
235    /// - If the WebSocket's write buffer is full, [`Error::WriteBufferFull`] will be returned
236    ///   along with the equivalent passed message frame.
237    /// - If the connection is closed and should be dropped, this will return [`Error::ConnectionClosed`].
238    /// - If you try again after [`Error::ConnectionClosed`] was returned either from here or from
239    ///   [`read`](Self::read), [`Error::AlreadyClosed`] will be returned. This indicates a program
240    ///   error on your part.
241    /// - [`Error::Io`] is returned if the underlying connection returns an error
242    ///   (consider these fatal except for WouldBlock).
243    /// - [`Error::Capacity`] if your message size is bigger than the configured max message size.
244    pub fn write(&mut self, message: Message) -> Result<()> {
245        self.context.write(&mut self.socket, message)
246    }
247
248    /// Flush writes.
249    ///
250    /// Ensures all messages previously passed to [`write`](Self::write) and automatic
251    /// queued pong responses are written & flushed into the underlying stream.
252    pub fn flush(&mut self) -> Result<()> {
253        self.context.flush(&mut self.socket)
254    }
255
256    /// Close the connection.
257    ///
258    /// This function guarantees that the close frame will be queued.
259    /// There is no need to call it again. Calling this function is
260    /// the same as calling `write(Message::Close(..))`.
261    ///
262    /// After queuing the close frame you should continue calling [`read`](Self::read) or
263    /// [`flush`](Self::flush) to drive the close handshake to completion.
264    ///
265    /// The websocket RFC defines that the underlying connection should be closed
266    /// by the server. Tungstenite takes care of this asymmetry for you.
267    ///
268    /// When the close handshake is finished (we have both sent and received
269    /// a close message), [`read`](Self::read) or [`flush`](Self::flush) will return
270    /// [Error::ConnectionClosed] if this endpoint is the server.
271    ///
272    /// If this endpoint is a client, [Error::ConnectionClosed] will only be
273    /// returned after the server has closed the underlying connection.
274    ///
275    /// It is thus safe to drop the underlying connection as soon as [Error::ConnectionClosed]
276    /// is returned from [`read`](Self::read) or [`flush`](Self::flush).
277    pub fn close(&mut self, code: Option<CloseFrame>) -> Result<()> {
278        self.context.close(&mut self.socket, code)
279    }
280
281    /// Old name for [`read`](Self::read).
282    #[deprecated(note = "Use `read`")]
283    pub fn read_message(&mut self) -> Result<Message> {
284        self.read()
285    }
286
287    /// Old name for [`send`](Self::send).
288    #[deprecated(note = "Use `send`")]
289    pub fn write_message(&mut self, message: Message) -> Result<()> {
290        self.send(message)
291    }
292
293    /// Old name for [`flush`](Self::flush).
294    #[deprecated(note = "Use `flush`")]
295    pub fn write_pending(&mut self) -> Result<()> {
296        self.flush()
297    }
298}
299
300/// A context for managing WebSocket stream.
301#[derive(Debug)]
302pub struct WebSocketContext {
303    /// Server or client?
304    role: Role,
305    /// encoder/decoder of frame.
306    frame: FrameCodec,
307    /// The state of processing, either "active" or "closing".
308    state: WebSocketState,
309    /// Receive: an incomplete message being processed.
310    incomplete: Option<IncompleteMessage>,
311    /// Send in addition to regular messages E.g. "pong" or "close".
312    additional_send: Option<Frame>,
313    /// True indicates there is an additional message (like a pong)
314    /// that failed to flush previously and we should try again.
315    unflushed_additional: bool,
316    /// The configuration for the websocket session.
317    config: WebSocketConfig,
318}
319
320impl WebSocketContext {
321    /// Create a WebSocket context that manages a post-handshake stream.
322    ///
323    /// # Panics
324    /// Panics if config is invalid e.g. `max_write_buffer_size <= write_buffer_size`.
325    pub fn new(role: Role, config: Option<WebSocketConfig>) -> Self {
326        Self::_new(role, FrameCodec::new(), config.unwrap_or_default())
327    }
328
329    /// Create a WebSocket context that manages an post-handshake stream.
330    ///
331    /// # Panics
332    /// Panics if config is invalid e.g. `max_write_buffer_size <= write_buffer_size`.
333    pub fn from_partially_read(part: Vec<u8>, role: Role, config: Option<WebSocketConfig>) -> Self {
334        Self::_new(role, FrameCodec::from_partially_read(part), config.unwrap_or_default())
335    }
336
337    fn _new(role: Role, mut frame: FrameCodec, config: WebSocketConfig) -> Self {
338        config.assert_valid();
339        frame.set_max_out_buffer_len(config.max_write_buffer_size);
340        frame.set_out_buffer_write_len(config.write_buffer_size);
341        Self {
342            role,
343            frame,
344            state: WebSocketState::Active,
345            incomplete: None,
346            additional_send: None,
347            unflushed_additional: false,
348            config,
349        }
350    }
351
352    /// Change the configuration.
353    ///
354    /// # Panics
355    /// Panics if config is invalid e.g. `max_write_buffer_size <= write_buffer_size`.
356    pub fn set_config(&mut self, set_func: impl FnOnce(&mut WebSocketConfig)) {
357        set_func(&mut self.config);
358        self.config.assert_valid();
359        self.frame.set_max_out_buffer_len(self.config.max_write_buffer_size);
360        self.frame.set_out_buffer_write_len(self.config.write_buffer_size);
361    }
362
363    /// Read the configuration.
364    pub fn get_config(&self) -> &WebSocketConfig {
365        &self.config
366    }
367
368    /// Check if it is possible to read messages.
369    ///
370    /// Reading is impossible after receiving `Message::Close`. It is still possible after
371    /// sending close frame since the peer still may send some data before confirming close.
372    pub fn can_read(&self) -> bool {
373        self.state.can_read()
374    }
375
376    /// Check if it is possible to write messages.
377    ///
378    /// Writing gets impossible immediately after sending or receiving `Message::Close`.
379    pub fn can_write(&self) -> bool {
380        self.state.is_active()
381    }
382
383    /// Read a message from the provided stream, if possible.
384    ///
385    /// This function sends pong and close responses automatically.
386    /// However, it never blocks on write.
387    pub fn read<Stream>(&mut self, stream: &mut Stream) -> Result<Message>
388    where
389        Stream: Read + Write,
390    {
391        // Do not read from already closed connections.
392        self.state.check_not_terminated()?;
393
394        loop {
395            if self.additional_send.is_some() || self.unflushed_additional {
396                // Since we may get ping or close, we need to reply to the messages even during read.
397                match self.flush(stream) {
398                    Ok(_) => {}
399                    Err(Error::Io(err)) if err.kind() == io::ErrorKind::WouldBlock => {
400                        // If blocked continue reading, but try again later
401                        self.unflushed_additional = true;
402                    }
403                    Err(err) => return Err(err),
404                }
405            } else if self.role == Role::Server && !self.state.can_read() {
406                self.state = WebSocketState::Terminated;
407                return Err(Error::ConnectionClosed);
408            }
409
410            // If we get here, either write blocks or we have nothing to write.
411            // Thus if read blocks, just let it return WouldBlock.
412            if let Some(message) = self.read_message_frame(stream)? {
413                trace!("Received message {}", message);
414                return Ok(message);
415            }
416        }
417    }
418
419    /// Write a message to the provided stream.
420    ///
421    /// A subsequent call should be made to [`flush`](Self::flush) to flush writes.
422    ///
423    /// In the event of stream write failure the message frame will be stored
424    /// in the write buffer and will try again on the next call to [`write`](Self::write)
425    /// or [`flush`](Self::flush).
426    ///
427    /// If the write buffer would exceed the configured [`WebSocketConfig::max_write_buffer_size`]
428    /// [`Err(WriteBufferFull(msg_frame))`](Error::WriteBufferFull) is returned.
429    pub fn write<Stream>(&mut self, stream: &mut Stream, message: Message) -> Result<()>
430    where
431        Stream: Read + Write,
432    {
433        // When terminated, return AlreadyClosed.
434        self.state.check_not_terminated()?;
435
436        // Do not write after sending a close frame.
437        if !self.state.is_active() {
438            return Err(Error::Protocol(ProtocolError::SendAfterClosing));
439        }
440
441        let frame = match message {
442            Message::Text(data) => Frame::message(data.into(), OpCode::Data(OpData::Text), true),
443            Message::Binary(data) => Frame::message(data, OpCode::Data(OpData::Binary), true),
444            Message::Ping(data) => Frame::ping(data),
445            Message::Pong(data) => {
446                self.set_additional(Frame::pong(data));
447                // Note: user pongs can be user flushed so no need to flush here
448                return self._write(stream, None).map(|_| ());
449            }
450            Message::Close(code) => return self.close(stream, code),
451            Message::Frame(f) => f,
452        };
453
454        let should_flush = self._write(stream, Some(frame))?;
455        if should_flush {
456            self.flush(stream)?;
457        }
458        Ok(())
459    }
460
461    /// Flush writes.
462    ///
463    /// Ensures all messages previously passed to [`write`](Self::write) and automatically
464    /// queued pong responses are written & flushed into the `stream`.
465    #[inline]
466    pub fn flush<Stream>(&mut self, stream: &mut Stream) -> Result<()>
467    where
468        Stream: Read + Write,
469    {
470        self._write(stream, None)?;
471        self.frame.write_out_buffer(stream)?;
472        stream.flush()?;
473        self.unflushed_additional = false;
474        Ok(())
475    }
476
477    /// Writes any data in the out_buffer, `additional_send` and given `data`.
478    ///
479    /// Does **not** flush.
480    ///
481    /// Returns true if the write contents indicate we should flush immediately.
482    fn _write<Stream>(&mut self, stream: &mut Stream, data: Option<Frame>) -> Result<bool>
483    where
484        Stream: Read + Write,
485    {
486        if let Some(data) = data {
487            self.buffer_frame(stream, data)?;
488        }
489
490        // Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in
491        // response, unless it already received a Close frame. It SHOULD
492        // respond with Pong frame as soon as is practical. (RFC 6455)
493        let should_flush = if let Some(msg) = self.additional_send.take() {
494            trace!("Sending pong/close");
495            match self.buffer_frame(stream, msg) {
496                Err(Error::WriteBufferFull(Message::Frame(msg))) => {
497                    // if an system message would exceed the buffer put it back in
498                    // `additional_send` for retry. Otherwise returning this error
499                    // may not make sense to the user, e.g. calling `flush`.
500                    self.set_additional(msg);
501                    false
502                }
503                Err(err) => return Err(err),
504                Ok(_) => true,
505            }
506        } else {
507            self.unflushed_additional
508        };
509
510        // If we're closing and there is nothing to send anymore, we should close the connection.
511        if self.role == Role::Server && !self.state.can_read() {
512            // The underlying TCP connection, in most normal cases, SHOULD be closed
513            // first by the server, so that it holds the TIME_WAIT state and not the
514            // client (as this would prevent it from re-opening the connection for 2
515            // maximum segment lifetimes (2MSL), while there is no corresponding
516            // server impact as a TIME_WAIT connection is immediately reopened upon
517            // a new SYN with a higher seq number). (RFC 6455)
518            self.frame.write_out_buffer(stream)?;
519            self.state = WebSocketState::Terminated;
520            Err(Error::ConnectionClosed)
521        } else {
522            Ok(should_flush)
523        }
524    }
525
526    /// Close the connection.
527    ///
528    /// This function guarantees that the close frame will be queued.
529    /// There is no need to call it again. Calling this function is
530    /// the same as calling `send(Message::Close(..))`.
531    pub fn close<Stream>(&mut self, stream: &mut Stream, code: Option<CloseFrame>) -> Result<()>
532    where
533        Stream: Read + Write,
534    {
535        if let WebSocketState::Active = self.state {
536            self.state = WebSocketState::ClosedByUs;
537            let frame = Frame::close(code);
538            self._write(stream, Some(frame))?;
539        }
540        self.flush(stream)
541    }
542
543    /// Try to decode one message frame. May return None.
544    fn read_message_frame<Stream>(&mut self, stream: &mut Stream) -> Result<Option<Message>>
545    where
546        Stream: Read + Write,
547    {
548        if let Some(mut frame) = self
549            .frame
550            .read_frame(stream, self.config.max_frame_size)
551            .check_connection_reset(self.state)?
552        {
553            if !self.state.can_read() {
554                return Err(Error::Protocol(ProtocolError::ReceivedAfterClosing));
555            }
556            // MUST be 0 unless an extension is negotiated that defines meanings
557            // for non-zero values.  If a nonzero value is received and none of
558            // the negotiated extensions defines the meaning of such a nonzero
559            // value, the receiving endpoint MUST _Fail the WebSocket
560            // Connection_.
561            {
562                let hdr = frame.header();
563                if hdr.rsv1 || hdr.rsv2 || hdr.rsv3 {
564                    return Err(Error::Protocol(ProtocolError::NonZeroReservedBits));
565                }
566            }
567
568            match self.role {
569                Role::Server => {
570                    if frame.is_masked() {
571                        // A server MUST remove masking for data frames received from a client
572                        // as described in Section 5.3. (RFC 6455)
573                        frame.apply_mask()
574                    } else if !self.config.accept_unmasked_frames {
575                        // The server MUST close the connection upon receiving a
576                        // frame that is not masked. (RFC 6455)
577                        // The only exception here is if the user explicitly accepts given
578                        // stream by setting WebSocketConfig.accept_unmasked_frames to true
579                        return Err(Error::Protocol(ProtocolError::UnmaskedFrameFromClient));
580                    }
581                }
582                Role::Client => {
583                    if frame.is_masked() {
584                        // A client MUST close a connection if it detects a masked frame. (RFC 6455)
585                        return Err(Error::Protocol(ProtocolError::MaskedFrameFromServer));
586                    }
587                }
588            }
589
590            match frame.header().opcode {
591                OpCode::Control(ctl) => {
592                    match ctl {
593                        // All control frames MUST have a payload length of 125 bytes or less
594                        // and MUST NOT be fragmented. (RFC 6455)
595                        _ if !frame.header().is_final => {
596                            Err(Error::Protocol(ProtocolError::FragmentedControlFrame))
597                        }
598                        _ if frame.payload().len() > 125 => {
599                            Err(Error::Protocol(ProtocolError::ControlFrameTooBig))
600                        }
601                        OpCtl::Close => Ok(self.do_close(frame.into_close()?).map(Message::Close)),
602                        OpCtl::Reserved(i) => {
603                            Err(Error::Protocol(ProtocolError::UnknownControlFrameType(i)))
604                        }
605                        OpCtl::Ping => {
606                            let data = frame.into_data();
607                            // No ping processing after we sent a close frame.
608                            if self.state.is_active() {
609                                self.set_additional(Frame::pong(data.clone()));
610                            }
611                            Ok(Some(Message::Ping(data)))
612                        }
613                        OpCtl::Pong => Ok(Some(Message::Pong(frame.into_data()))),
614                    }
615                }
616
617                OpCode::Data(data) => {
618                    let fin = frame.header().is_final;
619                    match data {
620                        OpData::Continue => {
621                            if let Some(ref mut msg) = self.incomplete {
622                                msg.extend(frame.into_data(), self.config.max_message_size)?;
623                            } else {
624                                return Err(Error::Protocol(
625                                    ProtocolError::UnexpectedContinueFrame,
626                                ));
627                            }
628                            if fin {
629                                Ok(Some(self.incomplete.take().unwrap().complete()?))
630                            } else {
631                                Ok(None)
632                            }
633                        }
634                        c if self.incomplete.is_some() => {
635                            Err(Error::Protocol(ProtocolError::ExpectedFragment(c)))
636                        }
637                        OpData::Text | OpData::Binary => {
638                            let msg = {
639                                let message_type = match data {
640                                    OpData::Text => IncompleteMessageType::Text,
641                                    OpData::Binary => IncompleteMessageType::Binary,
642                                    _ => panic!("Bug: message is not text nor binary"),
643                                };
644                                let mut m = IncompleteMessage::new(message_type);
645                                m.extend(frame.into_data(), self.config.max_message_size)?;
646                                m
647                            };
648                            if fin {
649                                Ok(Some(msg.complete()?))
650                            } else {
651                                self.incomplete = Some(msg);
652                                Ok(None)
653                            }
654                        }
655                        OpData::Reserved(i) => {
656                            Err(Error::Protocol(ProtocolError::UnknownDataFrameType(i)))
657                        }
658                    }
659                }
660            } // match opcode
661        } else {
662            // Connection closed by peer
663            match replace(&mut self.state, WebSocketState::Terminated) {
664                WebSocketState::ClosedByPeer | WebSocketState::CloseAcknowledged => {
665                    Err(Error::ConnectionClosed)
666                }
667                _ => Err(Error::Protocol(ProtocolError::ResetWithoutClosingHandshake)),
668            }
669        }
670    }
671
672    /// Received a close frame. Tells if we need to return a close frame to the user.
673    #[allow(clippy::option_option)]
674    fn do_close<'t>(&mut self, close: Option<CloseFrame<'t>>) -> Option<Option<CloseFrame<'t>>> {
675        debug!("Received close frame: {:?}", close);
676        match self.state {
677            WebSocketState::Active => {
678                self.state = WebSocketState::ClosedByPeer;
679
680                let close = close.map(|frame| {
681                    if !frame.code.is_allowed() {
682                        CloseFrame {
683                            code: CloseCode::Protocol,
684                            reason: "Protocol violation".into(),
685                        }
686                    } else {
687                        frame
688                    }
689                });
690
691                let reply = Frame::close(close.clone());
692                debug!("Replying to close with {:?}", reply);
693                self.set_additional(reply);
694
695                Some(close)
696            }
697            WebSocketState::ClosedByPeer | WebSocketState::CloseAcknowledged => {
698                // It is already closed, just ignore.
699                None
700            }
701            WebSocketState::ClosedByUs => {
702                // We received a reply.
703                self.state = WebSocketState::CloseAcknowledged;
704                Some(close)
705            }
706            WebSocketState::Terminated => unreachable!(),
707        }
708    }
709
710    /// Write a single frame into the write-buffer.
711    fn buffer_frame<Stream>(&mut self, stream: &mut Stream, mut frame: Frame) -> Result<()>
712    where
713        Stream: Read + Write,
714    {
715        match self.role {
716            Role::Server => {}
717            Role::Client => {
718                // 5.  If the data is being sent by the client, the frame(s) MUST be
719                // masked as defined in Section 5.3. (RFC 6455)
720                frame.set_random_mask();
721            }
722        }
723
724        trace!("Sending frame: {:?}", frame);
725        self.frame.buffer_frame(stream, frame).check_connection_reset(self.state)
726    }
727
728    /// Replace `additional_send` if it is currently a `Pong` message.
729    fn set_additional(&mut self, add: Frame) {
730        let empty_or_pong = self
731            .additional_send
732            .as_ref()
733            .map_or(true, |f| f.header().opcode == OpCode::Control(OpCtl::Pong));
734        if empty_or_pong {
735            self.additional_send.replace(add);
736        }
737    }
738}
739
/// Where a connection currently stands in its lifecycle, including the
/// individual steps of the closing handshake.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum WebSocketState {
    /// Regular operation; no close frame has been exchanged.
    Active,
    /// The closing handshake was started from our side.
    ClosedByUs,
    /// The closing handshake was started by the peer.
    ClosedByPeer,
    /// The peer has answered the closing handshake we started.
    CloseAcknowledged,
    /// The connection is gone.
    Terminated,
}
754
755impl WebSocketState {
756    /// Tell if we're allowed to process normal messages.
757    fn is_active(self) -> bool {
758        matches!(self, WebSocketState::Active)
759    }
760
761    /// Tell if we should process incoming data. Note that if we send a close frame
762    /// but the remote hasn't confirmed, they might have sent data before they receive our
763    /// close frame, so we should still pass those to client code, hence ClosedByUs is valid.
764    fn can_read(self) -> bool {
765        matches!(self, WebSocketState::Active | WebSocketState::ClosedByUs)
766    }
767
768    /// Check if the state is active, return error if not.
769    fn check_not_terminated(self) -> Result<()> {
770        match self {
771            WebSocketState::Terminated => Err(Error::AlreadyClosed),
772            _ => Ok(()),
773        }
774    }
775}
776
777/// Translate "Connection reset by peer" into `ConnectionClosed` if appropriate.
778trait CheckConnectionReset {
779    fn check_connection_reset(self, state: WebSocketState) -> Self;
780}
781
782impl<T> CheckConnectionReset for Result<T> {
783    fn check_connection_reset(self, state: WebSocketState) -> Self {
784        match self {
785            Err(Error::Io(io_error)) => Err({
786                if !state.can_read() && io_error.kind() == io::ErrorKind::ConnectionReset {
787                    Error::ConnectionClosed
788                } else {
789                    Error::Io(io_error)
790                }
791            }),
792            x => x,
793        }
794    }
795}
796
#[cfg(test)]
mod tests {
    use super::{Message, Role, WebSocket, WebSocketConfig};
    use crate::error::{CapacityError, Error};

    use std::{io, io::Cursor};

    /// Test stream: reads are served by the wrapped stream, writes are
    /// accepted and thrown away.
    struct WriteMock<Stream>(Stream);

    impl<Stream> io::Write for WriteMock<Stream> {
        /// Pretend the whole buffer was written.
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            Ok(buf.len())
        }

        /// Nothing is buffered, so flushing always succeeds.
        fn flush(&mut self) -> io::Result<()> {
            Ok(())
        }
    }

    impl<Stream: io::Read> io::Read for WriteMock<Stream> {
        /// Forward the read to the wrapped stream.
        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
            self.0.read(buf)
        }
    }

    /// A ping, a pong, a two-fragment text message and a binary message are
    /// read back in order.
    #[test]
    fn receive_messages() {
        let mut wire: Vec<u8> = Vec::new();
        // Ping carrying [1, 2].
        wire.extend_from_slice(&[0x89, 0x02, 0x01, 0x02]);
        // Pong carrying [3].
        wire.extend_from_slice(&[0x8a, 0x01, 0x03]);
        // Text message split into two fragments.
        wire.extend_from_slice(&[0x01, 0x07]);
        wire.extend_from_slice(b"Hello, ");
        wire.extend_from_slice(&[0x80, 0x06]);
        wire.extend_from_slice(b"World!");
        // Binary message carrying [1, 2, 3].
        wire.extend_from_slice(&[0x82, 0x03, 0x01, 0x02, 0x03]);

        let stream = WriteMock(Cursor::new(wire));
        let mut socket = WebSocket::from_raw_socket(stream, Role::Client, None);

        let expected = vec![
            Message::Ping(vec![1, 2]),
            Message::Pong(vec![3]),
            Message::Text("Hello, World!".into()),
            Message::Binary(vec![0x01, 0x02, 0x03]),
        ];
        for want in expected {
            assert_eq!(socket.read().unwrap(), want);
        }
    }

    /// A fragmented text message of 13 bytes is rejected under a 10 byte limit.
    #[test]
    fn size_limiting_text_fragmented() {
        let mut wire: Vec<u8> = Vec::new();
        wire.extend_from_slice(&[0x01, 0x07]);
        wire.extend_from_slice(b"Hello, ");
        wire.extend_from_slice(&[0x80, 0x06]);
        wire.extend_from_slice(b"World!");

        let config = WebSocketConfig { max_message_size: Some(10), ..WebSocketConfig::default() };
        let stream = WriteMock(Cursor::new(wire));
        let mut socket = WebSocket::from_raw_socket(stream, Role::Client, Some(config));

        let outcome = socket.read();
        assert!(matches!(
            outcome,
            Err(Error::Capacity(CapacityError::MessageTooLong { size: 13, max_size: 10 }))
        ));
    }

    /// A 3 byte binary message is rejected under a 2 byte limit.
    #[test]
    fn size_limiting_binary() {
        let wire: Vec<u8> = vec![0x82, 0x03, 0x01, 0x02, 0x03];

        let config = WebSocketConfig { max_message_size: Some(2), ..WebSocketConfig::default() };
        let stream = WriteMock(Cursor::new(wire));
        let mut socket = WebSocket::from_raw_socket(stream, Role::Client, Some(config));

        let outcome = socket.read();
        assert!(matches!(
            outcome,
            Err(Error::Capacity(CapacityError::MessageTooLong { size: 3, max_size: 2 }))
        ));
    }
}