diff --git a/Cargo.lock b/Cargo.lock index a47ecbd..057666d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,18 +2,179 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "addr2line" +version = "0.24.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dfbe277e56a376000877090da837660b4427aad530e3028d44e0bffe4f89a1c1" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler2" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" + [[package]] name = "autocfg" version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" +[[package]] +name = "backtrace" +version = "0.3.74" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d82cb332cdfaed17ae235a638438ac4d4839913cc2af585c3c6746e8f8bee1a" +dependencies = [ + "addr2line", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", + "windows-targets", +] + +[[package]] +name = "bitflags" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" + +[[package]] +name = "bytes" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "futures-core" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" + +[[package]] +name = "futures-io" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" + +[[package]] +name = "futures-macro" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" + +[[package]] +name = "futures-task" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" + +[[package]] +name = "futures-util" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" +dependencies = [ + "futures-core", + "futures-macro", + "futures-task", + "pin-project-lite", + "pin-utils", + "slab", +] + +[[package]] +name = "gimli" +version = "0.31.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" + +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + +[[package]] +name = "hermit-abi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" + +[[package]] +name = "libc" +version = "0.2.162" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18d287de67fe55fd7e1581fe933d965a5a9477b38e949cfa9f8574ef01506398" + +[[package]] +name = "lock_api" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17" +dependencies = [ + "autocfg", + "scopeguard", +] + [[package]] name = "mc-rust-server" version = "0.1.0" dependencies = [ "num-derive", "num-traits", + "tokio", + "tokio-stream", + "tokio-util", +] + +[[package]] +name = "memchr" +version = "2.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" + +[[package]] +name = "miniz_oxide" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2d80299ef12ff69b16a84bb182e3b9df68b5a91574d3d4fa6e41b65deec4df1" +dependencies = [ + "adler2", +] + +[[package]] +name = "mio" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec" +dependencies = [ + "hermit-abi", + "libc", + "wasi", + "windows-sys", ] [[package]] @@ -36,6 +197,50 @@ dependencies = [ "autocfg", ] +[[package]] +name = "object" +version = "0.36.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aedf0a2d09c573ed1d8d85b30c119153926a2b36dce0ab28322c09a117a4683e" +dependencies = [ + "memchr", +] + +[[package]] +name = "parking_lot" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-targets", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "915a1e146535de9163f3987b8944ed8cf49a18bb0056bcebcdcece385cece4ff" + +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + [[package]] name = "proc-macro2" version = "1.0.89" @@ -54,6 +259,61 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "redox_syscall" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b6dfecf2c74bce2466cabf93f6664d6998a69eb21e39f4207930065b27b771f" +dependencies = [ + "bitflags", +] + +[[package]] +name = "rustc-demangle" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" + +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + +[[package]] +name = "signal-hook-registry" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1" +dependencies = [ + "libc", +] + +[[package]] +name = "slab" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" +dependencies = [ + "autocfg", +] + +[[package]] +name = "smallvec" +version = "1.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" + +[[package]] +name = "socket2" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce305eb0b4296696835b71df73eb912e0f1ffd2556a501fcede6e0c50349191c" +dependencies = [ + "libc", + "windows-sys", +] + [[package]] name = "syn" version = "2.0.87" @@ -65,8 +325,144 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "tokio" +version = "1.41.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22cfb5bee7a6a52939ca9224d6ac897bb669134078daa8735560897f69de4d33" +dependencies = [ + "backtrace", + "bytes", + "libc", + "mio", + "parking_lot", + "pin-project-lite", + "signal-hook-registry", + "socket2", + "tokio-macros", + "windows-sys", +] + +[[package]] +name = "tokio-macros" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tokio-stream" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f4e6ce100d0eb49a2734f8c0812bcd324cf357d21810932c5df6b96ef2b86f1" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tokio-util" +version = "0.7.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61e7c3654c13bcd040d4a03abee2c75b1d14a37b423cf5a813ceae1cc903ec6a" +dependencies = [ + "bytes", + "futures-core", + "futures-io", + "futures-sink", + "futures-util", + "hashbrown", + "pin-project-lite", + "slab", + "tokio", +] + [[package]] name = "unicode-ident" version = "1.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e91b56cd4cadaeb79bbf1a5645f6b4f8dc5bde8834ad5894a8db35fda9efa1fe" + +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-targets" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_gnullvm", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" + +[[package]] +name = "windows_i686_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" + +[[package]] +name = "windows_i686_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" diff --git a/Cargo.toml b/Cargo.toml index 082867a..923bdb6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,3 +6,7 @@ edition = "2021" [dependencies] num-derive = "0.4.2" num-traits = "0.2.19" +tokio = { version = "1.41.1", features = ["rt", "rt-multi-thread", "macros", "full", "net"] } + +tokio-util = { version = "0.7.0", features = ["full"] } +tokio-stream = "0.1" diff --git a/src/main.rs b/src/main.rs index e33de33..bc5f7e2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,47 +2,34 @@ pub mod protocols; pub mod types; pub mod utils; -use crate::protocols::handle; use crate::types::string::McString; use crate::types::var_int::VarInt; use crate::types::{McRead, McRustRepr}; use crate::utils::RWStreamWithLimit; use num_derive::FromPrimitive; use num_traits::{FromPrimitive, ToPrimitive}; -use std::io::{Read, Write}; -use std::net::{TcpListener, TcpStream}; -use std::thread; -use std::time::Duration; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite}; +use tokio::net::TcpStream; -fn main() { +#[tokio::main] +async fn main() -> Result<(), ()> { println!("Hello, world!"); - let listener = TcpListener::bind("127.0.0.1:25565").unwrap(); + // let listener = TcpListener::bind("127.0.0.1:25565").unwrap(); + let listener = tokio::net::TcpListener::bind("127.0.0.1:25565") + .await + .unwrap(); println!("Listening started."); - for stream in listener.incoming() { - match stream { - Ok(stream) => { - thread::spawn(|| { - println!("===============START====================="); + loop { + let (stream, socket) = listener.accept().await.map_err(|x| { + dbg!(x); + })?; - stream - .set_read_timeout(Some(Duration::from_secs(3))) - .unwrap(); - stream - .set_write_timeout(Some(Duration::from_secs(3))) - .unwrap(); - println!( - "Timeout for connection: {:?}/{:?}", - stream.read_timeout(), - stream.write_timeout() - ); - handle_connection(stream); - println!("===============DONE======================"); - }); - } - Err(err) => { - dbg!(err); - } - } + tokio::spawn(async move { + println!("===============START====================="); + dbg!(&socket); + handle_connection(stream).await; + println!("===============DONE======================"); + }); } } #[derive(FromPrimitive, Copy, Clone, Debug, Ord, PartialOrd, Eq, PartialEq)] @@ -59,12 +46,11 @@ struct Connection { compression_active: bool, } impl Connection { - fn handle(&mut self) -> Result<(), String> { + async fn handle(&mut self) -> Result<(), String> { while self.connection_state != ConnectionState::Closed { - let x = self.tcp_stream.peek(&mut [0]); //see if we have at least one byte available + let x = self.tcp_stream.peek(&mut [0]).await; //see if we have at least one byte available match x { Ok(size) => { - println!("we should have 1 here: {size}"); if size == 0 { println!("Reached end of stream."); self.connection_state = ConnectionState::Closed; @@ -76,7 +62,7 @@ impl Connection { } } - let length = VarInt::read_stream(&mut self.tcp_stream)?; + let length = VarInt::read_stream(&mut self.tcp_stream).await?; println!("packet length: {}", length.as_rs()); let bytes_left_in_package = length.to_rs(); @@ -88,7 +74,8 @@ impl Connection { &mut package_stream, self.connection_state, self.compression_active, - ); + ) + .await; match result { Ok(new_connection_state) => { assert_eq!( @@ -99,10 +86,7 @@ impl Connection { self.connection_state = new_connection_state; } Err(e) => { - //discard rest of package for failed ones - discard_read(&mut self.tcp_stream, bytes_left_in_package.to_u8().unwrap()) - .map_err(|x| x.to_string())?; - + self.connection_state = ConnectionState::Closed; println!("Got an error during package handling: {e}"); } } @@ -110,26 +94,21 @@ impl Connection { Ok(()) } - fn handshake( + async fn handshake( stream: &mut T, _compression: bool, // bytes_left_in_package: &mut i32, ) -> Result { - // println!("bytes left:{}", bytes_left_in_package); - let protocol_version = VarInt::read_stream(stream)?; - // *bytes_left_in_package -= read as i32; + println!("Handshake"); + let protocol_version = VarInt::read_stream(stream).await?; println!("protocol version: {}", protocol_version.as_rs()); - // println!("bytes left:{}", bytes_left_in_package); - let address = - McString::read_stream(stream).map_err(|_| "Could not read string".to_string())?; - // *bytes_left_in_package -= read as i32; + let address: McString<255> = McString::read_stream(stream) + .await + .map_err(|_| "Could not read string".to_string())?; println!("address: '{}'", address.as_rs()); - stream.read_exact(&mut [0, 2]).unwrap(); //server port. Unused - // *bytes_left_in_package -= 2; - let next_state_id = VarInt::read_stream(stream)?; - // *bytes_left_in_package -= read as i32; + stream.read_exact(&mut [0, 2]).await.unwrap(); //server port. Unused + let next_state_id = VarInt::read_stream(stream).await?; println!("next state: {}", next_state_id.as_rs()); - // println!("bytes left:{}", bytes_left_in_package); let next_state = FromPrimitive::from_i32(next_state_id.to_rs()); match next_state { Some(next_state) => Ok(next_state), @@ -139,42 +118,36 @@ impl Connection { )), } } - fn handle_package( - stream: &mut RWStreamWithLimit, + async fn handle_package( + stream: &mut RWStreamWithLimit<'_, T>, connection_state: ConnectionState, compression: bool, - // bytes_left_in_package: usize, ) -> Result { - // let mut stream = RWStreamWithLimit::new(stream, bytes_left_in_package); - // let stream = &mut stream; - let packet_id = VarInt::read_stream(stream)?; - // *bytes_left_in_package = i32::max(*bytes_left_in_package - read as i32, 0); + let packet_id = VarInt::read_stream(stream).await?; println!("id: {:0>2x}", packet_id.as_rs()); if connection_state == ConnectionState::NotConnected && packet_id.to_rs() == 0x00 { - return Self::handshake(stream, compression); + return Self::handshake(stream, compression).await; } match FromPrimitive::from_i32(packet_id.to_rs()) { Some(protocol) => { - // println!("bytes left:{}", bytes_left_in_package); - let res = handle(protocol, stream); - // println!("bytes left:{}", bytes_left_in_package); + let res = protocols::handle(protocol, stream).await; match res { Ok(_) => { - // println!("bytes left:{}", bytes_left_in_package); println!("Success!"); } - Err(_) => { - stream.discard_unread().map_err(|x| x.to_string())?; - // println!("bytes left:{}", bytes_left_in_package); - // *bytes_left_in_package -= discard_read(stream, *bytes_left_in_package as u8) - // as i32; + Err(terminate_connection) => { + if terminate_connection { + return Err("Something terrible has happened!".to_string()); + } else { + stream.discard_unread().await.map_err(|x| x.to_string())?; + } println!("Failure :("); } } } None => { - stream.discard_unread().map_err(|x| x.to_string())?; + stream.discard_unread().await.map_err(|x| x.to_string())?; // *bytes_left_in_package -= discard_read(stream, *bytes_left_in_package as u8) // .map_err(|x| x.to_string())? as i32; println!("I don't know this protocol yet, so Im gonna ignore it..."); @@ -183,18 +156,14 @@ impl Connection { Ok(connection_state) } } -fn handle_connection(stream: TcpStream) { +async fn handle_connection(stream: TcpStream) { let mut connection = Connection { connection_state: ConnectionState::NotConnected, tcp_stream: stream, compression_active: false, }; - let result = connection.handle(); + let result = connection.handle().await; if let Err(e) = result { dbg!(e); } } -fn discard_read(stream: &mut T, bytes: u8) -> Result { - stream.read_exact(&mut [0, bytes])?; - Ok(bytes as usize) -} diff --git a/src/protocols.rs b/src/protocols.rs index 7a8a272..7df1155 100644 --- a/src/protocols.rs +++ b/src/protocols.rs @@ -1,23 +1,24 @@ -use crate::protocols::status::StatusProtocol; use crate::utils::RWStreamWithLimit; use num_derive::FromPrimitive; -use std::io::{Read, Write}; -// use num_traits::FromPrimitive; +use tokio::io::{AsyncRead, AsyncWrite}; #[derive(FromPrimitive)] pub enum Protocols { Status = 0x00, Ping = 0x01, + CustomReportDetails = 0x7a, } -pub fn handle( +pub async fn handle( protocol: Protocols, - stream: &mut RWStreamWithLimit, + stream: &mut RWStreamWithLimit<'_, T>, // bytes_left_in_package: &mut i32, -) -> Result<(), ()> { +) -> Result<(), bool> { match protocol { - Protocols::Status => StatusProtocol::handle(stream)?, + Protocols::Status => status::Protocol::handle(stream).await?, Protocols::Ping => {} + Protocols::CustomReportDetails => custom_report_details::Protocol::handle(stream).await?, }; Ok(()) } +mod custom_report_details; mod status; diff --git a/src/protocols/custom_report_details.rs b/src/protocols/custom_report_details.rs new file mode 100644 index 0000000..ea7bd28 --- /dev/null +++ b/src/protocols/custom_report_details.rs @@ -0,0 +1,42 @@ +use crate::types::var_int::VarInt; +use crate::types::McRead; +use crate::utils::RWStreamWithLimit; +use tokio::io::{AsyncRead, AsyncWrite}; + +pub struct Protocol {} + +impl Protocol { + pub async fn handle( + stream: &mut RWStreamWithLimit<'_, T>, + // bytes_left_in_package: &mut i32, + ) -> Result<(), bool> { + let count = VarInt::read_stream(stream).await.map_err(|x| { + dbg!(x); + true + })?; + dbg!(&count); + let string_size = VarInt::read_stream(stream).await.map_err(|x| { + dbg!(x); + true + })?; + dbg!(&string_size); + stream.discard_unread().await.map_err(|x| { + dbg!(x); + true + })?; + // for i in 0..*count { + // let title = McString::read_stream(stream).await.map_err(|x| { + // dbg!(x); + // })?; + // let description = McString::read_stream(stream).await.map_err(|x| { + // dbg!(x); + // })?; + // println!( + // "Read title & description fo some custom report ({i}): {}\n{}", + // title.as_rs(), + // description.as_rs() + // ); + // } + Ok(()) + } +} diff --git a/src/protocols/status.rs b/src/protocols/status.rs index 39c2f72..6404cf6 100644 --- a/src/protocols/status.rs +++ b/src/protocols/status.rs @@ -1,23 +1,33 @@ use crate::types::string::McString; +use crate::types::var_int::VarInt; use crate::types::McWrite; use crate::utils::RWStreamWithLimit; -use std::io::{Read, Write}; +use tokio::io::{AsyncRead, AsyncWrite}; -pub struct StatusProtocol {} +pub struct Protocol {} -impl StatusProtocol { - pub fn handle( - stream: &mut RWStreamWithLimit, +impl Protocol { + pub async fn handle( + stream: &mut RWStreamWithLimit<'_, T>, // bytes_left_in_package: &mut i32, - ) -> Result<(), ()> { - McString(Self::get_sample_result()) + ) -> Result<(), bool> { + println!("Status"); + VarInt(0x01).write_stream(stream).await.map_err(|x| { + dbg!(x); + false + })?; + + McString::<32767>::from_string(Self::get_sample_result()) .write_stream(stream) + .await .map_err(|x| { dbg!(x); + false })?; - stream.discard_unread().map_err(|x| { - dbg!(x); - })?; + // stream.discard_unread().await.map_err(|x| { + // dbg!(x); + // false + // })?; // *bytes_left_in_package = 0; Ok(()) } diff --git a/src/types.rs b/src/types.rs index 35d42ff..5d95333 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,14 +1,17 @@ -use std::io::{Read, Write}; +use tokio::io::{AsyncRead, AsyncWrite}; -pub trait McRead { +pub(crate) trait McRead { type Error; - fn read_stream(stream: &mut T) -> Result + async fn read_stream(stream: &mut T) -> Result where Self: Sized; } -pub trait McWrite { +pub(crate) trait McWrite { type Error; - fn write_stream(&self, stream: &mut T) -> Result + async fn write_stream( + &self, + stream: &mut T, + ) -> Result where Self: Sized; } diff --git a/src/types/string.rs b/src/types/string.rs index 7ffa644..f261259 100644 --- a/src/types/string.rs +++ b/src/types/string.rs @@ -1,58 +1,77 @@ use crate::types::var_int::VarInt; use crate::types::{McRead, McRustRepr, McWrite}; -use std::io::{Read, Write}; -pub struct McString(pub String); -impl McRead for McString { +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; + +pub struct McString { + pub value: String, +} +impl McString { + fn measure_size(s: &str) -> usize { + s.len() + } + pub fn from_string(s: String) -> Self { + Self { value: s } + } +} +impl McRead for McString { type Error = (); - fn read_stream(b: &mut T) -> Result + async fn read_stream(b: &mut T) -> Result where Self: Sized, { - let size = VarInt::read_stream(b).map_err(|x| { + let max_size = VarInt::read_stream(b).await.map_err(|x| { dbg!(x); })?; - let size = *size as usize; + let size = *max_size as usize; + + // Check if the size exceeds the maximum allowed length (n) + if size > (MAX_SIZE * 3) + 3 { + return Err(()); // Or a more specific error type + } let mut bytes = vec![0u8; size]; - let actual_size = b.read(&mut bytes).map_err(|x| { + let actual_size = b.read(&mut bytes).await.map_err(|x| { dbg!(x); })?; assert_eq!(size, actual_size); let value = String::from_utf8(bytes).map_err(|x| { dbg!(x); })?; - Ok(Self(value)) + Ok(Self { value }) } } -impl McWrite for McString { +impl McWrite for McString { type Error = std::io::Error; - fn write_stream(&self, stream: &mut T) -> Result + async fn write_stream( + &self, + stream: &mut T, + ) -> Result where Self: Sized, { - let buf = self.0.as_bytes(); - let length = buf.len(); //This does not actually count right (see https://wiki.vg/Protocol#Type:String) - VarInt(length as i32).write_stream(stream)?; + let buf = self.value.as_bytes(); + let length = Self::measure_size(&self.value); + VarInt(length as i32).write_stream(stream).await?; - stream.write_all(buf)?; + stream.write_all(buf).await?; Ok(length) } } -impl McRustRepr for McString { +impl McRustRepr for McString { type RustRepresentation = String; fn into_rs(self) -> Self::RustRepresentation { - self.0 + self.value } fn to_rs(&self) -> Self::RustRepresentation { - self.0.to_owned() + self.value.to_owned() } fn as_rs(&self) -> &Self::RustRepresentation { - &self.0 + &self.value } } diff --git a/src/types/var_int.rs b/src/types/var_int.rs index 6a7ba61..76093a3 100644 --- a/src/types/var_int.rs +++ b/src/types/var_int.rs @@ -1,6 +1,7 @@ use crate::types::{McRead, McRustRepr, McWrite}; -use std::io::{Read, Write}; + use std::ops::Deref; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; #[derive(Debug, Copy, Clone)] pub struct VarInt(pub i32); @@ -14,27 +15,30 @@ impl Deref for VarInt { impl McWrite for VarInt { type Error = std::io::Error; - fn write_stream(&self, stream: &mut T) -> Result + async fn write_stream( + &self, + stream: &mut T, + ) -> Result where Self: Sized, { let mut value = self.0 as u32; loop { if (value & Self::SEGMENT_BITS as u32) == 0 { - let _ = stream.write(&[value.to_le_bytes()[0]])?; + let _ = stream.write(&[value.to_le_bytes()[0]]).await?; return Ok(1); } let x = value & Self::SEGMENT_BITS as u32 | Self::CONTINUE_BIT as u32; let x = x.to_le_bytes()[0]; - let _ = stream.write(&[x])?; + let _ = stream.write(&[x]).await?; value >>= 7; } } } impl McRead for VarInt { type Error = String; - fn read_stream(b: &mut T) -> Result { + async fn read_stream(b: &mut T) -> Result { let mut value = 0i32; let mut position = 0; // println!("CONTINUE bit: {:0>32b}", Self::CONTINUE_BIT); @@ -43,6 +47,7 @@ impl McRead for VarInt { loop { let mut current_byte = 0u8; b.read_exact(std::slice::from_mut(&mut current_byte)) + .await .map_err(|x| x.to_string())?; // println!( // "b: {:0>32b}\nm: {:0>32b}\nr: {:0>32b}\n>: {:0>32b} ({position})\nv: {:0>32b}\nr2:{:0>32b}", diff --git a/src/utils.rs b/src/utils.rs index 550ab23..b1dc1f0 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,22 +1,44 @@ -use std::io::{ErrorKind, Read, Write}; +use std::io::ErrorKind; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, ReadBuf}; -pub struct RWStreamWithLimit<'a, T: Read + Write> { +pub struct RWStreamWithLimit<'a, T: AsyncRead + AsyncWrite> { stream: &'a mut T, read_bytes_left: usize, } -impl<'a, T: Read + Write> RWStreamWithLimit<'a, T> { + +impl<'a, T: AsyncRead + AsyncWrite + Unpin> RWStreamWithLimit<'a, T> { pub(crate) fn new(stream: &'a mut T, read_limit: usize) -> Self { Self { stream, read_bytes_left: read_limit, } } - pub(crate) fn discard_unread(&mut self) -> std::io::Result { + pub(crate) async fn discard_unread(&mut self) -> std::io::Result { let mut total_read = 0; while self.read_bytes_left > 0 { - let read = self.stream.read(&mut vec![0; self.read_bytes_left])?; + println!("Discarding {} bytes...", self.read_bytes_left); + let read = self.stream.read(&mut vec![0; self.read_bytes_left]).await?; total_read += read; self.read_bytes_left -= read; + println!( + "Discarded {read}/{} remaining bytes ({total_read}/{} total)", + self.read_bytes_left + read, + self.read_bytes_left + total_read + ); + if read == 0 { + const ERROR: &str = "Could not read a single byte"; + println!("{}", ERROR); + return Err(std::io::Error::new(ErrorKind::Other, ERROR)); + } + if self.read_bytes_left > 0 { + //IDK if this makes sense to just throw an error if we don't read all in one go? + const ERROR: &str = "Couldnt read all bytes in one go"; + println!("{}", ERROR); + return Err(std::io::Error::new(ErrorKind::Other, ERROR)); + } + println!("Done Discarding {total_read} bytes"); } Ok(total_read) } @@ -24,35 +46,59 @@ impl<'a, T: Read + Write> RWStreamWithLimit<'a, T> { self.read_bytes_left } } -impl<'a, T: Read + Write> Read for RWStreamWithLimit<'a, T> { - fn read(&mut self, buf: &mut [u8]) -> std::io::Result { - let bytes_read; - if self.read_bytes_left > 0 { - if self.read_bytes_left >= buf.len() { - bytes_read = self.stream.read(buf)?; - } else { - println!("wants to read more than in the readable part of the stream"); - bytes_read = self.stream.read(&mut buf[0..self.read_bytes_left])?; - //TODO: decide if we wanna throw an error here or nah - } - self.read_bytes_left -= bytes_read; //TODO: maybe check if we read to much? - } else { - return Err(std::io::Error::new( +impl<'a, T: AsyncRead + AsyncWrite + Unpin> AsyncRead for RWStreamWithLimit<'a, T> { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + if self.read_bytes_left == 0 { + return Poll::Ready(Err(std::io::Error::new( ErrorKind::Other, "There is nothing more to read in this package", - )); - //TODO: maybe throw an error since there is no way anything gets read anymore? + ))); + } + + let self_mut = self.get_mut(); + let stream = &mut self_mut.stream; + + if self_mut.read_bytes_left < buf.remaining() { + println!("wants to read more than in the readable part of the stream. Only read readable part, to not screw up the next few parts"); + } + let bytes_to_read = std::cmp::min(self_mut.read_bytes_left, buf.remaining()); + let mut inner_buf = buf.take(bytes_to_read); + + let read = Pin::new(stream).poll_read(cx, &mut inner_buf); + if let Poll::Ready(Ok(())) = read { + let bytes_read = inner_buf.filled().len(); + self_mut.read_bytes_left -= bytes_read; + buf.advance(bytes_read); // Important: Advance the buffer + Poll::Ready(Ok(())) + } else { + read } - Ok(bytes_read) } } -impl<'a, T: Read + Write> Write for RWStreamWithLimit<'a, T> { - fn write(&mut self, buf: &[u8]) -> std::io::Result { - self.read_bytes_left = 0; - self.stream.write(buf) +impl<'a, T: AsyncRead + AsyncWrite + Unpin> AsyncWrite for RWStreamWithLimit<'a, T> { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let self_mut = self.get_mut(); + self_mut.read_bytes_left = 0; + + let stream = &mut self_mut.stream; + Pin::new(stream).poll_write(cx, buf) } - fn flush(&mut self) -> std::io::Result<()> { - self.stream.flush() + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let stream = &mut self.get_mut().stream; + Pin::new(stream).poll_flush(cx) + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let stream = &mut self.get_mut().stream; + Pin::new(stream).poll_shutdown(cx) } }