# Yeahbut Apr 2024
"""Work-in-progress Minecraft 1.18.2 (protocol 758) server built on quarry.

Modified from
https://raw.githubusercontent.com/barneygale/quarry/master/examples/server_chat_room_advanced.py

Importing this module has side effects: if the vanilla data reports are
missing it downloads the server jar and runs its data generator, and it then
loads the block registry from those reports.
"""

import certifi
import os

# Point SSL at certifi's CA bundle. Kept ahead of the remaining imports, in the
# same position as in the original file.
os.environ["SSL_CERT_FILE"] = certifi.where()

from typing import List
import subprocess

from twisted.internet import reactor
from quarry.net.server import ServerFactory, ServerProtocol
from quarry.types.uuid import UUID
from quarry.types.chunk import BlockArray, PackedArray
from quarry.types.registry import LookupRegistry
from quarry.types.buffer.v1_14 import Buffer1_14
import quarry.types.nbt as NBT
from quarry.data.data_packs import data_packs, dimension_types


# Working directory for the downloaded server jar and its generated reports.
DATA_DIR = "generate_data"
SERVER_JAR_NAME = "minecraft_server.jar"
SERVER_JAR_URL = (
    "https://piston-data.mojang.com/v1/objects/"
    "c8f83c5655308435b3dcf03c06d9fe8740a77469/server.jar"
)
# Built with os.path.join so the paths also work on non-Windows hosts.
REPORTS_DIR = os.path.join(DATA_DIR, "generated", "reports")


def is_java_17_installed():
    """Return True if a ``java`` executable can be run successfully.

    Despite the name, only the presence of ``java`` is checked (the exit
    status of ``java -version``); the reported version is not inspected.
    """
    try:
        result = subprocess.run(
            ['java', '-version'],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
    except FileNotFoundError:
        return False
    return result.returncode == 0


def bootstrap_server_data():
    """Download the server jar and run its data generator in ``DATA_DIR``.

    Prints a message and returns without doing anything when Java is not
    available.

    Raises:
        requests.HTTPError: if the jar download returns an error status.
    """
    # Imported lazily: requests is only needed for this one-off bootstrap.
    import requests

    if not is_java_17_installed():
        # winget install Microsoft.OpenJDK.17
        print("Java not found, please install Java 17 (or newer).")
        return

    command = [
        "java",
        "-DbundlerMainClass=net.minecraft.data.Main",
        "-jar",
        SERVER_JAR_NAME,
        "--reports",
    ]

    response = requests.get(SERVER_JAR_URL)
    # Fail loudly instead of saving an HTTP error page as the jar.
    response.raise_for_status()

    os.makedirs(DATA_DIR, exist_ok=True)
    with open(os.path.join(DATA_DIR, SERVER_JAR_NAME), 'wb') as file:
        file.write(response.content)

    # Run the argument list directly. Combining a list with shell=True drops
    # every argument after "java" on POSIX systems.
    result = subprocess.run(command, cwd=DATA_DIR)
    if result.returncode != 0:
        print("Data generation failed (java exited with code %d)."
              % result.returncode)


BIOME_ID = 7  # Forest Data ID

# java -DbundlerMainClass=net.minecraft.data.Main -jar minecraft_server.jar --reports
# Check for the reports themselves so that an earlier, failed generator run
# is retried instead of being mistaken for a completed bootstrap.
if not os.path.exists(REPORTS_DIR):
    bootstrap_server_data()

REGISTRY = LookupRegistry.from_json(REPORTS_DIR)

CHUNKS_TALL = 28

BLOCK_EMPTY = {'name': 'minecraft:air'}
BLOCK_SURFACE = {'name': 'minecraft:grass_block', 'snowy': 'false'}
BLOCK_SUBSURFACE = {'name': 'minecraft:dirt'}
BLOCK_UNDERGROUND = {'name': 'minecraft:stone'}


class Buffer_1_18_2(Buffer1_14):
    """Buffer that packs chunk sections with a trailing single-biome entry."""

    @classmethod
    def pack_chunk(cls, sections):
        """Pack every non-empty section in ``sections`` into one bytes object.

        Falsy sections and sections whose ``is_empty()`` is true are skipped.
        """
        return b"".join(
            cls.pack_chunk_section(section)
            for section in sections
            if section and not section.is_empty()
        )

    @classmethod
    def pack_chunk_section(cls, blocks, block_lights=None, sky_lights=None):
        """
        Packs a chunk section. The supplied argument should be an instance of
        ``quarry.types.chunk.BlockArray``.

        ``block_lights`` and ``sky_lights`` are accepted but not used. The
        block data is followed by a biome container that holds only
        ``BIOME_ID``.
        """
        out = cls.pack('HB', blocks.non_air, blocks.storage.value_width)
        out += cls.pack_chunk_section_palette(blocks.palette)
        out += cls.pack_chunk_section_array(blocks.to_bytes())
        out += cls.pack('B', 0)  # Biome Bits Per Entry
        out += cls.pack_varint(BIOME_ID)  # Biome Palette
        out += cls.pack_varint(0)  # Biome Data Array Length
        return out

    def unpack_chunk(self, bitmask, overworld=True):
        """Not implemented; this buffer is only used for sending."""
        raise NotImplementedError

    def unpack_chunk_section(self, overworld=True):
        """Not implemented; this buffer is only used for sending."""
        raise NotImplementedError


class YTDServerProtocol(ServerProtocol):
    """Per-connection protocol: joins the player and sends a few chunks."""

    def player_joined(self):
        """Send the join sequence, player list and initial chunks."""
        # Call super. This switches us to "play" mode, marks the player as
        #   in-game, and does some logging.
        ServerProtocol.player_joined(self)

        self.buff_type = Buffer_1_18_2

        # Send join game packet
        self.send_join_game()

        # Send "Player Position and Look" packet
        self.send_packet(
            "player_position_and_look",
            self.buff_type.pack(
                "dddffb",
                0,    # x
                325,  # y Must be >= build height to pass the "Loading Terrain" screen on 1.18.2
                0,    # z
                0,    # yaw
                0,    # pitch
                0b00000),  # flags
            self.buff_type.pack_varint(0),  # teleport id
            self.buff_type.pack("?", True))  # Leave vehicle

        # Start sending "Keep Alive" packets
        self.ticker.add_loop(20, self.update_keep_alive)

        # Announce player join to other players
        self.factory.broadcast_player_join(self)

        # Send full player list
        self.factory.send_player_list_add(self, self.factory.players)

        self.send_update_view_position(0, 0)
        self.send_chunk(0, 0)
        self.send_chunk(-1, 0)
        self.send_chunk(0, -1)
        self.send_chunk(-1, -1)
        # self.send_chunk(1,0)
        # self.send_chunk(0,1)
        # self.send_chunk(1,1)
        self.send_update_view_position(0, 0)

    def send_join_game(self):
        """Build and send the "Join Game" packet."""
        # Build up fields for "Join Game" packet
        entity_id = 0
        max_players = 0
        hashed_seed = 42
        view_distance = 2
        simulation_distance = 2
        game_mode = 3
        prev_game_mode = 3
        is_hardcore = False
        is_reduced_debug = False
        is_respawn_screen = False
        is_debug = False
        is_flat = True

        dimension_codec = data_packs[self.protocol_version]
        dimension_name = "minecraft:overworld"
        dimension_tag = dimension_types[self.protocol_version, dimension_name]
        world_count = 1
        world_name = "favorite_place"

        pack = self.buff_type.pack
        pack_varint = self.buff_type.pack_varint
        pack_string = self.buff_type.pack_string
        pack_nbt = self.buff_type.pack_nbt

        join_game = [
            pack("i?Bb", entity_id, is_hardcore, game_mode, prev_game_mode),
            pack_varint(world_count),
            pack_string(world_name),
            pack_nbt(dimension_codec),
            pack_nbt(dimension_tag),
            pack_string(world_name),
            pack("q", hashed_seed),
            pack_varint(max_players),
            pack_varint(view_distance),
            pack_varint(simulation_distance),
            pack("????", is_reduced_debug, is_respawn_screen, is_debug, is_flat),
        ]

        # Send "Join Game" packet
        self.send_packet("join_game", *join_game)

    def player_left(self):
        """Run the base handling, then tell the other players."""
        ServerProtocol.player_left(self)

        # Announce player leave to other players
        self.factory.broadcast_player_leave(self)

    def update_keep_alive(self):
        """Send a "Keep Alive" packet with a fixed id of 0."""
        self.send_packet("keep_alive", self.buff_type.pack('Q', 0))

    def packet_chat_message(self, buff):
        """Relay an incoming chat message to every player in play mode."""
        if self.protocol_mode != 'play':
            # Drop the unread payload so no bytes are left in the buffer,
            # matching the discard done on the normal path below.
            buff.discard()
            return

        message = buff.unpack_string()
        self.factory.broadcast_chat(message, self.uuid, self.display_name)
        buff.discard()

    def send_update_view_position(self, x, z):
        """Send an "Update View Position" packet for chunk ``(x, z)``."""
        self.send_packet(
            'update_view_position',
            self.buff_type.pack_varint(x),
            self.buff_type.pack_varint(z),
        )

    # def send_chunk(self, x, z, full, heightmap, sections, biomes):
    def send_chunk(self, x, z):
        """Generate chunk ``(x, z)`` and send it, preceded by an unload."""
        sections, heightmap = self.factory.generate(x, z)
        sections_data = self.buff_type.pack_chunk(sections)

        self.send_packet(
            'unload_chunk',
            self.buff_type.pack('ii', x, z),
        )
        self.send_packet(
            'chunk_data',
            self.buff_type.pack('ii', x, z),
            self.buff_type.pack_nbt(heightmap),
            self.buff_type.pack_varint(len(sections_data)),
            sections_data,
            self.buff_type.pack_varint(0),  # Always zero block entities
            self.buff_type.pack('?', True),
            self.buff_type.pack_varint(0),  # Sky Light Mask
            self.buff_type.pack_varint(0),  # Block Light Mask
            self.buff_type.pack_varint(0),  # Empty Sky Light Mask
            self.buff_type.pack_varint(0),  # Empty Block Light Mask
            self.buff_type.pack_varint(0),  # Sky Light Array Length
            self.buff_type.pack_varint(0),  # Block Light Array Length
        )


class YTDServerFactory(ServerFactory):
    """Factory holding the broadcast helpers and the chunk generator."""

    protocol = YTDServerProtocol
    motd = "YTD Custom Server (WIP)"
    force_protocol_version = 758  # 1.18.2

    # Sends an unsigned chat message, using system messages on supporting clients
    def broadcast_chat(self, message: str, sender: UUID, sender_name: str):
        """Send a player's chat message to every player in play mode."""
        for player in self.players:
            if player.protocol_mode != 'play':
                continue
            self.send_chat(player, message, sender, sender_name)

    def send_chat(self, player: YTDServerProtocol, message: str,
                  sender: UUID, sender_name: str):
        """Send one ``<name> message`` chat packet to ``player``."""
        player.send_packet(
            "chat_message",
            player.buff_type.pack_chat("<%s> %s" % (sender_name, message)),
            player.buff_type.pack('B', 0),
            player.buff_type.pack_uuid(sender))

    # Sends a system message, falling back to chat messages on older clients
    def broadcast_system(self, message: str):
        """Send a system message to every player in play mode."""
        for player in self.players:
            if player.protocol_mode != 'play':
                continue
            self.send_system(player, message)

    @staticmethod
    def send_system(player: YTDServerProtocol, message: str):
        """Send ``message`` to ``player`` with an all-zero sender UUID."""
        player.send_packet(
            "chat_message",
            player.buff_type.pack_chat(message),
            player.buff_type.pack('B', 0),
            player.buff_type.pack_uuid(UUID(int=0)))

    # Announces player join
    def broadcast_player_join(self, joined: YTDServerProtocol):
        """Announce a join and add the player to everyone's player list."""
        self.broadcast_system("\u00a7e%s has joined." % joined.display_name)
        self.broadcast_player_list_add(joined)

    # Announces player leave
    def broadcast_player_leave(self, left: YTDServerProtocol):
        """Announce a leave and remove the player from the player lists."""
        self.broadcast_system("\u00a7e%s has left." % left.display_name)
        self.broadcast_player_list_remove(left)

    # Sends player list entry for new player to other players
    def broadcast_player_list_add(self, added: YTDServerProtocol):
        """Send ``added``'s player list entry to every other play-mode player."""
        for player in self.players:
            # Exclude the added player, they will be sent the full player list separately
            if player.protocol_mode == 'play' and player != added:
                self.send_player_list_add(player, [added])

    @staticmethod
    def send_player_list_add(player: YTDServerProtocol,
                             added: List[YTDServerProtocol]):
        """Send a "player add" list packet to ``player``.

        Only entries in play mode are included. The entry count is taken
        after that filtering so it always matches the entries written.
        """
        entries = [entry for entry in added if entry.protocol_mode == 'play']

        data = [
            player.buff_type.pack_varint(0),  # Action - 0 = Player add
            player.buff_type.pack_varint(len(entries)),  # Player entry count
        ]

        for entry in entries:
            data.append(player.buff_type.pack_uuid(entry.uuid))  # Player UUID
            data.append(player.buff_type.pack_string(entry.display_name))  # Player name
            data.append(player.buff_type.pack_varint(0))  # Empty properties list
            data.append(player.buff_type.pack_varint(3))  # Gamemode
            data.append(player.buff_type.pack_varint(0))  # Latency
            data.append(player.buff_type.pack('?', False))  # No display name

        player.send_packet('player_list_item', *data)

    # Sends player list update for leaving player to other players
    def broadcast_player_list_remove(self, removed: YTDServerProtocol):
        """Tell every other play-mode player to drop ``removed``."""
        for player in self.players:
            if player.protocol_mode == 'play' and player != removed:
                player.send_packet(
                    'player_list_item',
                    player.buff_type.pack_varint(4),  # Action - 4 = Player remove
                    player.buff_type.pack_varint(1),  # Player entry count
                    player.buff_type.pack_uuid(removed.uuid))  # Player UUID

    def generate(self, x, z):
        """Build the sections and height map for one chunk.

        ``x`` and ``z`` are currently unused: every chunk gets the same
        diagonal test pattern in each of its ``CHUNKS_TALL`` sections.

        Returns:
            A ``(sections, height_map)`` tuple: a list of ``BlockArray``
            sections and an NBT root holding an empty ``MOTION_BLOCKING``
            height map.
        """
        # Work out the pattern once and reuse it for every section. The cells
        # are visited x-outer, y-middle, z-inner, and each section is stored
        # as array[y][z][x].
        layout = []
        for xi in range(16):
            for yi in range(16):
                for zi in range(16):
                    index = (yi * 16 * 16) + (zi * 16) + xi
                    if xi == yi == zi:
                        block = BLOCK_SURFACE
                    elif xi == (yi + 1) % 16 == zi:
                        block = BLOCK_SUBSURFACE
                    elif xi == (yi + 2) % 16 == zi:
                        block = BLOCK_UNDERGROUND
                    else:
                        block = BLOCK_EMPTY
                    layout.append((index, block))

        sections = [BlockArray.empty(REGISTRY) for _ in range(CHUNKS_TALL)]
        for sec in sections:
            for index, block in layout:
                sec[index] = block

        height_map = NBT.TagRoot({
            '': NBT.TagCompound({
                'MOTION_BLOCKING': NBT.TagLongArray(PackedArray.empty_height()),
            }),
        })

        return sections, height_map


def main(argv):
    """Parse ``argv``, start listening and run the reactor.

    ``--offline`` is accepted but currently has no effect: the server always
    runs with ``online_mode`` set to False.
    """
    # Parse options
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("-a", "--host", default="", help="address to listen on")
    parser.add_argument("-p", "--port", default=25565, type=int, help="port to listen on")
    parser.add_argument("--offline", action="store_true", help="offline server")
    args = parser.parse_args(argv)

    # Create factory
    factory = YTDServerFactory()
    factory.online_mode = False  # not args.offline

    # Listen
    factory.listen(args.host, args.port)
    reactor.run()


if __name__ == "__main__":
    import sys
    main(sys.argv[1:])