Skip to content

Commit

Permalink
improved FTP && support remove file && optimize TCP server CPU usage
Browse files Browse the repository at this point in the history
  • Loading branch information
b23r0 committed Aug 17, 2022
1 parent 65567ef commit 43cc78c
Show file tree
Hide file tree
Showing 10 changed files with 396 additions and 133 deletions.
4 changes: 4 additions & 0 deletions heroinn/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,12 @@ lazy_static!{
let mut session = G_SHELL_SESSION.lock().unwrap();
session.gc();

log::info!("shell session : {}" , session.count());

let mut session = G_FTP_SESSION.lock().unwrap();
session.gc();

log::info!("ftp session : {}" , session.count());
}
});

Expand Down
4 changes: 4 additions & 0 deletions heroinn_client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ fn main() {
std::thread::sleep(Duration::from_secs(HEART_BEAT_TIME));
let mut shell_session = shell_session_mgr_1.lock().unwrap();
let mut ftp_session = ftp_session_mgr_1.lock().unwrap();

log::info!("shell session : {}" , shell_session.count());
log::info!("ftp session : {}" , ftp_session.count());

shell_session.gc();
ftp_session.gc();
}
Expand Down
49 changes: 36 additions & 13 deletions heroinn_client/src/module/ftp.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::sync::{Arc, atomic::AtomicBool, mpsc::Sender};
use heroinn_util::{session::{Session, SessionBase, SessionPacket}, rpc::{RpcServer, RpcMessage}, ftp::method::{get_disk_info, get_folder_info, join_path}};
use heroinn_util::{session::{Session, SessionBase, SessionPacket}, rpc::{RpcServer, RpcMessage}, ftp::{method::*, FTPPacket, FTPId}};

pub struct FtpClient{
id : String,
Expand All @@ -15,6 +15,7 @@ impl Session for FtpClient{
rpc_server.register(&"get_disk_info".to_string(), get_disk_info);
rpc_server.register(&"get_folder_info".to_string(), get_folder_info);
rpc_server.register(&"join_path".to_string(), join_path);
rpc_server.register(&"remove_file".to_string(), remove_file);
Ok(Self{
id: id.clone(),
clientid: clientid.clone(),
Expand All @@ -33,18 +34,39 @@ impl Session for FtpClient{
}

fn write(&mut self, data : &Vec<u8>) -> std::io::Result<()> {
log::debug!("recv rpc call");
let msg = RpcMessage::parse(data)?;
let ret = self.rpc_server.call(&msg);
let packet = SessionPacket{
id: self.id.clone(),
data: ret.serialize()?,
};
log::debug!("call ret : {:?}" , ret);
if let Err(e) = self.sender.send(SessionBase { id: self.id.clone(), clientid: self.clientid.clone() , packet }){
log::error!("session sender error : {}", e );
self.closed.store(true, std::sync::atomic::Ordering::Relaxed);
};

let packet = FTPPacket::parse(data)?;

match packet.id(){
FTPId::RPC => {
log::debug!("recv rpc call");
let msg = RpcMessage::parse(&packet.data)?;
let ret = self.rpc_server.call(&msg);

let packet = FTPPacket{
id: FTPId::RPC.to_u8(),
data: ret.serialize()?,
};

let packet = SessionPacket{
id: self.id.clone(),
data: packet.serialize()?,
};
log::debug!("call ret : {:?}" , ret);
if let Err(e) = self.sender.send(SessionBase { id: self.id.clone(), clientid: self.clientid.clone() , packet }){
log::error!("session sender error : {}", e );
self.closed.store(true, std::sync::atomic::Ordering::Relaxed);
};
},
FTPId::Close => {
self.close();
},
FTPId::Unknown => {

},
}


Ok(())
}

Expand All @@ -53,6 +75,7 @@ impl Session for FtpClient{
}

fn close(&mut self) {
log::info!("ftp session closed");
self.closed.store(true, std::sync::atomic::Ordering::Relaxed);
}

Expand Down
31 changes: 26 additions & 5 deletions heroinn_core/src/module/ftp/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
pub mod ftp_port;
use std::env::current_dir;
use std::sync::{mpsc::Sender, atomic::AtomicBool, Arc};
use heroinn_util::ftp::{FTPPacket, FTPId};
use heroinn_util::{session::{Session, SessionBase, SessionPacket}};

use self::ftp_port::{FtpInstance, new_ftp};
pub struct FtpServer{
id : String,
clientid : String,
closed : Arc<AtomicBool>,
_sender : Sender<SessionBase>,
sender : Sender<SessionBase>,
instance : FtpInstance
}

Expand Down Expand Up @@ -82,8 +83,8 @@ impl Session for FtpServer{
Ok(Self{
id,
clientid: clientid.clone(),
closed: Arc::new(AtomicBool::new(true)),
_sender : sender,
closed,
sender,
instance: ftp,
})
}
Expand All @@ -98,11 +99,31 @@ impl Session for FtpServer{
}

fn alive(&self) -> bool {
self.closed.load(std::sync::atomic::Ordering::Relaxed)
!self.closed.load(std::sync::atomic::Ordering::Relaxed)
}

fn close(&mut self) {
self.closed.store(false, std::sync::atomic::Ordering::Relaxed)

let packet = SessionPacket{
id: self.id.clone(),
data: FTPPacket{
id: FTPId::Close.to_u8(),
data: vec![],
}.serialize().unwrap(),
};

match self.sender.send(SessionBase{
id: self.id.clone(),
clientid : self.clientid.clone(),
packet : packet
}){
Ok(_) => {},
Err(e) => {
log::info!("sender close msg error: {}" , e);
},
};
log::info!("ftp session closed");
self.closed.store(true, std::sync::atomic::Ordering::Relaxed)
}

fn clientid(&self) -> String {
Expand Down
6 changes: 6 additions & 0 deletions heroinn_util/src/ftp/method.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,10 @@ pub fn get_folder_info(param : Vec<String>) -> Result<Vec<String>>{

pub fn join_path(param : Vec<String>) -> Result<Vec<String>>{
Ok(vec![Path::new(&param[0]).join(&param[1]).absolutize().unwrap().to_str().unwrap().to_string()])
}

pub fn remove_file(param : Vec<String>) -> Result<Vec<String>>{
let filename = param[0].clone();
std::fs::remove_file(filename)?;
Ok(vec![])
}
42 changes: 32 additions & 10 deletions heroinn_util/src/ftp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,34 @@ pub mod method;
use serde::{Serialize, Deserialize};

pub enum FTPId{
GetDirectory,
Unknow
RPC,
Close,
Unknown
}

impl FTPId{
fn _to_u8(&self) -> u8{
pub fn to_u8(&self) -> u8{
match self{
FTPId::GetDirectory => 0x01,
FTPId::Unknow => 0xff,
FTPId::RPC => 0x01,
FTPId::Close => 0x02,
FTPId::Unknown => 0xff,

}
}

fn _from(id : u8) -> Self{
pub fn from(id : u8) -> Self{
match id{
0x01 => FTPId::GetDirectory,
_ => FTPId::Unknow
0x01 => FTPId::RPC,
0x02 => FTPId::Close,
_ => FTPId::Unknown
}
}
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct FTPPacket{
id : u8,
data : String
pub id : u8,
pub data : Vec<u8>
}

#[derive(Serialize, Deserialize, Debug, Clone, Default)]
Expand All @@ -38,6 +42,24 @@ pub struct FileInfo{
pub last_modified : String,
}

impl FTPPacket{
pub fn parse(data : &Vec<u8>) -> Result<Self>{
let ret : FTPPacket = serde_json::from_slice(data)?;
Ok(ret)
}

pub fn serialize(&self) -> Result<Vec<u8>>{
match serde_json::to_vec(self){
Ok(p) => Ok(p),
Err(_) => return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "serilize ftp packet faild"))
}
}

pub fn id(&self) -> FTPId{
FTPId::from(self.id)
}
}

impl FileInfo{
pub fn serialize(&self) -> Result<String>{
match serde_json::to_string(&self){
Expand Down
3 changes: 2 additions & 1 deletion heroinn_util/src/protocol/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ impl Server<TcpStream> for TcpServer{
std::thread::spawn(move || {

for stream in server.incoming(){
std::thread::sleep(std::time::Duration::from_millis(200));
let cb_data = cb_data.clone();
match stream {
Ok(s) => {
Expand Down Expand Up @@ -368,7 +369,7 @@ fn test_tcp_tunnel(){
client1.read_exact(&mut buf).unwrap();
assert!(buf == [5,6,7,8]);
}

client1.close();
server.close();
}
4 changes: 4 additions & 0 deletions heroinn_util/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,8 @@ impl<T : Session> SessionManager<T>{
self.sessions.remove(&i);
}
}

pub fn count(&self) -> usize{
self.sessions.len()
}
}
43 changes: 36 additions & 7 deletions th3rd/heroinn_ftp/src/controller.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
use heroinn_util::{rpc::RpcMessage, ftp::{method::{get_disk_info, get_folder_info}, FileInfo}};
use heroinn_util::{rpc::RpcMessage, ftp::{method::{get_disk_info, get_folder_info, remove_file}, FileInfo, FTPPacket, FTPId}};
use std::{io::*, sync::mpsc::Sender};

use crate::G_RPCCLIENT;

fn build_ftp_rpc_packet(rpc_data : &RpcMessage) -> Result<FTPPacket>{
Ok(FTPPacket{
id: FTPId::RPC.to_u8(),
data: rpc_data.serialize()?,
})
}

pub fn get_remote_disk_info(sender : &Sender<RpcMessage>) -> Result<Vec<FileInfo>>{
pub fn get_remote_disk_info(sender : &Sender<FTPPacket>) -> Result<Vec<FileInfo>>{
let msg = RpcMessage::build_call("get_disk_info" , vec![]);
let mut remote_disk_info = vec![];
sender.send(msg.clone()).unwrap();
sender.send(build_ftp_rpc_packet(&msg)?).unwrap();
match G_RPCCLIENT.wait_msg(&msg.id, 10){
Ok(p) => {

Expand Down Expand Up @@ -45,10 +51,10 @@ pub fn get_local_disk_info() -> Result<Vec<FileInfo>>{
}
}

pub fn get_remote_folder_info(sender : &Sender<RpcMessage> , full_path : &String) -> Result<Vec<FileInfo>>{
pub fn get_remote_folder_info(sender : &Sender<FTPPacket> , full_path : &String) -> Result<Vec<FileInfo>>{
let msg = RpcMessage::build_call("get_folder_info" , vec![full_path.clone()]);
let mut remote_folder_info = vec![];
sender.send(msg.clone()).unwrap();
sender.send(build_ftp_rpc_packet(&msg)?).unwrap();
match G_RPCCLIENT.wait_msg(&msg.id, 10){
Ok(p) => {

Expand All @@ -69,9 +75,9 @@ pub fn get_remote_folder_info(sender : &Sender<RpcMessage> , full_path : &String
}
}

pub fn get_remote_join_path(sender : &Sender<RpcMessage> , cur_path : &String , filename : &String) -> Result<String>{
pub fn get_remote_join_path(sender : &Sender<FTPPacket> , cur_path : &String , filename : &String) -> Result<String>{
let msg = RpcMessage::build_call("join_path" , vec![cur_path.clone() , filename.clone()]);
sender.send(msg.clone()).unwrap();
sender.send(build_ftp_rpc_packet(&msg)?).unwrap();
match G_RPCCLIENT.wait_msg(&msg.id, 10){
Ok(p) => {

Expand Down Expand Up @@ -102,4 +108,27 @@ pub fn get_local_folder_info(full_path : &String) -> Result<Vec<FileInfo>>{
Err(e)
}
}
}

pub fn delete_local_file(full_path : &String) -> Result<()>{
remove_file(vec![full_path.clone()])?;
Ok(())
}

pub fn delete_remote_file(sender : &Sender<FTPPacket> ,full_path : &String) -> Result<()>{
let msg = RpcMessage::build_call("remove_file" , vec![full_path.clone()]);
sender.send(build_ftp_rpc_packet(&msg)?).unwrap();
match G_RPCCLIENT.wait_msg(&msg.id, 10){
Ok(p) => {

if p.retcode != 0{
return Err(std::io::Error::new(std::io::ErrorKind::Interrupted, p.msg));
}

Ok(())
}
Err(e) => {
Err(e)
}
}
}
Loading

0 comments on commit 43cc78c

Please sign in to comment.