# Yeahbut Apr 2024 import math import certifi, os os.environ["SSL_CERT_FILE"] = certifi.where() # Modified from https://raw.githubusercontent.com/barneygale/quarry/master/examples/server_chat_room_advanced.py from typing import List import subprocess import time from twisted.internet import reactor from quarry.net.server import ServerFactory, ServerProtocol from quarry.types.uuid import UUID from quarry.types.chunk import BlockArray, PackedArray from quarry.types.registry import LookupRegistry from quarry.types.buffer.v1_14 import Buffer1_14 import quarry.types.nbt as NBT from quarry.data.data_packs import data_packs, dimension_types def is_java_17_installed(): try: result = subprocess.run(['java', '-version'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) return result.returncode == 0# and "java version \"17" in result.stdout except FileNotFoundError: return False def bootstrap_server_data(): import requests if not is_java_17_installed(): # winget install Microsoft.OpenJDK.17 print("Java not found, please install Java 17 (or newer).") return url = "https://piston-data.mojang.com/v1/objects/c8f83c5655308435b3dcf03c06d9fe8740a77469/server.jar" command = ["java", "-DbundlerMainClass=net.minecraft.data.Main", "-jar", "minecraft_server.jar", "--reports"] r = requests.get(url) os.makedirs("generate_data") with open(r'generate_data\minecraft_server.jar', 'wb') as file: file.write(r.content) process = subprocess.Popen(command, shell=True, cwd="generate_data") process.wait() BIOME_ID = 7 # Forest Data ID # java -DbundlerMainClass=net.minecraft.data.Main -jar minecraft_server.jar --reports if not os.path.exists("generate_data"): bootstrap_server_data() REGISTRY = LookupRegistry.from_json(r'generate_data\generated\reports') CHUNKS_TALL = 28 BLOCK_EMPTY = {'name': 'minecraft:air'} BLOCK_SURFACE = {'name': 'minecraft:grass_block', 'snowy': 'false'} BLOCK_SUBSURFACE = {'name': 'minecraft:dirt'} BLOCK_UNDERGROUND = {'name': 'minecraft:stone'} BLOCK_DEFAULT = {'name': 'minecraft:cobblestone'} class Buffer_1_18_2(Buffer1_14): @classmethod def pack_chunk(cls, sections): data = b"" for section in sections: if section and not section.is_empty(): data += cls.pack_chunk_section(section) return data @classmethod def pack_chunk_section(cls, blocks, block_lights=None, sky_lights=None): """ Packs a chunk section. The supplied argument should be an instance of ``quarry.types.chunk.BlockArray``. """ out = cls.pack('HB', blocks.non_air, blocks.storage.value_width) out += cls.pack_chunk_section_palette(blocks.palette) out += cls.pack_chunk_section_array(blocks.to_bytes()) out += cls.pack('B', 0) # Biome Bits Per Entry out += cls.pack_varint(BIOME_ID) # Biome Palette out += cls.pack_varint(0) # Biome Data Array Length return out def unpack_chunk(self, bitmask, overworld=True): raise NotImplementedError def unpack_chunk_section(self, overworld=True): raise NotImplementedError class Player: _conn: "YTDServerProtocol" _name: str _x: float _y: float _z: float _yaw: float _pitch: float _chunks: set[tuple[int,int]] _load_queue: list[tuple[int,int]] _unload_queue: list[tuple[int,int]] def __init__( self, connection: "YTDServerProtocol", name: str, x: float, y: float, z: float, yaw: float, pitch: float, ): self._conn = connection self._name = name self._x = x self._y = y self._z = z self._yaw = yaw self._pitch = pitch self._chunks = set() self._load_queue = list() self._unload_queue = list() @property def name(self) -> str: return self._name @property def x(self) -> float: return self._x @property def y(self) -> float: return self._y @property def z(self) -> float: return self._z @property def yaw(self) -> float: return self._yaw @yaw.setter def yaw(self, value: float): self._yaw = value @property def pitch(self) -> float: return self._pitch @pitch.setter def pitch(self, value: float): self._pitch = value @property def cx(self) -> int: return int(self.x // 16) @property def cy(self) -> int: return int(self.y // 16) @property def cz(self) -> int: return int(self.z // 16) def update_pos(self, x: float, y: float, z: float): chunk_crossed = int(x // 16) != self.cx or int(z // 16) != self.cz self._x = x self._y = y self._z = z if chunk_crossed: self.update_chunks() def update_chunks(self): print("Chunk Crossed") vd = self._conn.view_distance chunks: set[tuple[int,int]] = set() for x in range(self.cx-vd, self.cx+vd+1): for z in range(self.cz-vd, self.cz+vd+1): chunks.add((x,z)) new_chunks = chunks.difference(self._chunks) old_chunks = self._chunks.difference(chunks) for i in new_chunks: self.load_chunk(*i) for i in old_chunks: self.unload_chunk(*i) self._load_queue.sort(key=lambda o: math.sqrt(pow(self.cx - o[0],2) + pow(self.cz - o[1],2))) self._conn.send_update_view_position(self.cx,self.cz) def load_chunk(self, x: int, z: int): print(f"enqueuing loading chunk {x}, {z}") self._chunks.add((x,z)) if (x,z) not in self._load_queue: self._load_queue.append((x,z)) if (x,z) in self._unload_queue: self._unload_queue.remove((x,z)) # self._conn.send_chunk(x,z) def unload_chunk(self, x: int, z: int): print(f"enqueuing unloading chunk {x}, {z}") try: self._chunks.remove((x,z)) except KeyError: pass if (x,z) not in self._unload_queue: self._unload_queue.append((x,z)) if (x,z) in self._load_queue: self._load_queue.remove((x,z)) # self._conn.send_unload_chunk(x,z) class YTDServerProtocol(ServerProtocol): def player_joined(self): # Call super. This switches us to "play" mode, marks the player as # in-game, and does some logging. ServerProtocol.player_joined(self) self.buff_type = Buffer_1_18_2 self.view_distance = 8 self.player = Player( self, self.display_name, # type: ignore 0, 128, #325, 0, 0, 0, ) # Send join game packet self.send_join_game() # Send "Player Position and Look" packet self.send_packet( "player_position_and_look", self.buff_type.pack("dddff?", self.player.x, self.player.y, self.player.z, self.player.yaw, self.player.pitch, 0b00000), self.buff_type.pack_varint(0), self.buff_type.pack("?", True)) # Start sending "Keep Alive" packets self.ticker.add_loop(20, self.update_keep_alive) # self.ticker.add_loop(.25/self.ticker.interval, self.manage_queue) self.ticker.add_loop(1, self.manage_queue) # Announce player join to other players self.factory.broadcast_player_join(self) # Send full player list self.factory.send_player_list_add(self, self.factory.players) self.send_update_view_position(0,0) self.player.load_chunk(0,0) self.player.load_chunk(-1,0) self.player.load_chunk(0,-1) self.player.load_chunk(-1,-1) # self.send_chunk(0,0) # self.send_chunk(-1,0) # self.send_chunk(0,-1) # self.send_chunk(-1,-1) # self.send_chunk(1,0) # self.send_chunk(0,1) # self.send_chunk(1,1) self.send_update_view_position(0,0) def send_join_game(self): # Build up fields for "Join Game" packet entity_id = 0 max_players = 0 hashed_seed = 42 view_distance = self.view_distance simulation_distance = 2 game_mode = 3 prev_game_mode = 3 is_hardcore = False is_reduced_debug = False is_respawn_screen = False is_debug = False is_flat = True dimension_codec = data_packs[self.protocol_version] dimension_name = "minecraft:overworld" dimension_tag = dimension_types[self.protocol_version, dimension_name] world_count = 1 world_name = "favorite_place" join_game = [ self.buff_type.pack("i?Bb", entity_id, is_hardcore, game_mode, prev_game_mode), self.buff_type.pack_varint(world_count), self.buff_type.pack_string(world_name), self.buff_type.pack_nbt(dimension_codec), ] join_game.append(self.buff_type.pack_nbt(dimension_tag)) join_game.append(self.buff_type.pack_string(world_name)) join_game.append(self.buff_type.pack("q", hashed_seed)) join_game.append(self.buff_type.pack_varint(max_players)) join_game.append(self.buff_type.pack_varint(view_distance)) join_game.append(self.buff_type.pack_varint(simulation_distance)) join_game.append(self.buff_type.pack("????", is_reduced_debug, is_respawn_screen, is_debug, is_flat)) # Send "Join Game" packet self.send_packet("join_game", *join_game) def manage_queue(self): if self.closed: return t = time.time() if self.player._load_queue: x,z = self.player._load_queue.pop(0) while (x,z) in self.player._load_queue: self.player._load_queue.remove((x,z)) # print(f"dequeuing loading chunk {x}, {z}") self.send_chunk(x,z) print(f"generating {x}, {z} took {int(time.time()-t)}s; {len(self.player._load_queue)} chunks remaining") while self.player._unload_queue: x,z = self.player._unload_queue.pop(0) while (x,z) in self.player._unload_queue: self.player._unload_queue.remove((x,z)) print(f"dequeuing unloading chunk {x}, {z}") self.send_unload_chunk(x,z) def player_left(self): ServerProtocol.player_left(self) # Announce player leave to other players self.factory.broadcast_player_leave(self) def update_keep_alive(self): # Send a "Keep Alive" packet self.send_packet("keep_alive", self.buff_type.pack('Q', 0)) def packet_chat_message(self, buff): if self.protocol_mode != 'play': return message = buff.unpack_string() self.factory.broadcast_chat(message, self.uuid, self.display_name) buff.discard() def send_update_view_position(self, x, z): self.send_packet( 'update_view_position', self.buff_type.pack_varint(x), self.buff_type.pack_varint(z), ) def packet_player_position(self, buff: Buffer_1_18_2): self.player.update_pos(*buff.unpack('ddd')) buff.discard() def send_block_change(self, x, y, z, block): self.send_packet( 'block_change', self.buff_type.pack_position(x,y,z), self.buff_type.pack_varint(REGISTRY.encode_block(block)), ) def send_unload_chunk(self, x, z): self.send_packet( 'unload_chunk', self.buff_type.pack('ii', x, z), ) # def send_chunk(self, x, z, full, heightmap, sections, biomes): def send_chunk(self, x, z): sections, heightmap = self.factory.generate(x,z) sections_data = self.buff_type.pack_chunk(sections) self.send_unload_chunk(x,z) self.send_packet( 'chunk_data', self.buff_type.pack('ii', x, z), self.buff_type.pack_nbt(heightmap), self.buff_type.pack_varint(len(sections_data)), sections_data, self.buff_type.pack_varint(0), # Always zero block entities self.buff_type.pack('?', True), self.buff_type.pack_varint(0), # Sky Light Mask self.buff_type.pack_varint(0), # Block Light Mask self.buff_type.pack_varint(0), # Empty Sky Light Mask self.buff_type.pack_varint(0), # Empty Block Light Mask self.buff_type.pack_varint(0), # Sky Light Array Length self.buff_type.pack_varint(0), # Block Light Array Length ) for i in self.factory.fix(x,z): self.send_block_change(*i,BLOCK_EMPTY) class YTDServerFactory(ServerFactory): protocol = YTDServerProtocol motd = "YTD Custom Server (WIP)" force_protocol_version = 758 # 1.18.2 # Sends an unsigned chat message, using system messages on supporting clients def broadcast_chat(self, message: str, sender: UUID, sender_name: str): for player in self.players: if player.protocol_mode != 'play': continue self.send_chat(player, message, sender, sender_name) def send_chat(self, player: YTDServerProtocol, message: str, sender: UUID, sender_name: str): player.send_packet("chat_message", player.buff_type.pack_chat("<%s> %s" % (sender_name, message)), player.buff_type.pack('B', 0), player.buff_type.pack_uuid(sender)) # Sends a system message, falling back to chat messages on older clients def broadcast_system(self, message: str): for player in self.players: if player.protocol_mode != 'play': continue self.send_system(player, message) @staticmethod def send_system(player: YTDServerProtocol, message: str): player.send_packet("chat_message", player.buff_type.pack_chat(message), player.buff_type.pack('B', 0), player.buff_type.pack_uuid(UUID(int=0))) # Announces player join def broadcast_player_join(self, joined: YTDServerProtocol): self.broadcast_system("\u00a7e%s has joined." % joined.display_name) self.broadcast_player_list_add(joined) # Announces player leave def broadcast_player_leave(self, left: YTDServerProtocol): self.broadcast_system("\u00a7e%s has left." % left.display_name) self.broadcast_player_list_remove(left) # Sends player list entry for new player to other players def broadcast_player_list_add(self, added: YTDServerProtocol): for player in self.players: # Exclude the added player, they will be sent the full player list separately if player.protocol_mode == 'play' and player != added: self.send_player_list_add(player, [added]) @staticmethod def send_player_list_add(player: YTDServerProtocol, added: List[YTDServerProtocol]): data = [ player.buff_type.pack_varint(0), # Action - 0 = Player add player.buff_type.pack_varint(len(added)), # Player entry count ] for entry in added: if entry.protocol_mode != 'play': continue data.append(player.buff_type.pack_uuid(entry.uuid)) # Player UUID data.append(player.buff_type.pack_string(entry.display_name)) # Player name data.append(player.buff_type.pack_varint(0)) # Empty properties list data.append(player.buff_type.pack_varint(3)) # Gamemode data.append(player.buff_type.pack_varint(0)) # Latency data.append(player.buff_type.pack('?', False)) # No display name player.send_packet('player_list_item', *data) # Sends player list update for leaving player to other players def broadcast_player_list_remove(self, removed: YTDServerProtocol): for player in self.players: if player.protocol_mode == 'play' and player != removed: player.send_packet('player_list_item', player.buff_type.pack_varint(4), # Action - 4 = Player remove player.buff_type.pack_varint(1), # Player entry count player.buff_type.pack_uuid(removed.uuid)) # Player UUID @staticmethod def chunk_func(cx, cy, cz, xi, yi, zi): x = cx * 16 + xi y = cy * 16 + yi z = cz * 16 + zi return YTDServerFactory.func(x,y,z) @staticmethod def func(x, y, z): a = 100 b = 100 if x*x/a - z*z/b > y + 3: return BLOCK_UNDERGROUND elif x*x/a - z*z/b > y + 1: return BLOCK_SUBSURFACE elif x*x/a - z*z/b > y: return BLOCK_SURFACE else: return BLOCK_EMPTY def fix(self, x, z): blocks = [] for y in range(-64,325,16): if not self.func(x*16,y,z*16): blocks.append((x*16,y,z*16)) return blocks def generate(self, x, z): # array[y][z][x] array = lambda x, y, z: (y * 16 * 16) + (z * 16) + x chunk_range = lambda: [(array(x,y,z), x,y,z) for x in range(16) for y in range(16) for z in range(16)] sections = [] for _ in range(CHUNKS_TALL): sections.append(BlockArray.empty(REGISTRY)) for y, sec in enumerate(sections): sec[0] = BLOCK_DEFAULT for i, xi, yi, zi in chunk_range(): sec[i] = self.chunk_func(x,y-12,z,xi,yi,zi) # if self.chunk_func(x,y-4,z,xi,yi,zi): # print(x * 16 + xi,(y-4) * 16 + yi,z * 16 + zi) # sec[i] = BLOCK_DEFAULT # else: # sec[i] = BLOCK_EMPTY height_map = NBT.TagRoot({'':NBT.TagCompound({'MOTION_BLOCKING':NBT.TagLongArray(PackedArray.empty_height())})}) return sections, height_map def main(argv): # Parse options import argparse parser = argparse.ArgumentParser() parser.add_argument("-a", "--host", default="", help="address to listen on") parser.add_argument("-p", "--port", default=25565, type=int, help="port to listen on") parser.add_argument("--offline", action="store_true", help="offline server") args = parser.parse_args(argv) # Create factory factory = YTDServerFactory() factory.online_mode = False#not args.offline # Listen factory.listen(args.host, args.port) reactor.run() if __name__ == "__main__": import sys main(sys.argv[1:])