diff --git a/Cargo.toml b/Cargo.toml index a0339e6..4a614ba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ license = "MIT" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -purple_cello_mc_protocol = { git = "https://github.com/PurpleCelloServer/purple_cello_mc_protocol.git", rev = "b1a8cfdc91657566d68fb6f7dbc71bdee390cc9e" } +purple_cello_mc_protocol = { git = "https://github.com/PurpleCelloServer/purple_cello_mc_protocol.git", rev = "505adfb92c44db44a89effa4d38ebb863c2c60d0" } tokio = { version = "1", features = ["full"] } serde = { version = "1", features = ["derive"] } serde_json = "1" diff --git a/src/client.rs b/src/client.rs new file mode 100644 index 0000000..98df22b --- /dev/null +++ b/src/client.rs @@ -0,0 +1,145 @@ +// Yeahbut May 2024 + +use std::error::Error; +use tokio::net::{TcpStream, tcp::{OwnedReadHalf, OwnedWriteHalf}}; + +use purple_cello_mc_protocol::{ + mc_types::{self, Packet, ProtocolConnection}, + handshake, + login, +}; + +use crate::status_handle; +use crate::login_handle; +use crate::listener; + +pub async fn handle_client( + client_socket: TcpStream, + proxy_info: listener::ProxyInfo, +) { + println!("Accepting Connection"); + let backend_addr = proxy_info.formatted_backend_address(); + + let (mut client_reader, mut client_writer) = client_socket.into_split(); + let mut client_conn = ProtocolConnection::new( + &mut client_reader, + &mut client_writer, + ); + + let mut backend_socket: (OwnedReadHalf, OwnedWriteHalf); + let mut server_conn: Option> = + match TcpStream::connect(backend_addr).await { + Ok(backend_stream) => { + backend_socket = backend_stream.into_split(); + Some(ProtocolConnection::new( + &mut backend_socket.0, &mut backend_socket.1)) + }, + Err(_) => None, + }; + + let mut buffer: [u8; 1] = [0; 1]; + client_conn.stream_read.peek(&mut buffer) + .await.expect("Failed to peek at first byte from stream"); + let packet_id: u8 = buffer[0]; + + if packet_id == 0xFE { + status_handle::respond_legacy_status(&mut client_conn) + .await.expect("Error handling legacy status request"); + return; + } else { + let handshake_packet = + handshake::serverbound::Handshake::read(&mut client_conn) + .await.expect("Error reading handshake packet"); + println!("Next state: {}", handshake_packet.next_state); + if handshake_packet.next_state == 1 { + println!("Receiving Status Request"); + status_handle::respond_status( + proxy_info, + &mut client_conn, + &mut server_conn, + ).await.expect("Error handling status request"); + return; + } else if handshake_packet.next_state == 2 { + if handshake_packet.protocol_version == mc_types::VERSION_PROTOCOL { + match server_conn { + Some(mut server_conn) => { + if login_handle::respond_login( + &mut client_conn, + &mut server_conn, + ).await.expect( + "Error logging into proxy or server" + ) { + handle_play( + client_conn, + server_conn, + ).await; + } else { + println!("Player blocked from server"); + } + } + None => { + login::clientbound::Disconnect { + reason: "\"Server Error (Server may be starting)\"" + .to_string() + } + .write(&mut client_conn) + .await + .expect("Error sending disconnect on: \ +Failed to connect to the backend server"); + return; + } + }; + } + else + if handshake_packet.protocol_version < mc_types::VERSION_PROTOCOL { + println!("Client on outdated version"); + login::clientbound::Disconnect { + reason: format!( + "\"Client Error: Outdated Version (I'm on {})\"", + mc_types::VERSION_NAME, + ).to_string() + } + .write(&mut client_conn).await.expect( + "Error sending disconnect on: Client on wrong version"); + // if handshake_packet.protocol_version > mc_types::VERSION_PROTOCOL + } else { + println!("Client on future version"); + login::clientbound::Disconnect { + reason: format!( + "\"Client Error: Future Version (I'm on {})\"", + mc_types::VERSION_NAME, + ).to_string() + } + .write(&mut client_conn).await.expect( + "Error sending disconnect on: Client on wrong version"); + } + } else { + return; + } + } + + + println!("Connection Closed"); +} + +async fn handle_play( + mut client_conn: ProtocolConnection<'_>, + mut server_conn: ProtocolConnection<'_>, +) -> Result<(), Box> { + // Forward from client to backend + let to_backend = tokio::spawn(client_conn.forward_play(&mut server_conn)); + + // Forward from backend to client + // let to_client = tokio::spawn(async move { + // io::copy( + // &mut server_conn.stream_read, + // &mut client_conn.stream_write, + // ).await.expect( + // "Error copying from backend to client"); + // }); + let to_client = tokio::spawn(server_conn.forward_play(&mut client_conn)); + + tokio::try_join!(to_backend, to_client)?; + + Ok(()) +} diff --git a/src/listener.rs b/src/listener.rs new file mode 100644 index 0000000..19891ec --- /dev/null +++ b/src/listener.rs @@ -0,0 +1,44 @@ +// Yeahbut May 2024 + +use tokio::net::TcpListener; +use std::error::Error; + +#[derive(Copy, Clone)] +pub enum OnlineStatus { + Online, + Offline, +} + +#[derive(Clone)] +pub struct ProxyInfo { + pub proxy_addr: String, + pub proxy_port: u16, + pub online_status: OnlineStatus, + pub backend_addr: String, + pub backend_port: u16, +} + +impl ProxyInfo { + pub fn formatted_proxy_address(&self) -> String { + format!("{}:{}", self.proxy_addr, self.proxy_port) + } + + pub fn formatted_backend_address(&self) -> String { + format!("{}:{}", self.backend_addr, self.backend_port) + } +} + +pub struct TcpListenerWrapper { + pub listener: TcpListener, + pub info: ProxyInfo, +} + +impl TcpListenerWrapper { + pub async fn bind(info: ProxyInfo) -> Result> { + Ok(Self { + listener: TcpListener::bind( + info.formatted_proxy_address()).await?, + info: info, + }) + } +} diff --git a/src/login_handle.rs b/src/login_handle.rs index 20751bc..e09e390 100644 --- a/src/login_handle.rs +++ b/src/login_handle.rs @@ -4,12 +4,11 @@ use std::fs; use std::time::{Duration, Instant}; use std::sync::{Arc, Mutex}; -use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; use serde_json::Value; use lazy_static::lazy_static; use purple_cello_mc_protocol::{ - mc_types::{self, Result, Packet}, + mc_types::{self, Result, Packet, ProtocolConnection}, handshake, login, }; @@ -164,20 +163,17 @@ fn check_player(player: Player) -> Result { } pub async fn respond_login( - client_reader: &mut OwnedReadHalf, - client_writer: &mut OwnedWriteHalf, - server_reader: &mut OwnedReadHalf, - server_writer: &mut OwnedWriteHalf, + client_conn: &mut ProtocolConnection<'_>, + server_conn: &mut ProtocolConnection<'_>, ) -> Result { - let proxy_login = login_to_proxy(client_reader).await?; + let proxy_login = login_to_proxy(client_conn).await?; match proxy_login { PlayerAllowed::True(player) => { println!("Player allowed"); login_to_backend( player, - client_writer, - server_reader, - server_writer, + client_conn, + server_conn, ).await?; return Ok(true) }, @@ -185,19 +181,19 @@ pub async fn respond_login( println!("Player blocked: {}", msg); login::clientbound::Disconnect { reason: format!("{{\"text\":\"{}\"}}", msg.to_string()) - }.write(client_writer).await?; + }.write(client_conn).await?; return Ok(false) } } } async fn login_to_proxy( - client_reader: &mut OwnedReadHalf, + client_conn: &mut ProtocolConnection<'_>, ) -> Result { println!("Logging into proxy"); let start_packet = - login::serverbound::LoginStart::read(client_reader).await?; + login::serverbound::LoginStart::read(client_conn).await?; let player: Player = Player { name: start_packet.name, @@ -209,9 +205,8 @@ async fn login_to_proxy( async fn login_to_backend( player: Player, - client_writer: &mut OwnedWriteHalf, - server_reader: &mut OwnedReadHalf, - server_writer: &mut OwnedWriteHalf, + client_conn: &mut ProtocolConnection<'_>, + server_conn: &mut ProtocolConnection<'_>, ) -> Result<()> { println!("Logging into backend"); handshake::serverbound::Handshake { @@ -219,23 +214,23 @@ async fn login_to_backend( server_address: "localhost".to_string(), server_port: 25565, next_state: 2, - }.write(server_writer).await?; + }.write(server_conn).await?; println!("Login start"); login::serverbound::LoginStart { name: player.name, player_uuid: player.player_uuid, - }.write(server_writer).await?; + }.write(server_conn).await?; println!("Finishing backend login"); - let packet = login::clientbound::LoginSuccess::read(server_reader).await?; + let packet = login::clientbound::LoginSuccess::read(server_conn).await?; println!("Finishing proxy login"); login::clientbound::LoginSuccess { uuid: packet.uuid.clone(), username: packet.username.clone(), properties: packet.properties.clone(), - }.write(client_writer).await?; + }.write(client_conn).await?; println!("Client logged in"); diff --git a/src/main.rs b/src/main.rs index dfe7daa..9bb05d2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,172 +1,53 @@ // Yeahbut December 2023 -use tokio::net::{TcpListener, TcpStream, tcp::{OwnedReadHalf, OwnedWriteHalf}}; -use tokio::io; use std::error::Error; -use purple_cello_mc_protocol::{ - mc_types::{self, Packet}, - handshake, - login, -}; - mod status_handle; mod login_handle; +mod client; +mod listener; #[tokio::main] async fn main() -> Result<(), Box> { - let listener = TcpListener::bind("127.0.0.1:25565").await?; - println!("Proxy listening on port 25565..."); + let offline_info = listener::ProxyInfo{ + proxy_addr: "127.0.0.1".to_string(), + proxy_port: 25565, + online_status: listener::OnlineStatus::Offline, + backend_addr: "127.0.0.1".to_string(), + backend_port: 25564, + }; + let online_info = listener::ProxyInfo{ + proxy_addr: "127.0.0.1".to_string(), + proxy_port: 25566, + online_status: listener::OnlineStatus::Online, + backend_addr: "127.0.0.1".to_string(), + backend_port: 25564, + }; - while let Ok((client_socket, _)) = listener.accept().await { - tokio::spawn(handle_client(client_socket)); - } + let listener_offline: listener::TcpListenerWrapper = + listener::TcpListenerWrapper::bind(offline_info).await?; + let listener_online: listener::TcpListenerWrapper = + listener::TcpListenerWrapper::bind(online_info).await?; + + println!("Proxy listening on port 25565 and 25566..."); + + let handle_offline = tokio::spawn(async move{ + while let Ok((client_socket, _)) = listener_offline + .listener.accept().await { + tokio::spawn(client::handle_client( + client_socket, listener_offline.info.clone())); + } + }); + let handle_online = tokio::spawn(async move{ + while let Ok((client_socket, _)) = listener_online + .listener.accept().await { + tokio::spawn(client::handle_client( + client_socket, listener_online.info.clone())); + } + }); + + tokio::try_join!(handle_offline, handle_online)?; Ok(()) } - -async fn handle_client(client_socket: TcpStream) { - println!("Accepting Connection"); - let backend_addr = "127.0.0.1:25566"; - - let (mut client_reader, mut client_writer) = client_socket.into_split(); - - // "Failed to connect to the backend server" - - let backend_socket = match TcpStream::connect(backend_addr).await { - Ok(backend_socket) => Some(backend_socket.into_split()), - Err(_) => None, - }; - - let (mut server_reader, mut server_writer): - (Option, Option) = - match backend_socket { - Some(backend_socket) => - (Some(backend_socket.0), Some(backend_socket.1)), - None => (None, None), - }; - - let mut buffer: [u8; 1] = [0; 1]; - client_reader.peek(&mut buffer) - .await.expect("Failed to peek at first byte from stream"); - let packet_id: u8 = buffer[0]; - - if packet_id == 0xFE { - status_handle::respond_legacy_status(&mut client_writer) - .await.expect("Error handling legacy status request"); - return; - } else { - let handshake_packet = - handshake::serverbound::Handshake::read(&mut client_reader) - .await.expect("Error reading handshake packet"); - println!("Next state: {}", handshake_packet.next_state); - if handshake_packet.next_state == 1 { - println!("Receiving Status Request"); - status_handle::respond_status( - &mut client_reader, - &mut client_writer, - &mut server_reader, - &mut server_writer, - ).await.expect("Error handling status request"); - return; - } else if handshake_packet.next_state == 2 { - if handshake_packet.protocol_version == mc_types::VERSION_PROTOCOL { - match server_writer { - Some(mut server_writer) => { - match server_reader { - Some(mut server_reader) => { - - if login_handle::respond_login( - &mut client_reader, - &mut client_writer, - &mut server_reader, - &mut server_writer, - ).await.expect( - "Error logging into proxy or server" - ) { - handle_play( - client_reader, - client_writer, - server_reader, - server_writer, - ).await; - } else { - println!("Player blocked from server"); - } - }, - None => { - eprintln!( - "Failed to connect to the backend server"); - return; - } - }; - }, - None => { - login::clientbound::Disconnect { - reason: "\"Server Error (Server may be starting)\"" - .to_string() - } - .write(&mut client_writer) - .await - .expect("Error sending disconnect on: \ -Failed to connect to the backend server"); - } - }; - } - else - if handshake_packet.protocol_version < mc_types::VERSION_PROTOCOL { - println!("Client on outdated version"); - login::clientbound::Disconnect { - reason: format!( - "\"Client Error: Outdated Version (I'm on {})\"", - mc_types::VERSION_NAME, - ).to_string() - } - .write(&mut client_writer).await.expect( - "Error sending disconnect on: Client on wrong version"); - // if handshake_packet.protocol_version > mc_types::VERSION_PROTOCOL - } else { - println!("Client on future version"); - login::clientbound::Disconnect { - reason: format!( - "\"Client Error: Future Version (I'm on {})\"", - mc_types::VERSION_NAME, - ).to_string() - } - .write(&mut client_writer).await.expect( - "Error sending disconnect on: Client on wrong version"); - } - } else { - return; - } - } - - - println!("Connection Closed"); -} - -async fn handle_play( - mut client_reader: OwnedReadHalf, - mut client_writer: OwnedWriteHalf, - mut server_reader: OwnedReadHalf, - mut server_writer: OwnedWriteHalf, -) { - // Forward from client to backend - tokio::spawn(async move { - io::copy( - &mut client_reader, - &mut server_writer, - ).await.expect( - "Error copying from client to backend"); - }); - - // Forward from backend to client - tokio::spawn(async move { - io::copy( - &mut server_reader, - &mut client_writer, - ).await.expect( - "Error copying from backend to client"); - }); -} diff --git a/src/status_handle.rs b/src/status_handle.rs index 7d288cc..62438ac 100644 --- a/src/status_handle.rs +++ b/src/status_handle.rs @@ -5,7 +5,6 @@ use std::io::Read; use std::time::{Duration, Instant}; use std::sync::{Arc, Mutex}; -use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; use tokio::io::AsyncWriteExt; use serde_json::Value; use base64::{Engine as _, engine::general_purpose}; @@ -13,11 +12,13 @@ use rand::Rng; use lazy_static::lazy_static; use purple_cello_mc_protocol::{ - mc_types::{self, Result, Packet}, + mc_types::{self, Result, Packet, ProtocolConnection}, handshake, status, }; +use crate::listener; + const EXPIRATION_DURATION: Duration = Duration::from_secs(3600); struct CachedMotds { @@ -26,10 +27,10 @@ struct CachedMotds { } async fn online_players( - server_reader: &mut OwnedReadHalf, - server_writer: &mut OwnedWriteHalf, + proxy_info: listener::ProxyInfo, + server_conn: &mut ProtocolConnection<'_>, ) -> Result { - Ok(get_upstream_status(server_reader, server_writer).await?.players) + Ok(get_upstream_status(proxy_info, server_conn).await?.players) } fn load_motds() -> Value { @@ -135,31 +136,28 @@ fn favicon() -> Option { } pub async fn respond_status( - client_reader: &mut OwnedReadHalf, - client_writer: &mut OwnedWriteHalf, - server_reader: &mut Option, - server_writer: &mut Option, + proxy_info: listener::ProxyInfo, + client_conn: &mut ProtocolConnection<'_>, + server_conn: &mut Option>, )-> Result<()> { loop { println!("Status Handling"); let packet = - status::serverbound::StatusPackets::read(client_reader).await?; + status::serverbound::StatusPackets::read(client_conn).await?; match packet { status::serverbound::StatusPackets::Status(_) => { println!("Handling Status"); let favicon = favicon(); - let online_players = match server_reader { - Some(server_reader) => match server_writer { - Some(server_writer) => match online_players( - server_reader, - server_writer, + let online_players = match server_conn { + Some(server_conn) => + match online_players( + proxy_info.clone(), + server_conn, ).await { Ok(value) => Some(value), Err(_) => None, }, - None => None, - }, None => None, }; @@ -205,14 +203,14 @@ pub async fn respond_status( let packet = status::clientbound::Status::from_json(status_response)?; - packet.write(client_writer).await?; + packet.write(client_conn).await?; }, status::serverbound::StatusPackets::Ping(packet) => { println!("Handling Ping"); let new_packet = status::clientbound::Ping{ payload: packet.payload, }; - new_packet.write(client_writer).await?; + new_packet.write(client_conn).await?; break; } } @@ -221,39 +219,27 @@ pub async fn respond_status( } pub async fn get_upstream_status( - server_reader: &mut OwnedReadHalf, - server_writer: &mut OwnedWriteHalf, + proxy_info: listener::ProxyInfo, + server_conn: &mut ProtocolConnection<'_>, ) -> Result { handshake::serverbound::Handshake{ protocol_version: mc_types::VERSION_PROTOCOL, - server_address: "localhost".to_string(), - server_port: 25565, + server_address: proxy_info.backend_addr, + server_port: proxy_info.backend_port, next_state: 1, - }.write(server_writer).await?; - status::serverbound::Status{}.write(server_writer).await?; - let packet = status::clientbound::Status::read(server_reader).await?; + }.write(server_conn).await?; + status::serverbound::Status{}.write(server_conn).await?; + let packet = status::clientbound::Status::read(server_conn).await?; let status_response = packet.get_json()?; - // mc_types::write_data(server_writer, &mut vec![0]).await?; - // let mut data = mc_types::read_data(server_reader).await?; - - // mc_types::get_u8(&mut data); - // let json = mc_types::get_string(&mut data)?; - // let status_response: status::clientbound::StatusResponseData = - // serde_json::from_str(&json)?; - - // let mut out_data: Vec = vec![1]; - // out_data.append(&mut mc_types::convert_i64(0)); - // mc_types::write_packet(server_writer, &mut out_data).await?; - Ok(status_response) } pub async fn respond_legacy_status( - client_writer: &mut OwnedWriteHalf, + client_conn: &mut ProtocolConnection<'_>, ) -> Result<()> { println!("Old Style Status"); - client_writer.write_u8(0xFF).await?; + client_conn.stream_write.write_u8(0xFF).await?; let s = "ยง1\0127\0".to_string() + mc_types::VERSION_NAME + @@ -263,9 +249,9 @@ pub async fn respond_legacy_status( .flat_map(|c| std::iter::once(c).chain(std::iter::once(0))) .collect(); - client_writer.write_u16((utf16_vec.len() / 2) as u16).await?; + client_conn.stream_write.write_u16((utf16_vec.len() / 2) as u16).await?; for utf16_char in utf16_vec { - client_writer.write_u16(utf16_char).await?; + client_conn.stream_write.write_u16(utf16_char).await?; } Ok(())