favorite-place/server/main.py

558 lines
19 KiB
Python

# Yeahbut Apr 2024
import math
import certifi, os
os.environ["SSL_CERT_FILE"] = certifi.where()
# Modified from https://raw.githubusercontent.com/barneygale/quarry/master/examples/server_chat_room_advanced.py
from typing import List
import subprocess
import time
from twisted.internet import reactor
from quarry.net.server import ServerFactory, ServerProtocol
from quarry.types.uuid import UUID
from quarry.types.chunk import BlockArray, PackedArray
from quarry.types.registry import LookupRegistry
from quarry.types.buffer.v1_14 import Buffer1_14
import quarry.types.nbt as NBT
from quarry.data.data_packs import data_packs, dimension_types
def is_java_17_installed():
    """Report whether a Java runtime of major version 17 or newer is on PATH.

    Runs ``java -version`` and inspects the version banner it prints.

    Returns:
        bool: ``False`` when the ``java`` executable cannot be found, exits
        with a non-zero status, or reports a major version below 17.
        ``True`` otherwise, including the case where the command succeeds
        but no version number can be recognised in its output.
    """
    import re  # local import: the file's import block does not provide `re`

    try:
        result = subprocess.run(
            ['java', '-version'],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
    except FileNotFoundError:
        return False
    if result.returncode != 0:
        return False

    # The banner is normally written to stderr, so search both streams.
    match = re.search(r'version "(\d+)(?:\.(\d+))?', result.stderr + result.stdout)
    if match is None:
        # Unrecognised banner: keep the lenient "java ran successfully" answer.
        return True

    major = int(match.group(1))
    if major == 1 and match.group(2) is not None:
        # Legacy numbering: Java 8 and older report themselves as "1.8.0_x".
        major = int(match.group(2))
    return major >= 17
def bootstrap_server_data():
    """Download the vanilla server jar and run its data generator.

    The jar is fetched from the hard-coded Mojang URL, stored as
    ``generate_data/minecraft_server.jar`` and then executed with
    ``-DbundlerMainClass=net.minecraft.data.Main ... --reports`` using
    ``generate_data`` as the working directory.

    When :func:`is_java_17_installed` reports no usable Java, a hint is
    printed and nothing is downloaded or created.

    Raises:
        requests.HTTPError: if the download answers with an error status.
    """
    import requests

    if not is_java_17_installed():
        # winget install Microsoft.OpenJDK.17
        print("Java not found, please install Java 17 (or newer).")
        return

    url = "https://piston-data.mojang.com/v1/objects/c8f83c5655308435b3dcf03c06d9fe8740a77469/server.jar"
    command = ["java", "-DbundlerMainClass=net.minecraft.data.Main", "-jar", "minecraft_server.jar", "--reports"]
    data_dir = "generate_data"

    response = requests.get(url)
    # Fail before the directory exists: the module only bootstraps while
    # "generate_data" is missing, so a directory holding an error page
    # instead of the jar would never be refreshed.
    response.raise_for_status()

    os.makedirs(data_dir, exist_ok=True)
    # os.path.join instead of a hard-coded backslash keeps this portable.
    with open(os.path.join(data_dir, "minecraft_server.jar"), 'wb') as file:
        file.write(response.content)

    # Pass the argument list without a shell. With shell=True on POSIX only
    # the first item ("java") would be run as the command and the remaining
    # items would be handed to the shell itself.
    subprocess.run(command, cwd=data_dir)
# Biome written into the single-valued biome palette of every chunk section.
BIOME_ID = 7  # Forest Data ID

# The registry reports come from the vanilla data generator:
#   java -DbundlerMainClass=net.minecraft.data.Main -jar minecraft_server.jar --reports
# They are generated on first start, i.e. while "generate_data" is missing.
if not os.path.exists("generate_data"):
    bootstrap_server_data()

# Built with os.path.join so the reports directory is also found on hosts
# whose path separator is not a backslash.
REGISTRY = LookupRegistry.from_json(os.path.join("generate_data", "generated", "reports"))

# Number of 16-block-tall sections generated per chunk column.
CHUNKS_TALL = 28

# Block states used by the terrain generator.
BLOCK_EMPTY = {'name': 'minecraft:air'}
BLOCK_SURFACE = {'name': 'minecraft:grass_block', 'snowy': 'false'}
BLOCK_SUBSURFACE = {'name': 'minecraft:dirt'}
BLOCK_UNDERGROUND = {'name': 'minecraft:stone'}
BLOCK_DEFAULT = {'name': 'minecraft:cobblestone'}
class Buffer_1_18_2(Buffer1_14):
    """``Buffer1_14`` with the chunk packing overridden for this server.

    Only packing is implemented; both unpack methods raise
    ``NotImplementedError``.
    """

    @classmethod
    def pack_chunk(cls, sections):
        """Pack every truthy, non-empty section and concatenate the results."""
        return b"".join(
            cls.pack_chunk_section(section)
            for section in sections
            if section and not section.is_empty()
        )

    @classmethod
    def pack_chunk_section(cls, blocks, block_lights=None, sky_lights=None):
        """
        Packs a single chunk section.

        ``blocks`` should be an instance of
        ``quarry.types.chunk.BlockArray``. ``block_lights`` and
        ``sky_lights`` are accepted but not used.

        The output is: non-air count and value width, the block palette,
        the block data array, then a single-valued biome palette holding
        ``BIOME_ID`` with an empty data array.
        """
        header = cls.pack('HB', blocks.non_air, blocks.storage.value_width)
        palette = cls.pack_chunk_section_palette(blocks.palette)
        block_data = cls.pack_chunk_section_array(blocks.to_bytes())
        biomes = (
            cls.pack('B', 0)  # Biome Bits Per Entry
            + cls.pack_varint(BIOME_ID)  # Biome Palette
            + cls.pack_varint(0)  # Biome Data Array Length
        )
        return header + palette + block_data + biomes

    def unpack_chunk(self, bitmask, overworld=True):
        """Not supported by this buffer type."""
        raise NotImplementedError

    def unpack_chunk_section(self, overworld=True):
        """Not supported by this buffer type."""
        raise NotImplementedError
class Player:
    """State kept for one connected player.

    Tracks the position, the look angles and which chunk columns are
    assigned to the player. ``_load_queue`` and ``_unload_queue`` are only
    filled here; the owning connection is responsible for draining them.
    """

    _conn: "YTDServerProtocol"
    _name: str
    _x: float
    _y: float
    _z: float
    _yaw: float
    _pitch: float
    _chunks: set[tuple[int,int]]
    _load_queue: list[tuple[int,int]]
    _unload_queue: list[tuple[int,int]]

    def __init__(
        self,
        connection: "YTDServerProtocol",
        name: str,
        x: float,
        y: float,
        z: float,
        yaw: float,
        pitch: float,
    ):
        """Store the connection, name and initial pose; start with no chunks."""
        self._conn = connection
        self._name = name
        self._x, self._y, self._z = x, y, z
        self._yaw, self._pitch = yaw, pitch
        self._chunks = set()
        self._load_queue = []
        self._unload_queue = []

    # --- read-only identity and position -------------------------------

    @property
    def name(self) -> str:
        return self._name

    @property
    def x(self) -> float:
        return self._x

    @property
    def y(self) -> float:
        return self._y

    @property
    def z(self) -> float:
        return self._z

    # --- look angles (writable) ------------------------------------------

    @property
    def yaw(self) -> float:
        return self._yaw

    @yaw.setter
    def yaw(self, value: float):
        self._yaw = value

    @property
    def pitch(self) -> float:
        return self._pitch

    @pitch.setter
    def pitch(self, value: float):
        self._pitch = value

    # --- chunk coordinates: block coordinate floor-divided by 16 ---------

    @property
    def cx(self) -> int:
        return int(self.x // 16)

    @property
    def cy(self) -> int:
        return int(self.y // 16)

    @property
    def cz(self) -> int:
        return int(self.z // 16)

    def update_pos(self, x: float, y: float, z: float):
        """Move the player; refresh the chunk set if the x/z column changed."""
        # Compare against the old column before the position is overwritten.
        same_column = int(x // 16) == self.cx and int(z // 16) == self.cz
        self._x, self._y, self._z = x, y, z
        if not same_column:
            self.update_chunks()

    def update_chunks(self):
        """Queue loads/unloads so the square around the player is covered.

        The square spans ``view_distance`` columns in each direction. The
        load queue is then ordered nearest-first and the connection is told
        the new view position.
        """
        print("Chunk Crossed")
        radius = self._conn.view_distance
        wanted: set[tuple[int,int]] = {
            (col_x, col_z)
            for col_x in range(self.cx - radius, self.cx + radius + 1)
            for col_z in range(self.cz - radius, self.cz + radius + 1)
        }
        # Both differences are taken before any queueing mutates _chunks.
        to_load = wanted - self._chunks
        to_unload = self._chunks - wanted
        for column in to_load:
            self.load_chunk(*column)
        for column in to_unload:
            self.unload_chunk(*column)

        def distance(column):
            dx = self.cx - column[0]
            dz = self.cz - column[1]
            return math.sqrt(dx * dx + dz * dz)

        self._load_queue.sort(key=distance)
        self._conn.send_update_view_position(self.cx, self.cz)

    def load_chunk(self, x: int, z: int):
        """Mark column (x, z) as wanted and queue it for sending."""
        print(f"enqueuing loading chunk {x}, {z}")
        column = (x, z)
        self._chunks.add(column)
        if column not in self._load_queue:
            self._load_queue.append(column)
        # A pending unload for the same column is cancelled.
        if column in self._unload_queue:
            self._unload_queue.remove(column)

    def unload_chunk(self, x: int, z: int):
        """Forget column (x, z) and queue it for unloading."""
        print(f"enqueuing unloading chunk {x}, {z}")
        column = (x, z)
        self._chunks.discard(column)
        if column not in self._unload_queue:
            self._unload_queue.append(column)
        # A pending load for the same column is cancelled.
        if column in self._load_queue:
            self._load_queue.remove(column)
class YTDServerProtocol(ServerProtocol):
    """Per-connection protocol: joins the player and streams chunks to them."""

    def player_joined(self):
        """Set up a freshly joined player and send the initial world state."""
        # Call super. This switches us to "play" mode, marks the player as
        # in-game, and does some logging.
        ServerProtocol.player_joined(self)

        self.buff_type = Buffer_1_18_2
        self.view_distance = 8
        self.player = Player(
            self,
            self.display_name,  # type: ignore
            0,
            128,
            0,
            0,
            0,
        )

        # Send join game packet
        self.send_join_game()

        # Send "Player Position and Look" packet
        self.send_packet(
            "player_position_and_look",
            # The last field is the relative-flags byte, so it is packed as a
            # byte ('b') rather than a bool; for 0 the bytes are the same.
            self.buff_type.pack("dddffb",
                                self.player.x,
                                self.player.y,
                                self.player.z,
                                self.player.yaw,
                                self.player.pitch,
                                0b00000),
            self.buff_type.pack_varint(0),
            self.buff_type.pack("?", True))

        # Start sending "Keep Alive" packets
        self.ticker.add_loop(20, self.update_keep_alive)
        # Drain the player's chunk queues on every tick
        self.ticker.add_loop(1, self.manage_queue)

        # Announce player join to other players
        self.factory.broadcast_player_join(self)

        # Send full player list
        self.factory.send_player_list_add(self, self.factory.players)

        # Queue the four chunk columns around the origin; they are sent by
        # manage_queue, one per tick.
        self.send_update_view_position(0, 0)
        self.player.load_chunk(0, 0)
        self.player.load_chunk(-1, 0)
        self.player.load_chunk(0, -1)
        self.player.load_chunk(-1, -1)
        self.send_update_view_position(0, 0)

    def send_join_game(self):
        """Build and send the "Join Game" packet for the current protocol."""
        # Build up fields for "Join Game" packet
        entity_id = 0
        max_players = 0
        hashed_seed = 42
        view_distance = self.view_distance
        simulation_distance = 2
        game_mode = 3
        prev_game_mode = 3
        is_hardcore = False
        is_reduced_debug = False
        is_respawn_screen = False
        is_debug = False
        is_flat = True

        dimension_codec = data_packs[self.protocol_version]
        dimension_name = "minecraft:overworld"
        dimension_tag = dimension_types[self.protocol_version, dimension_name]
        world_count = 1
        world_name = "favorite_place"

        pack = self.buff_type.pack
        pack_varint = self.buff_type.pack_varint
        join_game = [
            pack("i?Bb", entity_id, is_hardcore, game_mode, prev_game_mode),
            pack_varint(world_count),
            self.buff_type.pack_string(world_name),
            self.buff_type.pack_nbt(dimension_codec),
            self.buff_type.pack_nbt(dimension_tag),
            self.buff_type.pack_string(world_name),
            pack("q", hashed_seed),
            pack_varint(max_players),
            pack_varint(view_distance),
            pack_varint(simulation_distance),
            pack("????", is_reduced_debug, is_respawn_screen, is_debug, is_flat),
        ]

        # Send "Join Game" packet
        self.send_packet("join_game", *join_game)

    def manage_queue(self):
        """Send at most one queued chunk and flush every queued unload.

        Registered as a ticker loop by ``player_joined``. Does nothing once
        the connection is closed.
        """
        if self.closed:
            return

        started = time.time()
        load_queue = self.player._load_queue
        if load_queue:
            x, z = load_queue.pop(0)
            # Drop any further requests for the same column.
            while (x, z) in load_queue:
                load_queue.remove((x, z))
            self.send_chunk(x, z)
            print(f"generating {x}, {z} took {int(time.time()-started)}s; {len(load_queue)} chunks remaining")

        unload_queue = self.player._unload_queue
        while unload_queue:
            x, z = unload_queue.pop(0)
            while (x, z) in unload_queue:
                unload_queue.remove((x, z))
            print(f"dequeuing unloading chunk {x}, {z}")
            self.send_unload_chunk(x, z)

    def player_left(self):
        """Run the base-class handling, then tell the other players."""
        ServerProtocol.player_left(self)

        # Announce player leave to other players
        self.factory.broadcast_player_leave(self)

    def update_keep_alive(self):
        """Send a "Keep Alive" packet (always with ID 0)."""
        self.send_packet("keep_alive", self.buff_type.pack('Q', 0))

    def packet_chat_message(self, buff):
        """Relay a chat message from this client to every player."""
        if self.protocol_mode != 'play':
            return
        message = buff.unpack_string()
        self.factory.broadcast_chat(message, self.uuid, self.display_name)
        buff.discard()

    def send_update_view_position(self, x, z):
        """Tell the client which chunk column its view is centred on."""
        self.send_packet(
            'update_view_position',
            self.buff_type.pack_varint(x),
            self.buff_type.pack_varint(z),
        )

    def send_time_update(self, server_age, world_time):
        """Send a "Time Update" packet.

        Both fields are 64-bit longs in the packet, so they are packed with
        'q'. The previous 'l' code is only 4 bytes wide in struct's standard
        sizes, which made the packet 8 bytes too short.
        """
        self.send_packet(
            'time_update',
            self.buff_type.pack('q', server_age),
            self.buff_type.pack('q', world_time),
        )

    def packet_player_position(self, buff: Buffer_1_18_2):
        """Apply a position update reported by the client."""
        # NOTE(review): this is the only movement packet handled in this
        # class. If the client also reports movement through a combined
        # position-and-look packet, chunk crossings made while turning would
        # be missed - confirm and add a handler if so.
        self.player.update_pos(*buff.unpack('ddd'))
        buff.discard()

    def send_block_change(self, x, y, z, block):
        """Set the block at (x, y, z) on the client to ``block``."""
        self.send_packet(
            'block_change',
            self.buff_type.pack_position(x, y, z),
            self.buff_type.pack_varint(REGISTRY.encode_block(block)),
        )

    def send_unload_chunk(self, x, z):
        """Tell the client to drop chunk column (x, z)."""
        self.send_packet(
            'unload_chunk',
            self.buff_type.pack('ii', x, z),
        )

    def send_chunk(self, x, z):
        """Generate chunk column (x, z) and send it to the client.

        The column is unloaded first, then sent without any light data.
        Afterwards every position returned by ``factory.fix`` is overwritten
        with ``BLOCK_EMPTY`` through block-change packets.
        """
        sections, heightmap = self.factory.generate(x, z)
        sections_data = self.buff_type.pack_chunk(sections)
        self.send_unload_chunk(x, z)
        self.send_packet(
            'chunk_data',
            self.buff_type.pack('ii', x, z),
            self.buff_type.pack_nbt(heightmap),
            self.buff_type.pack_varint(len(sections_data)),
            sections_data,
            self.buff_type.pack_varint(0),  # Always zero block entities
            self.buff_type.pack('?', True),
            self.buff_type.pack_varint(0),  # Sky Light Mask
            self.buff_type.pack_varint(0),  # Block Light Mask
            self.buff_type.pack_varint(0),  # Empty Sky Light Mask
            self.buff_type.pack_varint(0),  # Empty Block Light Mask
            self.buff_type.pack_varint(0),  # Sky Light Array Length
            self.buff_type.pack_varint(0),  # Block Light Array Length
        )
        for position in self.factory.fix(x, z):
            self.send_block_change(*position, BLOCK_EMPTY)
class YTDServerFactory(ServerFactory):
    """Server factory: chat/player-list broadcasting and terrain generation."""

    protocol = YTDServerProtocol
    motd = "YTD Custom Server (WIP)"
    force_protocol_version = 758  # 1.18.2

    # Sends an unsigned chat message, using system messages on supporting clients
    def broadcast_chat(self, message: str, sender: UUID, sender_name: str):
        """Send ``message`` from ``sender`` to every player in play mode."""
        for player in self.players:
            if player.protocol_mode != 'play':
                continue
            self.send_chat(player, message, sender, sender_name)

    def send_chat(self, player: YTDServerProtocol, message: str, sender: UUID, sender_name: str):
        """Send one chat line, formatted as ``<name> message``, to ``player``."""
        player.send_packet("chat_message",
                           player.buff_type.pack_chat(f"<{sender_name}> {message}"),
                           player.buff_type.pack('B', 0),
                           player.buff_type.pack_uuid(sender))

    # Sends a system message, falling back to chat messages on older clients
    def broadcast_system(self, message: str):
        """Send a system message to every player in play mode."""
        for player in self.players:
            if player.protocol_mode != 'play':
                continue
            self.send_system(player, message)

    @staticmethod
    def send_system(player: YTDServerProtocol, message: str):
        """Send ``message`` to ``player`` with the all-zero sender UUID."""
        player.send_packet("chat_message",
                           player.buff_type.pack_chat(message),
                           player.buff_type.pack('B', 0),
                           player.buff_type.pack_uuid(UUID(int=0)))

    # Announces player join
    def broadcast_player_join(self, joined: YTDServerProtocol):
        """Announce the join in chat and add the player to the others' lists."""
        self.broadcast_system(f"\u00a7e{joined.display_name} has joined.")
        self.broadcast_player_list_add(joined)

    # Announces player leave
    def broadcast_player_leave(self, left: YTDServerProtocol):
        """Announce the leave in chat and remove the player from the lists."""
        self.broadcast_system(f"\u00a7e{left.display_name} has left.")
        self.broadcast_player_list_remove(left)

    # Sends player list entry for new player to other players
    def broadcast_player_list_add(self, added: YTDServerProtocol):
        """Send a player-list entry for ``added`` to every other player."""
        for player in self.players:
            # Exclude the added player, they will be sent the full player list separately
            if player.protocol_mode == 'play' and player != added:
                self.send_player_list_add(player, [added])

    @staticmethod
    def send_player_list_add(player: YTDServerProtocol, added: List[YTDServerProtocol]):
        """Send ``player`` a "player add" list update for ``added``.

        Only entries in play mode are included. The announced entry count is
        taken after that filter so it always matches the entries that follow.
        """
        entries = [entry for entry in added if entry.protocol_mode == 'play']
        data = [
            player.buff_type.pack_varint(0),  # Action - 0 = Player add
            player.buff_type.pack_varint(len(entries)),  # Player entry count
        ]

        for entry in entries:
            data.append(player.buff_type.pack_uuid(entry.uuid))  # Player UUID
            data.append(player.buff_type.pack_string(entry.display_name))  # Player name
            data.append(player.buff_type.pack_varint(0))  # Empty properties list
            data.append(player.buff_type.pack_varint(3))  # Gamemode
            data.append(player.buff_type.pack_varint(0))  # Latency
            data.append(player.buff_type.pack('?', False))  # No display name

        player.send_packet('player_list_item', *data)

    # Sends player list update for leaving player to other players
    def broadcast_player_list_remove(self, removed: YTDServerProtocol):
        """Send a "player remove" list update to every other player."""
        for player in self.players:
            if player.protocol_mode == 'play' and player != removed:
                player.send_packet('player_list_item',
                                   player.buff_type.pack_varint(4),  # Action - 4 = Player remove
                                   player.buff_type.pack_varint(1),  # Player entry count
                                   player.buff_type.pack_uuid(removed.uuid))  # Player UUID

    @staticmethod
    def chunk_func(cx, cy, cz, xi, yi, zi):
        """Return the terrain block for a section-relative position.

        ``cx``/``cy``/``cz`` are section coordinates and ``xi``/``yi``/``zi``
        the offsets inside that section; they are combined into world block
        coordinates and passed to :meth:`func`.
        """
        x = cx * 16 + xi
        y = cy * 16 + yi
        z = cz * 16 + zi
        return YTDServerFactory.func(x, y, z)

    @staticmethod
    def func(x: int, y: int, z: int):
        """Return the block at world position (x, y, z).

        This function implements the equation defining the terrain: the
        surface height is ``x*x/a - z*z/b`` with ``a = b = 100``. A block
        more than 3 below it is stone, more than 1 below it dirt, anything
        else below it grass, and everything at or above it air.
        """
        a = 100
        b = 100
        surface = x*x/a - z*z/b
        if surface > y + 3:
            return BLOCK_UNDERGROUND
        elif surface > y + 1:
            return BLOCK_SUBSURFACE
        elif surface > y:
            return BLOCK_SURFACE
        else:
            return BLOCK_EMPTY

    def fix(self, x, z):
        """List positions in chunk column (x, z) to overwrite after sending.

        Checks the column's corner block ``(x*16, y, z*16)`` for every y from
        -64 to 320 in steps of 16 and returns the positions whose terrain
        block is not ``BLOCK_EMPTY``.
        """
        # NOTE(review): the caller replaces these positions with BLOCK_EMPTY,
        # i.e. it clears blocks the terrain function says are solid. The
        # y range also differs from the section range used by generate().
        # Confirm this is intended.
        blocks = []
        for y in range(-64, 325, 16):
            if self.func(x*16, y, z*16) != BLOCK_EMPTY:
                blocks.append((x*16, y, z*16))
        return blocks

    def generate(self, x, z):
        """Generate all sections of chunk column (x, z).

        Returns:
            tuple: ``(sections, height_map)`` where ``sections`` is a list of
            ``CHUNKS_TALL`` block arrays (list index 0 is section y = -12)
            and ``height_map`` is an NBT root holding an empty
            ``MOTION_BLOCKING`` height map.
        """
        # Flat index layout is array[y][z][x]. The cell list is the same for
        # every section, so it is built once instead of once per section.
        cells = [
            ((yi * 16 * 16) + (zi * 16) + xi, xi, yi, zi)
            for xi in range(16)
            for yi in range(16)
            for zi in range(16)
        ]

        sections = [BlockArray.empty(REGISTRY) for _ in range(CHUNKS_TALL)]
        for index, sec in enumerate(sections):
            # NOTE(review): index 0 is written again by the loop below;
            # presumably this first write only exists so the section gains a
            # non-air palette entry and is not skipped as empty - confirm.
            sec[0] = BLOCK_DEFAULT
            section_y = index - 12
            for i, xi, yi, zi in cells:
                sec[i] = self.chunk_func(x, section_y, z, xi, yi, zi)

        height_map = NBT.TagRoot({'': NBT.TagCompound({'MOTION_BLOCKING': NBT.TagLongArray(PackedArray.empty_height())})})
        return sections, height_map
def main(argv):
    """Parse the command line, start listening and run the reactor.

    ``argv`` is the list of command-line options to parse. ``--offline`` is
    accepted but currently has no effect: ``online_mode`` is always set to
    ``False``.
    """
    import argparse

    # Command-line options
    option_parser = argparse.ArgumentParser()
    option_parser.add_argument("-a", "--host", default="", help="address to listen on")
    option_parser.add_argument("-p", "--port", default=25565, type=int, help="port to listen on")
    option_parser.add_argument("--offline", action="store_true", help="offline server")
    options = option_parser.parse_args(argv)

    # Server factory; authentication is hard-wired off (was: not options.offline)
    server = YTDServerFactory()
    server.online_mode = False

    # Bind the listening socket and hand control to twisted
    server.listen(options.host, options.port)
    reactor.run()
# Script entry point: only runs when the file is executed directly.
if __name__ == "__main__":
    import sys

    # Skip argv[0]; main() is given everything after it.
    main(sys.argv[1:])