tungstenite/protocol/mod.rs
1//! Generic WebSocket message stream.
2
3pub mod frame;
4
5mod message;
6
7pub use self::{frame::CloseFrame, message::Message};
8
9use self::{
10 frame::{
11 coding::{CloseCode, Control as OpCtl, Data as OpData, OpCode},
12 Frame, FrameCodec,
13 },
14 message::{IncompleteMessage, IncompleteMessageType},
15};
16use crate::error::{Error, ProtocolError, Result};
17use log::*;
18use std::{
19 io::{self, Read, Write},
20 mem::replace,
21};
22
23/// Indicates a Client or Server role of the websocket
24#[derive(Debug, Clone, Copy, PartialEq, Eq)]
25pub enum Role {
26 /// This socket is a server
27 Server,
28 /// This socket is a client
29 Client,
30}
31
32/// The configuration for WebSocket connection.
33#[derive(Debug, Clone, Copy)]
34pub struct WebSocketConfig {
35 /// Does nothing, instead use `max_write_buffer_size`.
36 #[deprecated]
37 pub max_send_queue: Option<usize>,
38 /// The target minimum size of the write buffer to reach before writing the data
39 /// to the underlying stream.
40 /// The default value is 128 KiB.
41 ///
42 /// If set to `0` each message will be eagerly written to the underlying stream.
43 /// It is often more optimal to allow them to buffer a little, hence the default value.
44 ///
45 /// Note: [`flush`](WebSocket::flush) will always fully write the buffer regardless.
46 pub write_buffer_size: usize,
47 /// The max size of the write buffer in bytes. Setting this can provide backpressure
48 /// in the case the write buffer is filling up due to write errors.
49 /// The default value is unlimited.
50 ///
51 /// Note: The write buffer only builds up past [`write_buffer_size`](Self::write_buffer_size)
52 /// when writes to the underlying stream are failing. So the **write buffer can not
53 /// fill up if you are not observing write errors even if not flushing**.
54 ///
55 /// Note: Should always be at least [`write_buffer_size + 1 message`](Self::write_buffer_size)
56 /// and probably a little more depending on error handling strategy.
57 pub max_write_buffer_size: usize,
58 /// The maximum size of an incoming message. `None` means no size limit. The default value is 64 MiB
59 /// which should be reasonably big for all normal use-cases but small enough to prevent
60 /// memory eating by a malicious user.
61 pub max_message_size: Option<usize>,
62 /// The maximum size of a single incoming message frame. `None` means no size limit. The limit is for
63 /// frame payload NOT including the frame header. The default value is 16 MiB which should
64 /// be reasonably big for all normal use-cases but small enough to prevent memory eating
65 /// by a malicious user.
66 pub max_frame_size: Option<usize>,
67 /// When set to `true`, the server will accept and handle unmasked frames
68 /// from the client. According to the RFC 6455, the server must close the
69 /// connection to the client in such cases, however it seems like there are
70 /// some popular libraries that are sending unmasked frames, ignoring the RFC.
71 /// By default this option is set to `false`, i.e. according to RFC 6455.
72 pub accept_unmasked_frames: bool,
73}
74
75impl Default for WebSocketConfig {
76 fn default() -> Self {
77 #[allow(deprecated)]
78 WebSocketConfig {
79 max_send_queue: None,
80 write_buffer_size: 128 * 1024,
81 max_write_buffer_size: usize::MAX,
82 max_message_size: Some(64 << 20),
83 max_frame_size: Some(16 << 20),
84 accept_unmasked_frames: false,
85 }
86 }
87}
88
89impl WebSocketConfig {
90 /// Panic if values are invalid.
91 pub(crate) fn assert_valid(&self) {
92 assert!(
93 self.max_write_buffer_size > self.write_buffer_size,
94 "WebSocketConfig::max_write_buffer_size must be greater than write_buffer_size, \
95 see WebSocketConfig docs`"
96 );
97 }
98}
99
100/// WebSocket input-output stream.
101///
102/// This is THE structure you want to create to be able to speak the WebSocket protocol.
103/// It may be created by calling `connect`, `accept` or `client` functions.
104///
105/// Use [`WebSocket::read`], [`WebSocket::send`] to received and send messages.
106#[derive(Debug)]
107pub struct WebSocket<Stream> {
108 /// The underlying socket.
109 socket: Stream,
110 /// The context for managing a WebSocket.
111 context: WebSocketContext,
112}
113
114impl<Stream> WebSocket<Stream> {
115 /// Convert a raw socket into a WebSocket without performing a handshake.
116 ///
117 /// Call this function if you're using Tungstenite as a part of a web framework
118 /// or together with an existing one. If you need an initial handshake, use
119 /// `connect()` or `accept()` functions of the crate to construct a websocket.
120 ///
121 /// # Panics
122 /// Panics if config is invalid e.g. `max_write_buffer_size <= write_buffer_size`.
123 pub fn from_raw_socket(stream: Stream, role: Role, config: Option<WebSocketConfig>) -> Self {
124 WebSocket { socket: stream, context: WebSocketContext::new(role, config) }
125 }
126
127 /// Convert a raw socket into a WebSocket without performing a handshake.
128 ///
129 /// Call this function if you're using Tungstenite as a part of a web framework
130 /// or together with an existing one. If you need an initial handshake, use
131 /// `connect()` or `accept()` functions of the crate to construct a websocket.
132 ///
133 /// # Panics
134 /// Panics if config is invalid e.g. `max_write_buffer_size <= write_buffer_size`.
135 pub fn from_partially_read(
136 stream: Stream,
137 part: Vec<u8>,
138 role: Role,
139 config: Option<WebSocketConfig>,
140 ) -> Self {
141 WebSocket {
142 socket: stream,
143 context: WebSocketContext::from_partially_read(part, role, config),
144 }
145 }
146
147 /// Returns a shared reference to the inner stream.
148 pub fn get_ref(&self) -> &Stream {
149 &self.socket
150 }
151 /// Returns a mutable reference to the inner stream.
152 pub fn get_mut(&mut self) -> &mut Stream {
153 &mut self.socket
154 }
155
156 /// Change the configuration.
157 ///
158 /// # Panics
159 /// Panics if config is invalid e.g. `max_write_buffer_size <= write_buffer_size`.
160 pub fn set_config(&mut self, set_func: impl FnOnce(&mut WebSocketConfig)) {
161 self.context.set_config(set_func)
162 }
163
164 /// Read the configuration.
165 pub fn get_config(&self) -> &WebSocketConfig {
166 self.context.get_config()
167 }
168
169 /// Check if it is possible to read messages.
170 ///
171 /// Reading is impossible after receiving `Message::Close`. It is still possible after
172 /// sending close frame since the peer still may send some data before confirming close.
173 pub fn can_read(&self) -> bool {
174 self.context.can_read()
175 }
176
177 /// Check if it is possible to write messages.
178 ///
179 /// Writing gets impossible immediately after sending or receiving `Message::Close`.
180 pub fn can_write(&self) -> bool {
181 self.context.can_write()
182 }
183}
184
185impl<Stream: Read + Write> WebSocket<Stream> {
186 /// Read a message from stream, if possible.
187 ///
188 /// This will also queue responses to ping and close messages. These responses
189 /// will be written and flushed on the next call to [`read`](Self::read),
190 /// [`write`](Self::write) or [`flush`](Self::flush).
191 ///
192 /// # Closing the connection
193 /// When the remote endpoint decides to close the connection this will return
194 /// the close message with an optional close frame.
195 ///
196 /// You should continue calling [`read`](Self::read), [`write`](Self::write) or
197 /// [`flush`](Self::flush) to drive the reply to the close frame until [`Error::ConnectionClosed`]
198 /// is returned. Once that happens it is safe to drop the underlying connection.
199 pub fn read(&mut self) -> Result<Message> {
200 self.context.read(&mut self.socket)
201 }
202
203 /// Writes and immediately flushes a message.
204 /// Equivalent to calling [`write`](Self::write) then [`flush`](Self::flush).
205 pub fn send(&mut self, message: Message) -> Result<()> {
206 self.write(message)?;
207 self.flush()
208 }
209
210 /// Write a message to the provided stream, if possible.
211 ///
212 /// A subsequent call should be made to [`flush`](Self::flush) to flush writes.
213 ///
214 /// In the event of stream write failure the message frame will be stored
215 /// in the write buffer and will try again on the next call to [`write`](Self::write)
216 /// or [`flush`](Self::flush).
217 ///
218 /// If the write buffer would exceed the configured [`WebSocketConfig::max_write_buffer_size`]
219 /// [`Err(WriteBufferFull(msg_frame))`](Error::WriteBufferFull) is returned.
220 ///
221 /// This call will generally not flush. However, if there are queued automatic messages
222 /// they will be written and eagerly flushed.
223 ///
224 /// For example, upon receiving ping messages tungstenite queues pong replies automatically.
225 /// The next call to [`read`](Self::read), [`write`](Self::write) or [`flush`](Self::flush)
226 /// will write & flush the pong reply. This means you should not respond to ping frames manually.
227 ///
228 /// You can however send pong frames manually in order to indicate a unidirectional heartbeat
229 /// as described in [RFC 6455](https://tools.ietf.org/html/rfc6455#section-5.5.3). Note that
230 /// if [`read`](Self::read) returns a ping, you should [`flush`](Self::flush) before passing
231 /// a custom pong to [`write`](Self::write), otherwise the automatic queued response to the
232 /// ping will not be sent as it will be replaced by your custom pong message.
233 ///
234 /// # Errors
235 /// - If the WebSocket's write buffer is full, [`Error::WriteBufferFull`] will be returned
236 /// along with the equivalent passed message frame.
237 /// - If the connection is closed and should be dropped, this will return [`Error::ConnectionClosed`].
238 /// - If you try again after [`Error::ConnectionClosed`] was returned either from here or from
239 /// [`read`](Self::read), [`Error::AlreadyClosed`] will be returned. This indicates a program
240 /// error on your part.
241 /// - [`Error::Io`] is returned if the underlying connection returns an error
242 /// (consider these fatal except for WouldBlock).
243 /// - [`Error::Capacity`] if your message size is bigger than the configured max message size.
244 pub fn write(&mut self, message: Message) -> Result<()> {
245 self.context.write(&mut self.socket, message)
246 }
247
248 /// Flush writes.
249 ///
250 /// Ensures all messages previously passed to [`write`](Self::write) and automatic
251 /// queued pong responses are written & flushed into the underlying stream.
252 pub fn flush(&mut self) -> Result<()> {
253 self.context.flush(&mut self.socket)
254 }
255
256 /// Close the connection.
257 ///
258 /// This function guarantees that the close frame will be queued.
259 /// There is no need to call it again. Calling this function is
260 /// the same as calling `write(Message::Close(..))`.
261 ///
262 /// After queuing the close frame you should continue calling [`read`](Self::read) or
263 /// [`flush`](Self::flush) to drive the close handshake to completion.
264 ///
265 /// The websocket RFC defines that the underlying connection should be closed
266 /// by the server. Tungstenite takes care of this asymmetry for you.
267 ///
268 /// When the close handshake is finished (we have both sent and received
269 /// a close message), [`read`](Self::read) or [`flush`](Self::flush) will return
270 /// [Error::ConnectionClosed] if this endpoint is the server.
271 ///
272 /// If this endpoint is a client, [Error::ConnectionClosed] will only be
273 /// returned after the server has closed the underlying connection.
274 ///
275 /// It is thus safe to drop the underlying connection as soon as [Error::ConnectionClosed]
276 /// is returned from [`read`](Self::read) or [`flush`](Self::flush).
277 pub fn close(&mut self, code: Option<CloseFrame>) -> Result<()> {
278 self.context.close(&mut self.socket, code)
279 }
280
281 /// Old name for [`read`](Self::read).
282 #[deprecated(note = "Use `read`")]
283 pub fn read_message(&mut self) -> Result<Message> {
284 self.read()
285 }
286
287 /// Old name for [`send`](Self::send).
288 #[deprecated(note = "Use `send`")]
289 pub fn write_message(&mut self, message: Message) -> Result<()> {
290 self.send(message)
291 }
292
293 /// Old name for [`flush`](Self::flush).
294 #[deprecated(note = "Use `flush`")]
295 pub fn write_pending(&mut self) -> Result<()> {
296 self.flush()
297 }
298}
299
300/// A context for managing WebSocket stream.
301#[derive(Debug)]
302pub struct WebSocketContext {
303 /// Server or client?
304 role: Role,
305 /// encoder/decoder of frame.
306 frame: FrameCodec,
307 /// The state of processing, either "active" or "closing".
308 state: WebSocketState,
309 /// Receive: an incomplete message being processed.
310 incomplete: Option<IncompleteMessage>,
311 /// Send in addition to regular messages E.g. "pong" or "close".
312 additional_send: Option<Frame>,
313 /// True indicates there is an additional message (like a pong)
314 /// that failed to flush previously and we should try again.
315 unflushed_additional: bool,
316 /// The configuration for the websocket session.
317 config: WebSocketConfig,
318}
319
320impl WebSocketContext {
321 /// Create a WebSocket context that manages a post-handshake stream.
322 ///
323 /// # Panics
324 /// Panics if config is invalid e.g. `max_write_buffer_size <= write_buffer_size`.
325 pub fn new(role: Role, config: Option<WebSocketConfig>) -> Self {
326 Self::_new(role, FrameCodec::new(), config.unwrap_or_default())
327 }
328
329 /// Create a WebSocket context that manages an post-handshake stream.
330 ///
331 /// # Panics
332 /// Panics if config is invalid e.g. `max_write_buffer_size <= write_buffer_size`.
333 pub fn from_partially_read(part: Vec<u8>, role: Role, config: Option<WebSocketConfig>) -> Self {
334 Self::_new(role, FrameCodec::from_partially_read(part), config.unwrap_or_default())
335 }
336
337 fn _new(role: Role, mut frame: FrameCodec, config: WebSocketConfig) -> Self {
338 config.assert_valid();
339 frame.set_max_out_buffer_len(config.max_write_buffer_size);
340 frame.set_out_buffer_write_len(config.write_buffer_size);
341 Self {
342 role,
343 frame,
344 state: WebSocketState::Active,
345 incomplete: None,
346 additional_send: None,
347 unflushed_additional: false,
348 config,
349 }
350 }
351
352 /// Change the configuration.
353 ///
354 /// # Panics
355 /// Panics if config is invalid e.g. `max_write_buffer_size <= write_buffer_size`.
356 pub fn set_config(&mut self, set_func: impl FnOnce(&mut WebSocketConfig)) {
357 set_func(&mut self.config);
358 self.config.assert_valid();
359 self.frame.set_max_out_buffer_len(self.config.max_write_buffer_size);
360 self.frame.set_out_buffer_write_len(self.config.write_buffer_size);
361 }
362
363 /// Read the configuration.
364 pub fn get_config(&self) -> &WebSocketConfig {
365 &self.config
366 }
367
368 /// Check if it is possible to read messages.
369 ///
370 /// Reading is impossible after receiving `Message::Close`. It is still possible after
371 /// sending close frame since the peer still may send some data before confirming close.
372 pub fn can_read(&self) -> bool {
373 self.state.can_read()
374 }
375
376 /// Check if it is possible to write messages.
377 ///
378 /// Writing gets impossible immediately after sending or receiving `Message::Close`.
379 pub fn can_write(&self) -> bool {
380 self.state.is_active()
381 }
382
383 /// Read a message from the provided stream, if possible.
384 ///
385 /// This function sends pong and close responses automatically.
386 /// However, it never blocks on write.
387 pub fn read<Stream>(&mut self, stream: &mut Stream) -> Result<Message>
388 where
389 Stream: Read + Write,
390 {
391 // Do not read from already closed connections.
392 self.state.check_not_terminated()?;
393
394 loop {
395 if self.additional_send.is_some() || self.unflushed_additional {
396 // Since we may get ping or close, we need to reply to the messages even during read.
397 match self.flush(stream) {
398 Ok(_) => {}
399 Err(Error::Io(err)) if err.kind() == io::ErrorKind::WouldBlock => {
400 // If blocked continue reading, but try again later
401 self.unflushed_additional = true;
402 }
403 Err(err) => return Err(err),
404 }
405 } else if self.role == Role::Server && !self.state.can_read() {
406 self.state = WebSocketState::Terminated;
407 return Err(Error::ConnectionClosed);
408 }
409
410 // If we get here, either write blocks or we have nothing to write.
411 // Thus if read blocks, just let it return WouldBlock.
412 if let Some(message) = self.read_message_frame(stream)? {
413 trace!("Received message {}", message);
414 return Ok(message);
415 }
416 }
417 }
418
419 /// Write a message to the provided stream.
420 ///
421 /// A subsequent call should be made to [`flush`](Self::flush) to flush writes.
422 ///
423 /// In the event of stream write failure the message frame will be stored
424 /// in the write buffer and will try again on the next call to [`write`](Self::write)
425 /// or [`flush`](Self::flush).
426 ///
427 /// If the write buffer would exceed the configured [`WebSocketConfig::max_write_buffer_size`]
428 /// [`Err(WriteBufferFull(msg_frame))`](Error::WriteBufferFull) is returned.
429 pub fn write<Stream>(&mut self, stream: &mut Stream, message: Message) -> Result<()>
430 where
431 Stream: Read + Write,
432 {
433 // When terminated, return AlreadyClosed.
434 self.state.check_not_terminated()?;
435
436 // Do not write after sending a close frame.
437 if !self.state.is_active() {
438 return Err(Error::Protocol(ProtocolError::SendAfterClosing));
439 }
440
441 let frame = match message {
442 Message::Text(data) => Frame::message(data.into(), OpCode::Data(OpData::Text), true),
443 Message::Binary(data) => Frame::message(data, OpCode::Data(OpData::Binary), true),
444 Message::Ping(data) => Frame::ping(data),
445 Message::Pong(data) => {
446 self.set_additional(Frame::pong(data));
447 // Note: user pongs can be user flushed so no need to flush here
448 return self._write(stream, None).map(|_| ());
449 }
450 Message::Close(code) => return self.close(stream, code),
451 Message::Frame(f) => f,
452 };
453
454 let should_flush = self._write(stream, Some(frame))?;
455 if should_flush {
456 self.flush(stream)?;
457 }
458 Ok(())
459 }
460
461 /// Flush writes.
462 ///
463 /// Ensures all messages previously passed to [`write`](Self::write) and automatically
464 /// queued pong responses are written & flushed into the `stream`.
465 #[inline]
466 pub fn flush<Stream>(&mut self, stream: &mut Stream) -> Result<()>
467 where
468 Stream: Read + Write,
469 {
470 self._write(stream, None)?;
471 self.frame.write_out_buffer(stream)?;
472 stream.flush()?;
473 self.unflushed_additional = false;
474 Ok(())
475 }
476
477 /// Writes any data in the out_buffer, `additional_send` and given `data`.
478 ///
479 /// Does **not** flush.
480 ///
481 /// Returns true if the write contents indicate we should flush immediately.
482 fn _write<Stream>(&mut self, stream: &mut Stream, data: Option<Frame>) -> Result<bool>
483 where
484 Stream: Read + Write,
485 {
486 if let Some(data) = data {
487 self.buffer_frame(stream, data)?;
488 }
489
490 // Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in
491 // response, unless it already received a Close frame. It SHOULD
492 // respond with Pong frame as soon as is practical. (RFC 6455)
493 let should_flush = if let Some(msg) = self.additional_send.take() {
494 trace!("Sending pong/close");
495 match self.buffer_frame(stream, msg) {
496 Err(Error::WriteBufferFull(Message::Frame(msg))) => {
497 // if an system message would exceed the buffer put it back in
498 // `additional_send` for retry. Otherwise returning this error
499 // may not make sense to the user, e.g. calling `flush`.
500 self.set_additional(msg);
501 false
502 }
503 Err(err) => return Err(err),
504 Ok(_) => true,
505 }
506 } else {
507 self.unflushed_additional
508 };
509
510 // If we're closing and there is nothing to send anymore, we should close the connection.
511 if self.role == Role::Server && !self.state.can_read() {
512 // The underlying TCP connection, in most normal cases, SHOULD be closed
513 // first by the server, so that it holds the TIME_WAIT state and not the
514 // client (as this would prevent it from re-opening the connection for 2
515 // maximum segment lifetimes (2MSL), while there is no corresponding
516 // server impact as a TIME_WAIT connection is immediately reopened upon
517 // a new SYN with a higher seq number). (RFC 6455)
518 self.frame.write_out_buffer(stream)?;
519 self.state = WebSocketState::Terminated;
520 Err(Error::ConnectionClosed)
521 } else {
522 Ok(should_flush)
523 }
524 }
525
526 /// Close the connection.
527 ///
528 /// This function guarantees that the close frame will be queued.
529 /// There is no need to call it again. Calling this function is
530 /// the same as calling `send(Message::Close(..))`.
531 pub fn close<Stream>(&mut self, stream: &mut Stream, code: Option<CloseFrame>) -> Result<()>
532 where
533 Stream: Read + Write,
534 {
535 if let WebSocketState::Active = self.state {
536 self.state = WebSocketState::ClosedByUs;
537 let frame = Frame::close(code);
538 self._write(stream, Some(frame))?;
539 }
540 self.flush(stream)
541 }
542
543 /// Try to decode one message frame. May return None.
544 fn read_message_frame<Stream>(&mut self, stream: &mut Stream) -> Result<Option<Message>>
545 where
546 Stream: Read + Write,
547 {
548 if let Some(mut frame) = self
549 .frame
550 .read_frame(stream, self.config.max_frame_size)
551 .check_connection_reset(self.state)?
552 {
553 if !self.state.can_read() {
554 return Err(Error::Protocol(ProtocolError::ReceivedAfterClosing));
555 }
556 // MUST be 0 unless an extension is negotiated that defines meanings
557 // for non-zero values. If a nonzero value is received and none of
558 // the negotiated extensions defines the meaning of such a nonzero
559 // value, the receiving endpoint MUST _Fail the WebSocket
560 // Connection_.
561 {
562 let hdr = frame.header();
563 if hdr.rsv1 || hdr.rsv2 || hdr.rsv3 {
564 return Err(Error::Protocol(ProtocolError::NonZeroReservedBits));
565 }
566 }
567
568 match self.role {
569 Role::Server => {
570 if frame.is_masked() {
571 // A server MUST remove masking for data frames received from a client
572 // as described in Section 5.3. (RFC 6455)
573 frame.apply_mask()
574 } else if !self.config.accept_unmasked_frames {
575 // The server MUST close the connection upon receiving a
576 // frame that is not masked. (RFC 6455)
577 // The only exception here is if the user explicitly accepts given
578 // stream by setting WebSocketConfig.accept_unmasked_frames to true
579 return Err(Error::Protocol(ProtocolError::UnmaskedFrameFromClient));
580 }
581 }
582 Role::Client => {
583 if frame.is_masked() {
584 // A client MUST close a connection if it detects a masked frame. (RFC 6455)
585 return Err(Error::Protocol(ProtocolError::MaskedFrameFromServer));
586 }
587 }
588 }
589
590 match frame.header().opcode {
591 OpCode::Control(ctl) => {
592 match ctl {
593 // All control frames MUST have a payload length of 125 bytes or less
594 // and MUST NOT be fragmented. (RFC 6455)
595 _ if !frame.header().is_final => {
596 Err(Error::Protocol(ProtocolError::FragmentedControlFrame))
597 }
598 _ if frame.payload().len() > 125 => {
599 Err(Error::Protocol(ProtocolError::ControlFrameTooBig))
600 }
601 OpCtl::Close => Ok(self.do_close(frame.into_close()?).map(Message::Close)),
602 OpCtl::Reserved(i) => {
603 Err(Error::Protocol(ProtocolError::UnknownControlFrameType(i)))
604 }
605 OpCtl::Ping => {
606 let data = frame.into_data();
607 // No ping processing after we sent a close frame.
608 if self.state.is_active() {
609 self.set_additional(Frame::pong(data.clone()));
610 }
611 Ok(Some(Message::Ping(data)))
612 }
613 OpCtl::Pong => Ok(Some(Message::Pong(frame.into_data()))),
614 }
615 }
616
617 OpCode::Data(data) => {
618 let fin = frame.header().is_final;
619 match data {
620 OpData::Continue => {
621 if let Some(ref mut msg) = self.incomplete {
622 msg.extend(frame.into_data(), self.config.max_message_size)?;
623 } else {
624 return Err(Error::Protocol(
625 ProtocolError::UnexpectedContinueFrame,
626 ));
627 }
628 if fin {
629 Ok(Some(self.incomplete.take().unwrap().complete()?))
630 } else {
631 Ok(None)
632 }
633 }
634 c if self.incomplete.is_some() => {
635 Err(Error::Protocol(ProtocolError::ExpectedFragment(c)))
636 }
637 OpData::Text | OpData::Binary => {
638 let msg = {
639 let message_type = match data {
640 OpData::Text => IncompleteMessageType::Text,
641 OpData::Binary => IncompleteMessageType::Binary,
642 _ => panic!("Bug: message is not text nor binary"),
643 };
644 let mut m = IncompleteMessage::new(message_type);
645 m.extend(frame.into_data(), self.config.max_message_size)?;
646 m
647 };
648 if fin {
649 Ok(Some(msg.complete()?))
650 } else {
651 self.incomplete = Some(msg);
652 Ok(None)
653 }
654 }
655 OpData::Reserved(i) => {
656 Err(Error::Protocol(ProtocolError::UnknownDataFrameType(i)))
657 }
658 }
659 }
660 } // match opcode
661 } else {
662 // Connection closed by peer
663 match replace(&mut self.state, WebSocketState::Terminated) {
664 WebSocketState::ClosedByPeer | WebSocketState::CloseAcknowledged => {
665 Err(Error::ConnectionClosed)
666 }
667 _ => Err(Error::Protocol(ProtocolError::ResetWithoutClosingHandshake)),
668 }
669 }
670 }
671
672 /// Received a close frame. Tells if we need to return a close frame to the user.
673 #[allow(clippy::option_option)]
674 fn do_close<'t>(&mut self, close: Option<CloseFrame<'t>>) -> Option<Option<CloseFrame<'t>>> {
675 debug!("Received close frame: {:?}", close);
676 match self.state {
677 WebSocketState::Active => {
678 self.state = WebSocketState::ClosedByPeer;
679
680 let close = close.map(|frame| {
681 if !frame.code.is_allowed() {
682 CloseFrame {
683 code: CloseCode::Protocol,
684 reason: "Protocol violation".into(),
685 }
686 } else {
687 frame
688 }
689 });
690
691 let reply = Frame::close(close.clone());
692 debug!("Replying to close with {:?}", reply);
693 self.set_additional(reply);
694
695 Some(close)
696 }
697 WebSocketState::ClosedByPeer | WebSocketState::CloseAcknowledged => {
698 // It is already closed, just ignore.
699 None
700 }
701 WebSocketState::ClosedByUs => {
702 // We received a reply.
703 self.state = WebSocketState::CloseAcknowledged;
704 Some(close)
705 }
706 WebSocketState::Terminated => unreachable!(),
707 }
708 }
709
710 /// Write a single frame into the write-buffer.
711 fn buffer_frame<Stream>(&mut self, stream: &mut Stream, mut frame: Frame) -> Result<()>
712 where
713 Stream: Read + Write,
714 {
715 match self.role {
716 Role::Server => {}
717 Role::Client => {
718 // 5. If the data is being sent by the client, the frame(s) MUST be
719 // masked as defined in Section 5.3. (RFC 6455)
720 frame.set_random_mask();
721 }
722 }
723
724 trace!("Sending frame: {:?}", frame);
725 self.frame.buffer_frame(stream, frame).check_connection_reset(self.state)
726 }
727
728 /// Replace `additional_send` if it is currently a `Pong` message.
729 fn set_additional(&mut self, add: Frame) {
730 let empty_or_pong = self
731 .additional_send
732 .as_ref()
733 .map_or(true, |f| f.header().opcode == OpCode::Control(OpCtl::Pong));
734 if empty_or_pong {
735 self.additional_send.replace(add);
736 }
737 }
738}
739
740/// The current connection state.
741#[derive(Debug, PartialEq, Eq, Clone, Copy)]
742enum WebSocketState {
743 /// The connection is active.
744 Active,
745 /// We initiated a close handshake.
746 ClosedByUs,
747 /// The peer initiated a close handshake.
748 ClosedByPeer,
749 /// The peer replied to our close handshake.
750 CloseAcknowledged,
751 /// The connection does not exist anymore.
752 Terminated,
753}
754
755impl WebSocketState {
756 /// Tell if we're allowed to process normal messages.
757 fn is_active(self) -> bool {
758 matches!(self, WebSocketState::Active)
759 }
760
761 /// Tell if we should process incoming data. Note that if we send a close frame
762 /// but the remote hasn't confirmed, they might have sent data before they receive our
763 /// close frame, so we should still pass those to client code, hence ClosedByUs is valid.
764 fn can_read(self) -> bool {
765 matches!(self, WebSocketState::Active | WebSocketState::ClosedByUs)
766 }
767
768 /// Check if the state is active, return error if not.
769 fn check_not_terminated(self) -> Result<()> {
770 match self {
771 WebSocketState::Terminated => Err(Error::AlreadyClosed),
772 _ => Ok(()),
773 }
774 }
775}
776
777/// Translate "Connection reset by peer" into `ConnectionClosed` if appropriate.
778trait CheckConnectionReset {
779 fn check_connection_reset(self, state: WebSocketState) -> Self;
780}
781
782impl<T> CheckConnectionReset for Result<T> {
783 fn check_connection_reset(self, state: WebSocketState) -> Self {
784 match self {
785 Err(Error::Io(io_error)) => Err({
786 if !state.can_read() && io_error.kind() == io::ErrorKind::ConnectionReset {
787 Error::ConnectionClosed
788 } else {
789 Error::Io(io_error)
790 }
791 }),
792 x => x,
793 }
794 }
795}
796
797#[cfg(test)]
798mod tests {
799 use super::{Message, Role, WebSocket, WebSocketConfig};
800 use crate::error::{CapacityError, Error};
801
802 use std::{io, io::Cursor};
803
804 struct WriteMoc<Stream>(Stream);
805
806 impl<Stream> io::Write for WriteMoc<Stream> {
807 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
808 Ok(buf.len())
809 }
810 fn flush(&mut self) -> io::Result<()> {
811 Ok(())
812 }
813 }
814
815 impl<Stream: io::Read> io::Read for WriteMoc<Stream> {
816 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
817 self.0.read(buf)
818 }
819 }
820
821 #[test]
822 fn receive_messages() {
823 let incoming = Cursor::new(vec![
824 0x89, 0x02, 0x01, 0x02, 0x8a, 0x01, 0x03, 0x01, 0x07, 0x48, 0x65, 0x6c, 0x6c, 0x6f,
825 0x2c, 0x20, 0x80, 0x06, 0x57, 0x6f, 0x72, 0x6c, 0x64, 0x21, 0x82, 0x03, 0x01, 0x02,
826 0x03,
827 ]);
828 let mut socket = WebSocket::from_raw_socket(WriteMoc(incoming), Role::Client, None);
829 assert_eq!(socket.read().unwrap(), Message::Ping(vec![1, 2]));
830 assert_eq!(socket.read().unwrap(), Message::Pong(vec![3]));
831 assert_eq!(socket.read().unwrap(), Message::Text("Hello, World!".into()));
832 assert_eq!(socket.read().unwrap(), Message::Binary(vec![0x01, 0x02, 0x03]));
833 }
834
835 #[test]
836 fn size_limiting_text_fragmented() {
837 let incoming = Cursor::new(vec![
838 0x01, 0x07, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x2c, 0x20, 0x80, 0x06, 0x57, 0x6f, 0x72,
839 0x6c, 0x64, 0x21,
840 ]);
841 let limit = WebSocketConfig { max_message_size: Some(10), ..WebSocketConfig::default() };
842 let mut socket = WebSocket::from_raw_socket(WriteMoc(incoming), Role::Client, Some(limit));
843
844 assert!(matches!(
845 socket.read(),
846 Err(Error::Capacity(CapacityError::MessageTooLong { size: 13, max_size: 10 }))
847 ));
848 }
849
850 #[test]
851 fn size_limiting_binary() {
852 let incoming = Cursor::new(vec![0x82, 0x03, 0x01, 0x02, 0x03]);
853 let limit = WebSocketConfig { max_message_size: Some(2), ..WebSocketConfig::default() };
854 let mut socket = WebSocket::from_raw_socket(WriteMoc(incoming), Role::Client, Some(limit));
855
856 assert!(matches!(
857 socket.read(),
858 Err(Error::Capacity(CapacityError::MessageTooLong { size: 3, max_size: 2 }))
859 ));
860 }
861}