favorite-place/server/main.py

256 lines
9.7 KiB
Python

# Yeahbut Apr 2024
import certifi, os
os.environ["SSL_CERT_FILE"] = certifi.where()
# Modified from https://raw.githubusercontent.com/barneygale/quarry/master/examples/server_chat_room_advanced.py
from typing import List
from twisted.internet import reactor
from quarry.net.server import ServerFactory, ServerProtocol
from quarry.types.uuid import UUID
from quarry.types.chunk import BlockArray, PackedArray
from quarry.types.registry import LookupRegistry
import quarry.types.nbt as NBT
from quarry.data.data_packs import data_packs, dimension_types
# NOTE(review): presumably the forest biome's numeric ID in the data pack's
# biome registry; it is not referenced by the code visible in this file.
FOREST_BIOME_DATA_PACK_ID = 7

# Block registry loaded from the Minecraft data generator's report files.
# Generate them with:
#   java -DbundlerMainClass=net.minecraft.data.Main -jar minecraft_server.jar --reports
# The path is relative to the current working directory and is assembled with
# os.path.join so it resolves on every OS (the previous hard-coded backslash
# form only worked on Windows).
REGISTRY = LookupRegistry.from_json(
    os.path.join('generate_data', 'generated', 'reports'))

# Number of chunk sections generate() builds for each chunk column.
CHUNKS_TALL = 28
class YTDServerProtocol(ServerProtocol):
    """Per-connection handler: joins the player, relays chat, sends one chunk."""

    def player_joined(self):
        """Run the join sequence for a player who has just entered the game.

        Sends the "Join Game" and "Player Position and Look" packets, starts
        the keep-alive loop, announces the join to the other players, sends
        the full player list to this player, and finally sends chunk (0, 0).
        """
        # Call super. This switches us to "play" mode, marks the player as
        # in-game, and does some logging.
        super().player_joined()

        # Send join game packet
        self.send_join_game()

        # Send "Player Position and Look" packet
        self.send_packet(
            "player_position_and_look",
            self.buff_type.pack(
                "dddffb",
                0,        # x
                500,      # y  Must be >= build height to pass the "Loading Terrain" screen on 1.18.2
                0,        # z
                0,        # yaw
                0,        # pitch
                0b00000,  # flags; packed as a byte ('b') so a non-zero
                          # bitmask is not collapsed to 0/1 as '?' would do
            ),
            self.buff_type.pack_varint(0),   # teleport id
            self.buff_type.pack("?", True),  # leave vehicle
        )

        # Start sending "Keep Alive" packets (interval of 20 ticker units)
        self.ticker.add_loop(20, self.update_keep_alive)

        # Announce player join to other players
        self.factory.broadcast_player_join(self)

        # Send full player list
        self.factory.send_player_list_add(self, self.factory.players)

        # Only a single chunk, at chunk coordinates (0, 0), is ever sent
        self.send_chunk(0, 0)

    def send_join_game(self):
        """Build and send the "Join Game" packet with fixed world settings."""
        # Field values for the "Join Game" packet
        entity_id = 0
        max_players = 0
        hashed_seed = 42
        view_distance = 2
        simulation_distance = 2
        game_mode = 3
        prev_game_mode = 3
        is_hardcore = False
        is_reduced_debug = False
        is_respawn_screen = False
        is_debug = False
        is_flat = True

        # Codec and dimension type come from quarry's bundled data, keyed by
        # the negotiated protocol version.
        dimension_codec = data_packs[self.protocol_version]
        dimension_name = "minecraft:overworld"
        dimension_tag = dimension_types[self.protocol_version, dimension_name]
        world_count = 1
        world_name = "favorite_place"

        # Fields are listed in the exact order they are written to the wire.
        join_game = [
            self.buff_type.pack("i?Bb", entity_id, is_hardcore, game_mode, prev_game_mode),
            self.buff_type.pack_varint(world_count),
            self.buff_type.pack_string(world_name),
            self.buff_type.pack_nbt(dimension_codec),
            self.buff_type.pack_nbt(dimension_tag),
            self.buff_type.pack_string(world_name),
            self.buff_type.pack("q", hashed_seed),
            self.buff_type.pack_varint(max_players),
            self.buff_type.pack_varint(view_distance),
            self.buff_type.pack_varint(simulation_distance),
            self.buff_type.pack("????", is_reduced_debug, is_respawn_screen, is_debug, is_flat),
        ]

        # Send "Join Game" packet
        self.send_packet("join_game", *join_game)

    def player_left(self):
        """Handle a player leaving and tell the remaining players about it."""
        super().player_left()

        # Announce player leave to other players
        self.factory.broadcast_player_leave(self)

    def update_keep_alive(self):
        """Send a "Keep Alive" packet; the keep-alive ID is always 0."""
        self.send_packet("keep_alive", self.buff_type.pack('Q', 0))

    def packet_chat_message(self, buff):
        """Relay an incoming chat message to all players via the factory.

        Does nothing unless this connection is in 'play' mode.
        """
        if self.protocol_mode != 'play':
            return

        message = buff.unpack_string()
        self.factory.broadcast_chat(message, self.uuid, self.display_name)

        # Drop whatever is left of the packet after the message string
        buff.discard()

    def send_chunk(self, x, z):
        """Generate the chunk column at chunk coordinates (x, z) and send it.

        An "unload_chunk" packet for the same coordinates is sent first,
        followed by the "chunk_data" packet.
        """
        sections, heightmap = self.factory.generate(x, z)
        sections_data = self.buff_type.pack_chunk(sections)

        self.send_packet(
            'unload_chunk',
            self.buff_type.pack('ii', x, z),
        )

        self.send_packet(
            'chunk_data',
            self.buff_type.pack('ii', x, z),
            self.buff_type.pack_nbt(heightmap),
            self.buff_type.pack_varint(len(sections_data)),  # byte length of the section payload
            sections_data,
            self.buff_type.pack_varint(0),  # Always zero block entities
        )
class YTDServerFactory(ServerFactory):
    """Server factory: chat/player-list broadcasting plus chunk generation."""

    protocol = YTDServerProtocol
    motd = "YTD Custom Server (WIP)"
    force_protocol_version = 758  # 1.18.2

    def broadcast_chat(self, message: str, sender: UUID, sender_name: str):
        """Send a player's chat line to every connection in 'play' mode."""
        for player in self.players:
            if player.protocol_mode != 'play':
                continue
            self.send_chat(player, message, sender, sender_name)

    def send_chat(self, player: YTDServerProtocol, message: str, sender: UUID, sender_name: str):
        """Send one "chat_message" packet to `player`, formatted "<name> text"."""
        player.send_packet(
            "chat_message",
            player.buff_type.pack_chat("<%s> %s" % (sender_name, message)),
            player.buff_type.pack('B', 0),  # position byte
            player.buff_type.pack_uuid(sender),
        )

    def broadcast_system(self, message: str):
        """Send a server-originated message to every connection in 'play' mode."""
        for player in self.players:
            if player.protocol_mode != 'play':
                continue
            self.send_system(player, message)

    @staticmethod
    def send_system(player: YTDServerProtocol, message: str):
        """Send `message` to `player` as a "chat_message" from the all-zero UUID."""
        player.send_packet(
            "chat_message",
            player.buff_type.pack_chat(message),
            player.buff_type.pack('B', 0),  # position byte (same value as send_chat)
            player.buff_type.pack_uuid(UUID(int=0)),
        )

    def broadcast_player_join(self, joined: YTDServerProtocol):
        """Announce a join in chat and add the player to everyone else's list."""
        self.broadcast_system("\u00a7e%s has joined." % joined.display_name)
        self.broadcast_player_list_add(joined)

    def broadcast_player_leave(self, left: YTDServerProtocol):
        """Announce a leave in chat and remove the player from everyone else's list."""
        self.broadcast_system("\u00a7e%s has left." % left.display_name)
        self.broadcast_player_list_remove(left)

    def broadcast_player_list_add(self, added: YTDServerProtocol):
        """Send the new player's list entry to every other in-game player."""
        for player in self.players:
            # Exclude the added player, they will be sent the full player list separately
            if player.protocol_mode == 'play' and player != added:
                self.send_player_list_add(player, [added])

    @staticmethod
    def send_player_list_add(player: YTDServerProtocol, added: List[YTDServerProtocol]):
        """Send a "player_list_item" add action to `player`.

        Only members of `added` that are in 'play' mode are written. The
        entry count in the packet header is taken from that filtered list so
        it always matches the number of entries that follow; counting the
        unfiltered input would declare more entries than are present whenever
        some connections are not yet in 'play' mode.
        """
        in_game = [entry for entry in added if entry.protocol_mode == 'play']

        data = [
            player.buff_type.pack_varint(0),             # Action - 0 = Player add
            player.buff_type.pack_varint(len(in_game)),  # Player entry count
        ]

        for entry in in_game:
            data.append(player.buff_type.pack_uuid(entry.uuid))            # Player UUID
            data.append(player.buff_type.pack_string(entry.display_name))  # Player name
            data.append(player.buff_type.pack_varint(0))                   # Empty properties list
            data.append(player.buff_type.pack_varint(3))                   # Gamemode
            data.append(player.buff_type.pack_varint(0))                   # Latency
            data.append(player.buff_type.pack('?', False))                 # No display name

        player.send_packet('player_list_item', *data)

    def broadcast_player_list_remove(self, removed: YTDServerProtocol):
        """Tell every other in-game player to drop `removed` from their list."""
        for player in self.players:
            if player.protocol_mode == 'play' and player != removed:
                player.send_packet(
                    'player_list_item',
                    player.buff_type.pack_varint(4),           # Action - 4 = Player remove
                    player.buff_type.pack_varint(1),           # Player entry count
                    player.buff_type.pack_uuid(removed.uuid),  # Player UUID
                )

    def generate(self, x, z):
        """Build the section list and heightmap for one chunk column.

        `x` and `z` are accepted for the caller's convenience but are not
        used: every column gets the same content.

        Returns a `(sections, height_map)` tuple. `sections` holds
        CHUNKS_TALL tuples of `(BlockArray, None, None)`; `height_map` is an
        NBT root whose compound carries an empty MOTION_BLOCKING long array.
        """
        sections = []
        for _ in range(CHUNKS_TALL):
            blocks = BlockArray.empty(REGISTRY)
            # Put a single grass block at index 0 of every section.
            # NOTE(review): index 0 is presumably the section's local origin;
            # confirm against BlockArray's indexing.
            blocks[0] = {'name': 'minecraft:grass_block', 'snowy': 'false'}
            sections.append((blocks, None, None))

        # The compound is wrapped in a TagRoot under the empty-string name.
        height_map = NBT.TagRoot({
            '': NBT.TagCompound({
                'MOTION_BLOCKING': NBT.TagLongArray(PackedArray.empty_height()),
            }),
        })
        return sections, height_map
def main(argv):
    """Parse command-line options, start listening, and run the reactor.

    `argv` is the list of argument strings without the program name.
    This call blocks inside `reactor.run()` until the reactor stops.
    """
    import argparse

    # (flags, keyword options) for each supported command-line argument
    option_specs = (
        (("-a", "--host"), dict(default="", help="address to listen on")),
        (("-p", "--port"), dict(default=25565, type=int, help="port to listen on")),
        (("--offline",), dict(action="store_true", help="offline server")),
    )
    cli = argparse.ArgumentParser()
    for flags, settings in option_specs:
        cli.add_argument(*flags, **settings)
    options = cli.parse_args(argv)

    # Online mode is hard-coded off; the parsed --offline flag is accepted
    # but currently not consulted (formerly: not args.offline).
    server = YTDServerFactory()
    server.online_mode = False

    # Bind the listening socket, then hand control to the Twisted reactor
    server.listen(options.host, options.port)
    reactor.run()
# Script entry point: hand everything after the program name to main().
if __name__ == "__main__":
    import sys

    cli_arguments = sys.argv[1:]
    main(cli_arguments)