# Yeahbut Apr 2024 import certifi, os os.environ["SSL_CERT_FILE"] = certifi.where() # Modified from https://raw.githubusercontent.com/barneygale/quarry/master/examples/server_chat_room_advanced.py from typing import List import subprocess from twisted.internet import reactor from quarry.net.server import ServerFactory, ServerProtocol from quarry.types.uuid import UUID from quarry.types.chunk import BlockArray, PackedArray from quarry.types.registry import LookupRegistry import quarry.types.nbt as NBT from quarry.data.data_packs import data_packs, dimension_types def is_java_17_installed(): try: result = subprocess.run(['java', '-version'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) return result.returncode == 0# and "java version \"17" in result.stdout except FileNotFoundError: return False def bootstrap_server_data(): import requests if not is_java_17_installed(): # winget install Microsoft.OpenJDK.17 print("Java not found, please install Java 17 (or newer).") return url = "https://piston-data.mojang.com/v1/objects/c8f83c5655308435b3dcf03c06d9fe8740a77469/server.jar" command = ["java", "-DbundlerMainClass=net.minecraft.data.Main", "-jar", "minecraft_server.jar", "--reports"] r = requests.get(url) os.makedirs("generate_data") with open(r'generate_data\minecraft_server.jar', 'wb') as file: file.write(r.content) process = subprocess.Popen(command, shell=True, cwd="generate_data") process.wait() FOREST_BIOME_DATA_PACK_ID = 7 # java -DbundlerMainClass=net.minecraft.data.Main -jar minecraft_server.jar --reports if not os.path.exists("generate_data"): bootstrap_server_data() REGISTRY = LookupRegistry.from_json(r'generate_data\generated\reports') CHUNKS_TALL = 28 class YTDServerProtocol(ServerProtocol): def player_joined(self): # Call super. This switches us to "play" mode, marks the player as # in-game, and does some logging. ServerProtocol.player_joined(self) # Send join game packet self.send_join_game() # Send "Player Position and Look" packet self.send_packet( "player_position_and_look", self.buff_type.pack("dddff?", 0, # x 385, # y Must be >= build height to pass the "Loading Terrain" screen on 1.18.2 0, # z 0, # yaw 0, # pitch 0b00000), # flags self.buff_type.pack_varint(0), # teleport id self.buff_type.pack("?", True)) # Leave vehicle # Start sending "Keep Alive" packets self.ticker.add_loop(20, self.update_keep_alive) # Announce player join to other players self.factory.broadcast_player_join(self) # Send full player list self.factory.send_player_list_add(self, self.factory.players) self.send_update_view_position(0,0) self.send_chunk(0,0) self.send_chunk(-1,0) self.send_chunk(0,-1) self.send_chunk(-1,-1) def send_join_game(self): # Build up fields for "Join Game" packet entity_id = 0 max_players = 0 hashed_seed = 42 view_distance = 2 simulation_distance = 2 game_mode = 3 prev_game_mode = 3 is_hardcore = False is_reduced_debug = False is_respawn_screen = False is_debug = False is_flat = True dimension_codec = data_packs[self.protocol_version] dimension_name = "minecraft:overworld" dimension_tag = dimension_types[self.protocol_version, dimension_name] world_count = 1 world_name = "favorite_place" join_game = [ self.buff_type.pack("i?Bb", entity_id, is_hardcore, game_mode, prev_game_mode), self.buff_type.pack_varint(world_count), self.buff_type.pack_string(world_name), self.buff_type.pack_nbt(dimension_codec), ] join_game.append(self.buff_type.pack_nbt(dimension_tag)) join_game.append(self.buff_type.pack_string(world_name)) join_game.append(self.buff_type.pack("q", hashed_seed)) join_game.append(self.buff_type.pack_varint(max_players)) join_game.append(self.buff_type.pack_varint(view_distance)), join_game.append(self.buff_type.pack_varint(simulation_distance)) join_game.append(self.buff_type.pack("????", is_reduced_debug, is_respawn_screen, is_debug, is_flat)) # Send "Join Game" packet self.send_packet("join_game", *join_game) def player_left(self): ServerProtocol.player_left(self) # Announce player leave to other players self.factory.broadcast_player_leave(self) def update_keep_alive(self): # Send a "Keep Alive" packet self.send_packet("keep_alive", self.buff_type.pack('Q', 0)) def packet_chat_message(self, buff): if self.protocol_mode != 'play': return message = buff.unpack_string() self.factory.broadcast_chat(message, self.uuid, self.display_name) buff.discard() def send_update_view_position(self, x, z): self.send_packet( 'update_view_position', self.buff_type.pack_varint(x), self.buff_type.pack_varint(z), ) # def send_chunk(self, x, z, full, heightmap, sections, biomes): def send_chunk(self, x, z): sections, heightmap = self.factory.generate(x,z) sections_data = self.buff_type.pack_chunk(sections) self.send_packet( 'unload_chunk', self.buff_type.pack('ii', x, z), ) self.send_packet( 'chunk_data', self.buff_type.pack('ii', x, z), # # self.buff_type.pack('ii?', x, z, full), # # self.buff_type.pack_chunk_bitmask(sections), self.buff_type.pack_nbt(heightmap), # # self.buff_type.pack_array('I', biomes) if full else b'', self.buff_type.pack_varint(len(sections_data)), sections_data, self.buff_type.pack_varint(0), # Always zero block entities self.buff_type.pack('?', False), self.buff_type.pack_varint(0), # Sky Light Mask self.buff_type.pack_varint(0), # Block Light Mask self.buff_type.pack_varint(0), # Empty Sky Light Mask self.buff_type.pack_varint(0), # Empty Block Light Mask self.buff_type.pack_varint(0), # Sky Light Array Length self.buff_type.pack_varint(0), # Block Light Array Length ) class YTDServerFactory(ServerFactory): protocol = YTDServerProtocol motd = "YTD Custom Server (WIP)" force_protocol_version = 758 # 1.18.2 # Sends an unsigned chat message, using system messages on supporting clients def broadcast_chat(self, message: str, sender: UUID, sender_name: str): for player in self.players: if player.protocol_mode != 'play': continue self.send_chat(player, message, sender, sender_name) def send_chat(self, player: YTDServerProtocol, message: str, sender: UUID, sender_name: str): player.send_packet("chat_message", player.buff_type.pack_chat("<%s> %s" % (sender_name, message)), player.buff_type.pack('B', 0), player.buff_type.pack_uuid(sender)) # Sends a system message, falling back to chat messages on older clients def broadcast_system(self, message: str): for player in self.players: if player.protocol_mode != 'play': continue self.send_system(player, message) @staticmethod def send_system(player: YTDServerProtocol, message: str): player.send_packet("chat_message", player.buff_type.pack_chat(message), player.buff_type.pack('B', 0), player.buff_type.pack_uuid(UUID(int=0))) # Announces player join def broadcast_player_join(self, joined: YTDServerProtocol): self.broadcast_system("\u00a7e%s has joined." % joined.display_name) self.broadcast_player_list_add(joined) # Announces player leave def broadcast_player_leave(self, left: YTDServerProtocol): self.broadcast_system("\u00a7e%s has left." % left.display_name) self.broadcast_player_list_remove(left) # Sends player list entry for new player to other players def broadcast_player_list_add(self, added: YTDServerProtocol): for player in self.players: # Exclude the added player, they will be sent the full player list separately if player.protocol_mode == 'play' and player != added: self.send_player_list_add(player, [added]) @staticmethod def send_player_list_add(player: YTDServerProtocol, added: List[YTDServerProtocol]): data = [ player.buff_type.pack_varint(0), # Action - 0 = Player add player.buff_type.pack_varint(len(added)), # Player entry count ] for entry in added: if entry.protocol_mode != 'play': continue data.append(player.buff_type.pack_uuid(entry.uuid)) # Player UUID data.append(player.buff_type.pack_string(entry.display_name)) # Player name data.append(player.buff_type.pack_varint(0)) # Empty properties list data.append(player.buff_type.pack_varint(3)) # Gamemode data.append(player.buff_type.pack_varint(0)) # Latency data.append(player.buff_type.pack('?', False)) # No display name player.send_packet('player_list_item', *data) # Sends player list update for leaving player to other players def broadcast_player_list_remove(self, removed: YTDServerProtocol): for player in self.players: if player.protocol_mode == 'play' and player != removed: player.send_packet('player_list_item', player.buff_type.pack_varint(4), # Action - 4 = Player remove player.buff_type.pack_varint(1), # Player entry count player.buff_type.pack_uuid(removed.uuid)) # Player UUID def generate(self, x, z): sections = [] for _ in range(CHUNKS_TALL): sections.append((BlockArray.empty(REGISTRY),None,None)) for sec in sections: sec[0][0] = {'name': 'minecraft:grass_block', 'snowy': 'false'} for i in range(1,16*16*16): sec[0][i] = {'name': 'minecraft:air'} # height_map = NBT.TagCompound({'MOTION_BLOCKING':NBT.TagLongArray(PackedArray.empty_height())}) # height_map = NBT.TagRoot(NBT.TagCompound({'MOTION_BLOCKING':NBT.TagLongArray(PackedArray.empty_height())})) # height_map = NBT.TagRoot({'MOTION_BLOCKING':NBT.TagLongArray(PackedArray.empty_height())}) height_map = NBT.TagRoot({'':NBT.TagCompound({'MOTION_BLOCKING':NBT.TagLongArray(PackedArray.empty_height())})}) return sections, height_map def main(argv): # Parse options import argparse parser = argparse.ArgumentParser() parser.add_argument("-a", "--host", default="", help="address to listen on") parser.add_argument("-p", "--port", default=25565, type=int, help="port to listen on") parser.add_argument("--offline", action="store_true", help="offline server") args = parser.parse_args(argv) # Create factory factory = YTDServerFactory() factory.online_mode = False#not args.offline # Listen factory.listen(args.host, args.port) reactor.run() if __name__ == "__main__": import sys main(sys.argv[1:])