kernel/devices/
pipe.rs

1use core::sync::atomic::{AtomicUsize, Ordering};
2
3use alloc::{collections::VecDeque, string::String, sync::Arc};
4use kernel_user_link::file::BlockingMode;
5
6use crate::{
7    fs::{self, FileAccess, FileAttributes, FileNode, FileSystemError},
8    sync::spin::mutex::Mutex,
9};
10
11use super::Device;
12
13/// Create a connected pipe pair.
14/// The first returned file is the read side of the pipe.
15/// The second returned file is the write side of the pipe.
16pub fn create_pipe_pair() -> (fs::File, fs::File) {
17    let pipe = Arc::new(Mutex::new(InnerPipe {
18        buffer: VecDeque::new(),
19        read_side_available: true,
20        write_side_available: true,
21    }));
22
23    let read_device = Arc::new(PipeSide {
24        inner: pipe.clone(),
25        is_read_side: true,
26        clones: AtomicUsize::new(1),
27    });
28    let write_device = Arc::new(PipeSide {
29        inner: pipe.clone(),
30        is_read_side: false,
31        clones: AtomicUsize::new(1),
32    });
33
34    let read_inode = FileNode::new_device(
35        String::from("read_pipe"),
36        FileAttributes::EMPTY,
37        read_device,
38    );
39    let write_inode = FileNode::new_device(
40        String::from("write_pipe"),
41        FileAttributes::EMPTY,
42        write_device,
43    );
44    let read_file = fs::File::from_inode(
45        read_inode,
46        String::from("read_pipe"),
47        fs::empty_filesystem(),
48        0,
49        BlockingMode::Block(1),
50        FileAccess::READ,
51    )
52    .expect("This is a file, shouldn't fail");
53    // no blocking for write
54    let write_file = fs::File::from_inode(
55        write_inode,
56        String::from("write_pipe"),
57        fs::empty_filesystem(),
58        0,
59        BlockingMode::None,
60        FileAccess::WRITE,
61    )
62    .expect("This is a file, shouldn't fail");
63
64    (read_file, write_file)
65}
66
67/// Pipe is a device that allows two processes to communicate with each other.
68#[derive(Debug)]
69struct InnerPipe {
70    /// The buffer of the pipe.
71    buffer: VecDeque<u8>,
72    read_side_available: bool,
73    write_side_available: bool,
74}
75
76/// Represent one side of a pipe.
77/// Check [`create_pipe_pair`] for more details.
78#[derive(Debug)]
79pub struct PipeSide {
80    inner: Arc<Mutex<InnerPipe>>,
81    clones: AtomicUsize,
82    is_read_side: bool,
83}
84
85impl Device for PipeSide {
86    fn name(&self) -> &str {
87        "pipe"
88    }
89
90    fn read(&self, _offset: u64, buf: &mut [u8]) -> Result<u64, FileSystemError> {
91        if !self.is_read_side {
92            return Err(FileSystemError::ReadNotSupported);
93        }
94        let mut pipe = self.inner.lock();
95        if !pipe.write_side_available && pipe.buffer.is_empty() {
96            return Err(FileSystemError::EndOfFile);
97        }
98        let mut bytes_read = 0;
99        for byte in buf.iter_mut() {
100            if let Some(b) = pipe.buffer.pop_back() {
101                *byte = b;
102                bytes_read += 1;
103            } else {
104                break;
105            }
106        }
107        Ok(bytes_read)
108    }
109
110    fn write(&self, _offset: u64, buf: &[u8]) -> Result<u64, FileSystemError> {
111        if self.is_read_side {
112            return Err(FileSystemError::WriteNotSupported);
113        }
114        let mut pipe = self.inner.lock();
115        if !pipe.read_side_available {
116            return Err(FileSystemError::EndOfFile);
117        }
118        for byte in buf.iter() {
119            pipe.buffer.push_front(*byte);
120        }
121        Ok(buf.len() as u64)
122    }
123
124    fn close(&self) -> Result<(), FileSystemError> {
125        // only close the pipe when all clones are closed
126        if self.clones.fetch_sub(1, Ordering::AcqRel) != 1 {
127            return Ok(());
128        }
129
130        let mut pipe = self.inner.lock();
131        if self.is_read_side {
132            pipe.read_side_available = false;
133        } else {
134            pipe.write_side_available = false;
135        }
136        Ok(())
137    }
138
139    fn clone_device(&self) -> Result<(), FileSystemError> {
140        self.clones.fetch_add(1, Ordering::AcqRel);
141        Ok(())
142    }
143}