Skip to content

Commit

Permalink
Lint fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
MathieuMoalic committed Mar 4, 2024
1 parent 444d142 commit d265701
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 24 deletions.
4 changes: 4 additions & 0 deletions node/amuman_node/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,26 @@

log = logging.getLogger("rich")


class JobPriority(Enum):
LOW = "LOW"
NORMAL = "NORMAL"
HIGH = "HIGH"


class JobStatus(Enum):
WAITING = "WAITING"
PENDING = "PENDING"
FINISHED = "FINISHED"
INTERRUPTED = "INTERRUPTED"


class GPUPartition(Enum):
SLOW = "SLOW"
NORMAL = "NORMAL"
FAST = "FAST"


@dataclass
class Job:
id: int
Expand Down
41 changes: 19 additions & 22 deletions node/amuman_node/job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ async def run_job(self, job_id: int) -> None:

async def post_updated_job_to_manager(self) -> None:
url: str = f"http://{self.manager_url}/api/jobs/{self.job.id}/"
headers = {
"Authorization": f"Bearer {self.token}"
}
headers = {"Authorization": f"Bearer {self.token}"}
async with httpx.AsyncClient() as client:
try:
log.debug(f"Sending updated job to `{url}`")
Expand All @@ -37,18 +35,16 @@ async def post_updated_job_to_manager(self) -> None:
updated_job: Dict[str, Any] = response.json()
log.debug(f"Received updated job: {updated_job}")
else:
log.error(f"Error sending updated job: Status code {response.status_code}")
log.error(
f"Error sending updated job: Status code {response.status_code}"
)
except Exception as e:
log.exception(f"Error sending updated job to manager: {e}")

async def fetch_job_from_manager(self, job_id: int) -> Job:
url: str = f"http://{self.manager_url}/api/jobs/{job_id}/"
headers = {
"Authorization": f"Bearer {self.token}"
}
job_data: Dict[
str, Any
] = {}
headers = {"Authorization": f"Bearer {self.token}"}
job_data: Dict[str, Any] = {}
async with httpx.AsyncClient() as client:
response = await client.get(url, headers=headers)
if response.status_code == 200:
Expand All @@ -65,8 +61,8 @@ async def fetch_job_from_manager(self, job_id: int) -> Job:

async def run_subprocess(self) -> None:
log.debug(f"job_starting subprocess for job ID: {self.job.id}")
cmd = ["amumax", self.job.path]

cmd = ["amumax", "-gpu=1", self.job.path]
log.debug(f"Running command: {cmd}")
try:
self.subprocess = await asyncio.create_subprocess_exec(
*cmd,
Expand All @@ -79,22 +75,25 @@ async def run_subprocess(self) -> None:
stdout, stderr = await self.subprocess.communicate()
if stdout:
log.debug(f"[STDOUT for job ID: {self.job.id}]\n{stdout.decode()}")
self.job.output=stdout.decode()
self.job.output = stdout.decode()
if stderr:
log.debug(f"[STDERR for job ID: {self.job.id}]\n{stderr.decode()}")
self.error=stderr.decode()
self.job.error = stderr.decode()

if self.subprocess.returncode == 0:
self.job.status = JobStatus.FINISHED
self.job.end_time=datetime.now().isoformat()
self.job.end_time = datetime.now().isoformat()
else:
self.job.status = JobStatus.INTERRUPTED
self.job.error_time=datetime.now().isoformat()
self.job.error_time = datetime.now().isoformat()

await self.post_updated_job_to_manager()

log.info(f"Job started AMUmax for job ID: {self.job.id} (PID: {self.subprocess.pid})")
log.info(
f"Job started AMUmax for job ID: {self.job.id} (PID: {self.subprocess.pid})"
)
log.info(f"AMUmax exited with status {self.job.status}.")
log.info(f"AMUmax output: {self.job.error}")

except OSError as e:
communicate = f"Failed to job_start subprocess for job ID: {self.job.id}. Executable may not be found. Error: {e}"
Expand All @@ -112,17 +111,15 @@ async def run_subprocess(self) -> None:
except asyncio.CancelledError:
log.info(f"Subprocess job_start was cancelled for job ID: {self.job.id}.")
except Exception as e:
communicate = f"Unexpected error occurred while job_starting subprocess for job ID: {self.job.id}. Error: {e}"
communicate = f"Unexpected error occurred while job_starting subprocess for job ID: {self.job.id}. Error: {e}"
log.error(communicate)
self.job.status = JobStatus.INTERRUPTED.value
self.job.status = JobStatus.INTERRUPTED
self.output = communicate
await self.post_updated_job_to_manager()

async def stop_process(self) -> None:
if self.subprocess and self.subprocess.returncode is None:
log.debug(f"Stopping amumax for job ID: {self.job.id}")
self.subprocess.terminate()
self.job.status = JobStatus.INTERRUPTED.value
self.job.status = JobStatus.INTERRUPTED
await self.post_updated_job_to_manager()


7 changes: 5 additions & 2 deletions node/amuman_node/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
log = logging.getLogger("rich")
logging.getLogger("websockets").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)


class NodeClient:
Expand All @@ -36,7 +37,7 @@ def __init__(self) -> None:
f"Manager URL: '{self.manager_url}', Node ID: {self.node_id}, Node Name: '{self.node_name}'"
)
self.reconnect_attempts: int = 10
self.reconnect_delay: int = 30
self.reconnect_delay: int = 30
self.gpm: Optional[GPUMonitor] = None
self.access_token: str
self.refresh_token: Optional[str] = None
Expand Down Expand Up @@ -180,7 +181,9 @@ async def process_message(self, message: str) -> None:
await self.execute_update_gpus(self.node_id)
elif command == "run_job":
log.info("Running job")
self.job_manager: JobManager = JobManager(self.node_id, self.manager_url, token=self.access_token)
self.job_manager: JobManager = JobManager(
self.node_id, self.manager_url, token=self.access_token
)
await self.job_manager.run_job(data["job_id"])
else:
log.error(f"Unknown command: {command}")
Expand Down

0 comments on commit d265701

Please sign in to comment.