Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add udp multicast support #37

Merged
merged 3 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,3 @@ lto = true

[patch.crates-io]
crate_interface = { path = "crates/crate_interface" }

[patch."/~https://github.com/rcore-os/smoltcp.git"]
smoltcp = { git = "/~https://github.com/c0per/smoltcp", branch = "starryos" }
2 changes: 1 addition & 1 deletion api/axfeat/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ default = []
monolithic = ["axruntime/monolithic", "axhal/monolithic", "dep:axprocess"]

# Signal
signal = ["axruntime/signal"]
signal = ["axruntime/signal", "axnet/signal"]

# Futex support
futex = ["axprocess/futex"]
Expand Down
6 changes: 4 additions & 2 deletions modules/axnet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ smoltcp = []
# 启用ip协议与否
ip = []

signal = []

default = ["smoltcp"]

[dependencies]
Expand All @@ -32,14 +34,14 @@ axio = { path = "../../crates/axio" }

[dependencies.smoltcp]
git = "/~https://github.com/rcore-os/smoltcp.git"
rev = "2ade274"
rev = "b7134a3"
default-features = false
features = [
"alloc", "log", # no std
"medium-ethernet",
"medium-ip",
"proto-ipv4",
"socket-raw", "socket-icmp", "socket-udp", "socket-tcp", "socket-dns",
"socket-raw", "socket-icmp", "socket-udp", "socket-tcp", "socket-dns", "proto-igmp",
# "fragmentation-buffer-size-65536", "proto-ipv4-fragmentation",
# "reassembly-buffer-size-65536", "reassembly-buffer-count-32",
# "assembler-max-segment-count-32",
Expand Down
4 changes: 3 additions & 1 deletion modules/axnet/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ cfg_if::cfg_if! {

pub use self::net_impl::TcpSocket;
pub use self::net_impl::UdpSocket;
pub use self::net_impl::{
add_membership, dns_query, from_core_sockaddr, into_core_sockaddr, poll_interfaces,
};
pub use self::net_impl::{bench_receive, bench_transmit};
pub use self::net_impl::{dns_query, from_core_sockaddr, into_core_sockaddr, poll_interfaces};
pub use smoltcp::time::Duration;
pub use smoltcp::wire::{IpAddress as IpAddr, IpEndpoint as SocketAddr, Ipv4Address as Ipv4Addr};

Expand Down
36 changes: 33 additions & 3 deletions modules/axnet/src/smoltcp_impl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ mod listen_table;
mod tcp;
mod udp;
use alloc::vec;
use axerrno::{AxError, AxResult};
use core::cell::RefCell;
use core::ops::DerefMut;

Expand All @@ -16,7 +17,7 @@ use driver_net::{DevError, NetBufPtr};
use lazy_init::LazyInit;
use smoltcp::iface::{Config, Interface, SocketHandle, SocketSet};
use smoltcp::phy::{Device, DeviceCapabilities, Medium, RxToken, TxToken};
use smoltcp::socket::{self, AnySocket};
use smoltcp::socket::{self, AnySocket, Socket};
use smoltcp::time::Instant;
use smoltcp::wire::{EthernetAddress, HardwareAddress, IpAddress, IpCidr};

Expand Down Expand Up @@ -91,11 +92,11 @@ impl<'a> SocketSetWrapper<'a> {

pub fn new_udp_socket() -> socket::udp::Socket<'a> {
let udp_rx_buffer = socket::udp::PacketBuffer::new(
vec![socket::udp::PacketMetadata::EMPTY; 8],
vec![socket::udp::PacketMetadata::EMPTY; 256],
vec![0; UDP_RX_BUF_LEN],
);
let udp_tx_buffer = socket::udp::PacketBuffer::new(
vec![socket::udp::PacketMetadata::EMPTY; 8],
vec![socket::udp::PacketMetadata::EMPTY; 256],
vec![0; UDP_TX_BUF_LEN],
);
socket::udp::Socket::new(udp_rx_buffer, udp_tx_buffer)
Expand Down Expand Up @@ -130,6 +131,21 @@ impl<'a> SocketSetWrapper<'a> {
f(socket)
}

pub fn bind_check(&self, addr: IpAddress, _port: u16) -> AxResult {
let mut sockets = self.0.lock();
for item in sockets.iter_mut() {
match item.1 {
Socket::Udp(s) => {
if s.endpoint().addr == Some(addr) {
return Err(AxError::AddrInUse);
}
}
_ => continue,
};
}
Ok(())
}

pub fn poll_interfaces(&self) {
#[cfg(feature = "ip")]
{
Expand Down Expand Up @@ -334,6 +350,20 @@ pub fn bench_receive() {
ETH0.dev.lock().bench_receive_bandwidth();
}

/// Add multicast_addr to the loopback device.
pub fn add_membership(_multicast_addr: IpAddress, _interface_addr: IpAddress) {
#[cfg(feature = "ip")]
{
let timestamp =
Instant::from_micros_const((current_time_nanos() / NANOS_PER_MICROS) as i64);
let _ = LOOPBACK.lock().join_multicast_group(
LOOPBACK_DEV.lock().deref_mut(),
_multicast_addr,
timestamp,
);
}
}

pub(crate) fn init(_net_dev: AxNetDevice) {
#[cfg(feature = "ip")]
{
Expand Down
37 changes: 23 additions & 14 deletions modules/axnet/src/smoltcp_impl/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,19 +307,19 @@ impl TcpSocket {
let handle = unsafe { self.handle.get().read().unwrap() };
self.block_on(|| {
SOCKET_SET.with_socket_mut::<tcp::Socket, _, _>(handle, |socket| {
if !socket.is_active() {
// not open
ax_err!(ConnectionRefused, "socket recv() failed")
} else if !socket.may_recv() {
// connection closed
Ok(0)
} else if socket.recv_queue() > 0 {
if socket.recv_queue() > 0 {
// data available
// TODO: use socket.recv(|buf| {...})
let len = socket
.recv_slice(buf)
.map_err(|_| ax_err_type!(BadState, "socket recv() failed"))?;
Ok(len)
} else if !socket.is_active() {
// not open
ax_err!(ConnectionRefused, "socket recv() failed")
} else if !socket.may_recv() {
// connection closed
Ok(0)
} else {
// no more data
Err(AxError::WouldBlock)
Expand All @@ -343,19 +343,19 @@ impl TcpSocket {
let handle = unsafe { self.handle.get().read().unwrap() };
self.block_on(|| {
SOCKET_SET.with_socket_mut::<tcp::Socket, _, _>(handle, |socket| {
if !socket.is_active() {
// not open
ax_err!(ConnectionRefused, "socket recv() failed")
} else if !socket.may_recv() {
// connection closed
Ok(0)
} else if socket.recv_queue() > 0 {
if socket.recv_queue() > 0 {
// data available
// TODO: use socket.recv(|buf| {...})
let len = socket
.recv_slice(buf)
.map_err(|_| ax_err_type!(BadState, "socket recv() failed"))?;
Ok(len)
} else if !socket.is_active() {
// not open
ax_err!(ConnectionRefused, "socket recv() failed")
} else if !socket.may_recv() {
// connection closed
Ok(0)
} else {
// no more data
if current_ticks() > expire_at {
Expand Down Expand Up @@ -609,6 +609,15 @@ impl TcpSocket {
f()
} else {
loop {
#[cfg(feature = "signal")]
unsafe {
extern "Rust" {
fn current_have_signal() -> bool;
}
if current_have_signal() {
return Err(AxError::Interrupted);
}
}
SOCKET_SET.poll_interfaces();
match f() {
Ok(t) => return Ok(t),
Expand Down
44 changes: 43 additions & 1 deletion modules/axnet/src/smoltcp_impl/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub struct UdpSocket {
local_addr: RwLock<Option<IpEndpoint>>,
peer_addr: RwLock<Option<IpEndpoint>>,
nonblock: AtomicBool,
reuse_addr: AtomicBool,
}

impl UdpSocket {
Expand All @@ -33,6 +34,7 @@ impl UdpSocket {
local_addr: RwLock::new(None),
peer_addr: RwLock::new(None),
nonblock: AtomicBool::new(false),
reuse_addr: AtomicBool::new(false),
}
}

Expand Down Expand Up @@ -70,6 +72,31 @@ impl UdpSocket {
self.nonblock.store(nonblocking, Ordering::Release);
}

/// Set the TTL (time-to-live) option for this socket.
///
/// The TTL is the number of hops that a packet is allowed to live.
pub fn set_socket_ttl(&self, ttl: u8) {
SOCKET_SET.with_socket_mut::<udp::Socket, _, _>(self.handle, |socket| {
socket.set_hop_limit(Some(ttl))
});
}

/// Returns whether this socket is in reuse address mode.
#[inline]
pub fn is_reuse_addr(&self) -> bool {
self.reuse_addr.load(Ordering::Acquire)
}

/// Moves this UDP socket into or out of reuse address mode.
///
/// When a socket is bound, the `SO_REUSEADDR` option allows multiple sockets to be bound to the
/// same address if they are bound to different local addresses. This option must be set before
/// calling `bind`.
#[inline]
pub fn set_reuse_addr(&self, reuse_addr: bool) {
self.reuse_addr.store(reuse_addr, Ordering::Release);
}

/// Binds an unbound socket to the given address and port.
///
/// It's must be called before [`send_to`](Self::send_to) and
Expand All @@ -89,6 +116,12 @@ impl UdpSocket {
addr: (!is_unspecified(local_endpoint.addr)).then_some(local_endpoint.addr),
port: local_endpoint.port,
};

if !self.is_reuse_addr() {
// Check if the address is already in use
SOCKET_SET.bind_check(local_endpoint.addr, local_endpoint.port)?;
}

SOCKET_SET.with_socket_mut::<udp::Socket, _, _>(self.handle, |socket| {
socket.bind(endpoint).or_else(|e| match e {
BindError::InvalidState => ax_err!(AlreadyExists, "socket bind() failed"),
Expand Down Expand Up @@ -190,11 +223,11 @@ impl UdpSocket {

/// Close the socket.
pub fn shutdown(&self) -> AxResult {
SOCKET_SET.poll_interfaces();
SOCKET_SET.with_socket_mut::<udp::Socket, _, _>(self.handle, |socket| {
debug!("UDP socket {}: shutting down", self.handle);
socket.close();
});
SOCKET_SET.poll_interfaces();
Ok(())
}

Expand Down Expand Up @@ -283,6 +316,15 @@ impl UdpSocket {
f()
} else {
loop {
#[cfg(feature = "signal")]

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个地方目前暂时使用不安全的符号链接跳转,之后考虑 Starry 的单向依赖性质的时候可以把信号部分移植到底层,从而避免直接调用 Unsafe 代码。

unsafe {
extern "Rust" {
fn current_have_signal() -> bool;
}
if current_have_signal() {
return Err(AxError::Interrupted);
}
}
SOCKET_SET.poll_interfaces();
match f() {
Ok(t) => return Ok(t),
Expand Down
7 changes: 7 additions & 0 deletions modules/axprocess/src/signal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,3 +330,10 @@ impl SignalCaller for SignalCallerImpl {
send_signal_to_thread(tid, signum).unwrap();
}
}

/// check if the current task has a signal pending
#[no_mangle]
#[cfg(feature = "signal")]
pub extern "Rust" fn current_have_signal() -> bool {
current_process().have_signals().is_some()
}
10 changes: 9 additions & 1 deletion ulib/axstarry/src/syscall_fs/ctype/epoll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ pub struct EpollEvent {
pub event_type: EpollEventType,
/// 事件中使用到的数据,如fd等
pub data: u64,
pub fd: i32,
pub data_u32: u32,
pub data_u64: u64,
}

numeric_enum_macro::numeric_enum! {
Expand Down Expand Up @@ -105,7 +108,9 @@ impl EpollFile {
// 添加事件
EpollCtl::ADD => {
if inner.monitor_list.contains_key(&fd) {
return Err(SyscallError::EEXIST);
// return Err(SyscallError::EEXIST);
// TODO : fd close callback ?
inner.monitor_list.insert(fd, event);
}
inner.monitor_list.insert(fd, event);
}
Expand Down Expand Up @@ -181,6 +186,9 @@ impl EpollFile {
ret_events.push(EpollEvent {
event_type: EpollEventType::EPOLLERR,
data: req_event.data,
fd: -1,
data_u32: 0,
data_u64: 0,
});
}
}
Expand Down
3 changes: 3 additions & 0 deletions ulib/axstarry/src/syscall_fs/fs_syscall_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ numeric_enum_macro::numeric_enum! {
SELECT = 23,
PSELECT6 = 270,
READLINK = 89,
CHMOD = 90,
PREADLINKAT = 267,
FSTAT = 5,
LSTAT = 6,
Expand All @@ -124,5 +125,7 @@ numeric_enum_macro::numeric_enum! {
RENAMEAT = 264,
RENAMEAT2 = 316,
COPYFILERANGE = 326,
EPOLL_CREATE1 = 291,
EPOLL_PWAIT = 281,
}
}
4 changes: 4 additions & 0 deletions ulib/axstarry/src/syscall_fs/imp/epoll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ pub fn syscall_epoll_wait(args: [usize; 6]) -> SyscallResult {
let max_event = max_event as usize;
let process = current_process();
let start: VirtAddr = (event as usize).into();
// FIXME: this is a temporary solution
// the memory will out of mapped memory if the max_event is too large
// maybe give the max_event a limit is a better solution
let max_event = core::cmp::min(max_event, 400);
let end = start + max_event * core::mem::size_of::<EpollEvent>();
if process.manual_alloc_range_for_lazy(start, end).is_err() {
return Err(SyscallError::EFAULT);
Expand Down
8 changes: 6 additions & 2 deletions ulib/axstarry/src/syscall_fs/imp/poll.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use axfs::api::FileIO;
use axhal::{mem::VirtAddr, time::current_ticks};
use axprocess::{current_process, yield_now_task};
use axsignal::signal_no::SignalNo;
use bitflags::bitflags;
extern crate alloc;
use crate::{SyscallError, SyscallResult, TimeSecs, TimeVal};
Expand Down Expand Up @@ -397,9 +398,12 @@ pub fn syscall_pselect6(args: [usize; 6]) -> SyscallResult {
if current_ticks() as usize > expire_time {
return Ok(0);
}
// TODO: fix this and use mask to ignore specific signal
#[cfg(feature = "signal")]
if process.have_signals().is_some() {
return Err(SyscallError::EINTR);
if let Some(signalno) = process.have_signals() {
if signalno == SignalNo::SIGKILL as usize {
return Err(SyscallError::EINTR);
}
}
}
}
6 changes: 6 additions & 0 deletions ulib/axstarry/src/syscall_fs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,5 +92,11 @@ pub fn fs_syscall(syscall_id: fs_syscall_id::FsSyscallId, args: [usize; 6]) -> S
READLINK => syscall_readlink(args),
#[cfg(target_arch = "x86_64")]
CREAT => Err(axerrno::LinuxError::EPERM),
#[cfg(target_arch = "x86_64")]
EPOLL_CREATE1 => unimplemented!("epoll_create1"),
#[cfg(target_arch = "x86_64")]
EPOLL_PWAIT => unimplemented!("epoll_ctl"),
#[cfg(target_arch = "x86_64")]
CHMOD => Ok(0),
}
}
Loading
Loading