1use core::sync::atomic::{AtomicUsize, Ordering};
2
3use alloc::{collections::VecDeque, string::String, sync::Arc};
4use kernel_user_link::file::BlockingMode;
5
6use crate::{
7 fs::{self, FileAccess, FileAttributes, FileNode, FileSystemError},
8 sync::spin::mutex::Mutex,
9};
10
11use super::Device;
12
13pub fn create_pipe_pair() -> (fs::File, fs::File) {
17 let pipe = Arc::new(Mutex::new(InnerPipe {
18 buffer: VecDeque::new(),
19 read_side_available: true,
20 write_side_available: true,
21 }));
22
23 let read_device = Arc::new(PipeSide {
24 inner: pipe.clone(),
25 is_read_side: true,
26 clones: AtomicUsize::new(1),
27 });
28 let write_device = Arc::new(PipeSide {
29 inner: pipe.clone(),
30 is_read_side: false,
31 clones: AtomicUsize::new(1),
32 });
33
34 let read_inode = FileNode::new_device(
35 String::from("read_pipe"),
36 FileAttributes::EMPTY,
37 read_device,
38 );
39 let write_inode = FileNode::new_device(
40 String::from("write_pipe"),
41 FileAttributes::EMPTY,
42 write_device,
43 );
44 let read_file = fs::File::from_inode(
45 read_inode,
46 String::from("read_pipe"),
47 fs::empty_filesystem(),
48 0,
49 BlockingMode::Block(1),
50 FileAccess::READ,
51 )
52 .expect("This is a file, shouldn't fail");
53 let write_file = fs::File::from_inode(
55 write_inode,
56 String::from("write_pipe"),
57 fs::empty_filesystem(),
58 0,
59 BlockingMode::None,
60 FileAccess::WRITE,
61 )
62 .expect("This is a file, shouldn't fail");
63
64 (read_file, write_file)
65}
66
/// State shared by both ends of a pipe.
///
/// Each [`PipeSide`] holds this behind an `Arc<Mutex<_>>`, so the reader and
/// the writer always observe the same buffer and availability flags.
#[derive(Debug)]
struct InnerPipe {
    /// Bytes that have been written to the pipe but not yet read.
    buffer: VecDeque<u8>,
    /// Set to `false` when the last handle of the read end is closed; writes
    /// fail once this is `false`.
    read_side_available: bool,
    /// Set to `false` when the last handle of the write end is closed; reads
    /// report end-of-file once this is `false` and the buffer is empty.
    write_side_available: bool,
}
75
/// One end (read or write) of a pipe, exposed to the filesystem as a
/// [`Device`].
///
/// Connected pairs are created with [`create_pipe_pair`].
#[derive(Debug)]
pub struct PipeSide {
    /// Pipe state shared with the opposite end.
    inner: Arc<Mutex<InnerPipe>>,
    /// Number of open handles to this end: starts at 1, incremented by
    /// `clone_device` and decremented by `close`.
    clones: AtomicUsize,
    /// `true` for the reading end, `false` for the writing end.
    is_read_side: bool,
}
84
85impl Device for PipeSide {
86 fn name(&self) -> &str {
87 "pipe"
88 }
89
90 fn read(&self, _offset: u64, buf: &mut [u8]) -> Result<u64, FileSystemError> {
91 if !self.is_read_side {
92 return Err(FileSystemError::ReadNotSupported);
93 }
94 let mut pipe = self.inner.lock();
95 if !pipe.write_side_available && pipe.buffer.is_empty() {
96 return Err(FileSystemError::EndOfFile);
97 }
98 let mut bytes_read = 0;
99 for byte in buf.iter_mut() {
100 if let Some(b) = pipe.buffer.pop_back() {
101 *byte = b;
102 bytes_read += 1;
103 } else {
104 break;
105 }
106 }
107 Ok(bytes_read)
108 }
109
110 fn write(&self, _offset: u64, buf: &[u8]) -> Result<u64, FileSystemError> {
111 if self.is_read_side {
112 return Err(FileSystemError::WriteNotSupported);
113 }
114 let mut pipe = self.inner.lock();
115 if !pipe.read_side_available {
116 return Err(FileSystemError::EndOfFile);
117 }
118 for byte in buf.iter() {
119 pipe.buffer.push_front(*byte);
120 }
121 Ok(buf.len() as u64)
122 }
123
124 fn close(&self) -> Result<(), FileSystemError> {
125 if self.clones.fetch_sub(1, Ordering::AcqRel) != 1 {
127 return Ok(());
128 }
129
130 let mut pipe = self.inner.lock();
131 if self.is_read_side {
132 pipe.read_side_available = false;
133 } else {
134 pipe.write_side_available = false;
135 }
136 Ok(())
137 }
138
139 fn clone_device(&self) -> Result<(), FileSystemError> {
140 self.clones.fetch_add(1, Ordering::AcqRel);
141 Ok(())
142 }
143}