Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix use of cc with make <4.3: Clear O_NONBLOCK after compilaton #966

Merged
merged 4 commits into from
Feb 24, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Fix use of cc with make: Clear O_NONBLOCK after compilaton
Since make <4.3 cannot handle jobserver pipe with `O_NONBLOCK` set.

Signed-off-by: Jiahao XU <[email protected]>
  • Loading branch information
NobodyXu committed Feb 23, 2024
commit 971c4cda61786d9037812589599b4b973a565cae
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1395,7 +1395,7 @@ impl Build {
}

// Limit our parallelism globally with a jobserver.
let tokens = parallel::job_token::JobTokenServer::new();
let tokens = parallel::job_token::ActiveJobTokenServer::new()?;

// When compiling objects in parallel we do a few dirty tricks to speed
// things up:
Expand Down
78 changes: 70 additions & 8 deletions src/parallel/job_token/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ impl Drop for JobToken {
}
}

pub(crate) enum JobTokenServer {
enum JobTokenServer {
Inherited(inherited_jobserver::JobServer),
InProcess(inprocess_jobserver::JobServer),
}
Expand All @@ -35,7 +35,7 @@ impl JobTokenServer {
/// present), we will create a global in-process only jobserver
/// that has to be static so that it will be shared by all cc
/// compilation.
pub(crate) fn new() -> &'static Self {
fn new() -> &'static Self {
static INIT: Once = Once::new();
static mut JOBSERVER: MaybeUninit<JobTokenServer> = MaybeUninit::uninit();

Expand All @@ -50,11 +50,35 @@ impl JobTokenServer {
&*JOBSERVER.as_ptr()
}
}
}

pub(crate) struct ActiveJobTokenServer(&'static JobTokenServer);

impl ActiveJobTokenServer {
pub(crate) fn new() -> Result<Self, Error> {
let jobserver = JobTokenServer::new();

#[cfg(unix)]
if let JobTokenServer::Inherited(inherited_jobserver) = &jobserver {
inherited_jobserver.enter_active()?;
}

Ok(Self(jobserver))
}

pub(crate) fn try_acquire(&self) -> Result<Option<JobToken>, Error> {
match self {
Self::Inherited(jobserver) => jobserver.try_acquire(),
Self::InProcess(jobserver) => Ok(jobserver.try_acquire()),
match &self.0 {
JobTokenServer::Inherited(jobserver) => jobserver.try_acquire(),
JobTokenServer::InProcess(jobserver) => Ok(jobserver.try_acquire()),
}
}
}

impl Drop for ActiveJobTokenServer {
fn drop(&mut self) {
#[cfg(unix)]
if let JobTokenServer::Inherited(inherited_jobserver) = &self.0 {
inherited_jobserver.exit_active();
}
}
}
Expand All @@ -64,9 +88,12 @@ mod inherited_jobserver {

use std::{
env::var_os,
sync::atomic::{
AtomicBool,
Ordering::{AcqRel, Acquire},
sync::{
atomic::{
AtomicBool,
Ordering::{AcqRel, Acquire},
},
Mutex, MutexGuard, PoisonError,
},
};

Expand All @@ -80,6 +107,10 @@ mod inherited_jobserver {
/// the end of the process.
global_implicit_token: AtomicBool,
inner: sys::JobServerClient,
/// number of active clients is required to know when it is safe to clear non-blocking
/// flags
#[cfg(unix)]
active_clients_cnt: Mutex<usize>,
}

impl JobServer {
Expand Down Expand Up @@ -117,9 +148,40 @@ mod inherited_jobserver {
.map(|inner| Self {
inner,
global_implicit_token: AtomicBool::new(true),
#[cfg(unix)]
active_clients_cnt: Mutex::new(0),
})
}

#[cfg(unix)]
fn get_locked_active_cnt(&self) -> MutexGuard<'_, usize> {
self.active_clients_cnt
.lock()
.unwrap_or_else(PoisonError::into_inner)
}

#[cfg(unix)]
pub(super) fn enter_active(&self) -> Result<(), Error> {
let mut active_cnt = self.get_locked_active_cnt();
if *active_cnt == 0 {
self.inner.prepare_for_acquires()?;
}

*active_cnt += 1;

Ok(())
}

#[cfg(unix)]
pub(super) fn exit_active(&self) {
let mut active_cnt = self.get_locked_active_cnt();
*active_cnt -= 1;

if *active_cnt == 0 {
self.inner.done_acquires();
}
}

pub(super) fn try_acquire(&self) -> Result<Option<JobToken>, Error> {
if !self.global_implicit_token.swap(false, AcqRel) {
// Cold path, no global implicit token, obtain one
Expand Down
23 changes: 18 additions & 5 deletions src/parallel/job_token/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{
path::Path,
};

use crate::parallel::stderr::set_non_blocking;
use crate::parallel::stderr::{set_blocking, set_non_blocking};

pub(super) struct JobServerClient {
read: File,
Expand Down Expand Up @@ -77,10 +77,6 @@ impl JobServerClient {
let read = read.try_clone().ok()?;
let write = write.try_clone().ok()?;

// Set read and write end to nonblocking
set_non_blocking(&read).ok()?;
set_non_blocking(&write).ok()?;

Some(Self {
read,
write: Some(write),
Expand All @@ -90,6 +86,23 @@ impl JobServerClient {
}
}

pub(super) fn prepare_for_acquires(&self) -> Result<(), crate::Error> {
if let Some(write) = self.write.as_ref() {
set_non_blocking(&self.read)?;
set_non_blocking(write)?;
}

Ok(())
}

pub(super) fn done_acquires(&self) {
if let Some(write) = self.write.as_ref() {
let _ = set_blocking(&self.read);
let _ = set_blocking(write);
}
}

/// Must call `prepare_for_acquire` before using it.
pub(super) fn try_acquire(&self) -> io::Result<Option<()>> {
let mut fds = [libc::pollfd {
fd: self.read.as_raw_fd(),
Expand Down
40 changes: 31 additions & 9 deletions src/parallel/stderr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,56 @@ use crate::{Error, ErrorKind};
compile_error!("Only unix and windows support non-blocking pipes! For other OSes, disable the parallel feature.");

#[cfg(unix)]
pub fn set_non_blocking(pipe: &impl std::os::unix::io::AsRawFd) -> Result<(), Error> {
// On Unix, switch the pipe to non-blocking mode.
// On Windows, we have a different way to be non-blocking.
let fd = pipe.as_raw_fd();
fn get_flags(fd: std::os::unix::io::RawFd) -> Result<i32, Error> {
let flags = unsafe { libc::fcntl(fd, libc::F_GETFL, 0) };
if flags == -1 {
return Err(Error::new(
Err(Error::new(
ErrorKind::IOError,
format!(
"Failed to get flags for pipe {}: {}",
fd,
std::io::Error::last_os_error()
),
));
))
} else {
Ok(flags)
}
}

#[cfg(unix)]
fn set_flags(fd: std::os::unix::io::RawFd, flags: std::os::raw::c_int) -> Result<(), Error> {
if unsafe { libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK) } == -1 {
return Err(Error::new(
Err(Error::new(
ErrorKind::IOError,
format!(
"Failed to set flags for pipe {}: {}",
fd,
std::io::Error::last_os_error()
),
));
))
} else {
Ok(())
}
}

#[cfg(unix)]
pub fn set_blocking(pipe: &impl std::os::unix::io::AsRawFd) -> Result<(), Error> {
// On Unix, switch the pipe to non-blocking mode.
// On Windows, we have a different way to be non-blocking.
let fd = pipe.as_raw_fd();

let flags = get_flags(fd)?;
set_flags(fd, flags & (!libc::O_NONBLOCK))
}

#[cfg(unix)]
pub fn set_non_blocking(pipe: &impl std::os::unix::io::AsRawFd) -> Result<(), Error> {
// On Unix, switch the pipe to non-blocking mode.
// On Windows, we have a different way to be non-blocking.
let fd = pipe.as_raw_fd();

Ok(())
let flags = get_flags(fd)?;
set_flags(fd, flags | libc::O_NONBLOCK)
}

pub fn bytes_available(stderr: &mut ChildStderr) -> Result<usize, Error> {
Expand Down