pgnet.server
Home of the Server
class.
"""Home of the `Server` class."""

from loguru import logger
from typing import Optional, Callable, Type, Any
import arrow
import asyncio
import functools
import hmac
import json
from pathlib import Path
import inspect
import re
import os
import time
import websockets
from dataclasses import dataclass, field
from websockets.server import WebSocketServerProtocol as ServerWebSocket
import hashlib
from .util import (
    Packet,
    Response,
    DisconnectedError,
    Key,
    Connection,
    Game,
    DEFAULT_PORT,
    Request,
    DEFAULT_ADMIN_PASSWORD,
    ADMIN_USERNAME,
    Status,
)


GAME_UPDATE_INTERVAL = 0.1
AUTOSAVE_INTERVAL = 300  # 5 minutes
SALT_SIZE = 20

# These patterns are about *whitespace*, so they use `\s`. (`\W` means "non-word
# character" and would also reject punctuation, which is the job of
# `_re_non_alnum`.)
_re_whitespace = re.compile(r"\s")
_re_non_alnum = re.compile(r"[^a-zA-Z\d\s]")
_re_start_whitespace = re.compile(r"^\s")
_re_end_whitespace = re.compile(r"\s$")


def _get_packet_handler_params(f: Callable) -> dict[str, inspect.Parameter]:
    """Map parameter names of *f* to parameters, excluding `self` and `packet`."""
    return {
        name: param
        for name, param in inspect.signature(f).parameters.items()
        if name not in {"self", "packet"}
    }


def _is_valid_payload_value(value: Any, expected_type: type) -> bool:
    """If a payload *value* is acceptable for a parameter of *expected_type*.

    Types must match exactly, except that an `int` is accepted where a `float` is
    expected (a JSON payload does not distinguish `1` from `1.0`). A `bool` is
    never accepted as a number.
    """
    if expected_type is float:
        return type(value) in (int, float)
    return type(value) is expected_type


def _user_packet_handler(*, admin: bool = False):
    """Decorator that unpacks a packet payload into keyword arguments.

    Checks that payload keys exist in arguments and values match the annotations. If
    *admin* is True, will check that the packet is from the admin user.

    Raises:
        AssertionError: at decoration time, if a parameter of the wrapped function
            (other than `self` and `packet`) is not annotated as one of `int`,
            `str`, `bool` or `float`.
    """
    def wrapper(f: Callable):
        params = _get_packet_handler_params(f)
        for name, param in params.items():
            if param.annotation not in {int, str, bool, float}:
                raise AssertionError(
                    f"{name!r} of {f} must be of JSON-able type,"
                    f" instead got: {param.annotation}"
                )

        @functools.wraps(f)
        def inner(server: "Server", packet: Packet):
            # Check admin
            if admin and packet.username != ADMIN_USERNAME:
                return Response(
                    f"{packet.message!r} requires admin privileges.",
                    status=Status.UNEXPECTED,
                )
            # Compare payload to signature
            for arg, value in packet.payload.items():
                if arg not in params:
                    return Response(
                        f"Unexpected argument {arg!r} for request {packet.message!r}",
                        status=Status.UNEXPECTED,
                    )
                expected_type = params[arg].annotation
                if not _is_valid_payload_value(value, expected_type):
                    m = (
                        f"Expected argument type {expected_type} for argument {arg!r}."
                        f" Instead got: {type(value)} {value!r}"
                    )
                    return Response(m, status=Status.UNEXPECTED)
            # Finally call wrapped function
            return f(server, packet, **packet.payload)
        return inner
    return wrapper


def _check_name(
    name: str,
    /,
    *,
    min_len: int = 3,
    max_len: int = 20,
    allow_whitespace: bool = False,
    alnum_only: bool = True,
    allow_lead_trail_whitespace: bool = False,
) -> bool:
    """If a string matches criteria of arguments.

    A name containing the admin username (case-insensitive) is never allowed.
    """
    if ADMIN_USERNAME.lower() in name.lower():
        return False
    if len(name) > max_len:
        return False
    if len(name) < min_len:
        return False
    if not allow_whitespace and bool(_re_whitespace.search(name)):
        return False
    if alnum_only and bool(_re_non_alnum.search(name)):
        return False
    if not allow_lead_trail_whitespace:
        if bool(_re_start_whitespace.search(name)):
            return False
        if bool(_re_end_whitespace.search(name)):
            return False
    return True


def is_username_allowed(name: str, /) -> bool:
    """If a username is allowed."""
    return _check_name(
        name,
        max_len=20,
        allow_whitespace=False,
        alnum_only=True,
    )


def is_gamename_allowed(name: str, /) -> bool:
    """If a game name is allowed."""
    return _check_name(
        name,
        max_len=50,
        allow_whitespace=True,
        alnum_only=True,
    )


@dataclass
class User:
    """User authentication info."""

    name: str
    salt: str
    hashed_password: str

    @classmethod
    def from_name_password(cls, name: str, password: str):
        """Create a new user from a raw (unsalted/unhashed) password."""
        salt = cls._generate_salt()
        hashed_password = cls._hash_password(password, salt)
        return cls(name, salt=salt, hashed_password=hashed_password)

    def compare_password(self, password: str) -> bool:
        """Compare a raw (unsalted/unhashed) password to our password."""
        candidate = self._hash_password(password, self.salt)
        # Constant-time comparison, so that response time does not depend on how
        # many leading characters of the digests match.
        return hmac.compare_digest(
            candidate.encode(),
            self.hashed_password.encode(),
        )

    @staticmethod
    def _generate_salt() -> str:
        """Generate a random salt as a hex string of `SALT_SIZE` random bytes."""
        return os.urandom(SALT_SIZE).hex()

    @staticmethod
    def _hash_password(password: str, salt: str) -> str:
        """Hash a string using Python's hashlib."""
        return hashlib.sha256(f"{salt}{password}".encode()).hexdigest()


@dataclass
class UserConnection(Connection):
    """Thin wrapper for `pgnet.util.Connection`.

    Provides serialization and deserialization.
    """

    username: Optional[str] = None
    game: Optional[str] = None

    async def send(self, response: Response, *args, **kwargs):
        """Override base method to serialize response."""
        await super().send(response.serialize(), *args, **kwargs)

    async def recv(self, *args, **kwargs) -> Packet:
        """Override base method to deserialize packet."""
        message = await super().recv(*args, **kwargs)
        return Packet.deserialize(message)


@dataclass
class LobbyGame:
    """`pgnet.Game` instance wrapper for management by server."""

    game: Game = field(repr=False)
    name: str
    password: str
    connected_users: set[str] = field(default_factory=set)

    @property
    def heartbeat_rate(self) -> float:
        """Updates per second."""
        return self.game.heartbeat_rate

    @property
    def password_protected(self) -> bool:
        """If the game is password protected."""
        return bool(self.password)

    def add_user(self, username: str, password: str) -> Optional[str]:
        """Return reason if user was not successfully added."""
        if password != self.password:
            return "Incorrect password."
        self.connected_users.add(username)
        self.game.user_joined(username)
        return None

    def remove_user(self, username):
        """Remove user from game."""
        if username in self.connected_users:
            self.connected_users.remove(username)
            self.game.user_left(username)

    def get_save_string(self) -> Optional[str]:
        """Called by the server when shutting down."""
        if self.game.persistent:
            return self.game.get_save_string()
        return None

    def get_lobby_info(self) -> str:
        """Called by the server to get game info."""
        return self.game.get_lobby_info()

    @property
    def expired(self) -> bool:
        """If the game is empty and not persistent."""
        return not self.connected_users and not self.game.persistent

    def handle_packet(self, packet: Packet) -> Response:
        """Relay packet handling to game instance."""
        response = self.game.handle_packet(packet)
        return response

    def update(self):
        """Called on an interval by the server."""
        self.game.update()


class Server:
    """The server that hosts games.

    Subclass from `pgnet.Game` and pass it as the *`game`* argument for the server.
    Then, use the `Server.async_run` coroutine to start the server.

    By default, the server is configured to listen on localhost. To listen
    globally, set *`listen_globally`* and *`admin_password`*.

    For games to save and load, *`save_file`* must be set (see also:
    `pgnet.Game.get_save_string`).

    .. note:: Most home networks require port forwarding to be discoverable by remote
        clients.
    """

    def __init__(
        self,
        game: Type[Game],
        /,
        *,
        listen_globally: bool = False,
        port: int = DEFAULT_PORT,
        admin_password: Optional[str] = None,
        registration_enabled: bool = True,
        require_user_password: bool = False,
        on_connection: Optional[Callable[[str, bool], Any]] = None,
        verbose_logging: bool = False,
        save_file: Optional[str | Path] = None,
    ):
        """Initialize the server.

        Args:
            listen_globally: Listen globally instead of localhost only.
                A warning is logged if *`admin_password`* is not also set.
            port: Port number to listen on.
            admin_password: Password for admin user with elevated privileges.
                Falls back to `DEFAULT_ADMIN_PASSWORD` if not set.
            registration_enabled: Allow new users to register.
            require_user_password: Require that users have non-empty passwords.
            on_connection: Callback for when a username connects or disconnects.
            verbose_logging: Log *all* packets and responses.
            save_file: Location of file to save and load server sessions.
        """
        if listen_globally and not admin_password:
            logger.warning(
                "Created server that listens globally without admin password."
            )
        admin_password = admin_password or DEFAULT_ADMIN_PASSWORD
        self._key: Key = Key()
        self._stop: Optional[asyncio.Future] = None
        self._require_user_password = require_user_password
        self._users: dict[str, User] = dict()
        self._register_user(ADMIN_USERNAME, admin_password)
        self._games: dict[str, LobbyGame] = {}
        self._connections: dict[str, Optional[UserConnection]] = {}
        self._deleted_users: set[str] = set()
        self._invite_codes: dict[str, str] = {}
        self._game_cls: Type[Game] = game
        self._save_file: Optional[Path] = None if save_file is None else Path(save_file)
        self._address: str = "" if listen_globally else "localhost"
        self._port: int = port
        self.registration_enabled: bool = registration_enabled
        self.on_connection: Optional[Callable[[str, bool], Any]] = on_connection
        self.verbose_logging: bool = verbose_logging
        self._load_from_disk()
        logger.debug(f"{self._save_file=}")
        logger.debug(f"{self._game_cls=}")
        logger.debug(f"{self._key=}")
        print(f"{admin_password=}")  # Use print instead of logger for password

    async def async_run(self, *, on_start: Optional[Callable] = None) -> int:
        """Start the server.

        The server will listen for connections and pass them off to the
        connection handler.

        Args:
            on_start: Callback for when the server is online and handling messages.

        Returns:
            Exit code as given by the `shutdown` command. A value of -1 indicates a
                request to reboot.

        Raises:
            RuntimeError: if the server is already running.
            OSError: if an `OSError` was raised while serving (chained to the
                original error).
        """
        if self._stop:
            raise RuntimeError("Cannot run the server more than once concurrently.")
        stop = self._stop = asyncio.Future()
        serving_args = (self._connection_handler, self._address, self._port)
        try:
            try:
                async with websockets.serve(*serving_args):
                    logger.info(f"Handling messages {self}")
                    if on_start:
                        on_start()
                    await self._listening_loop(stop)
            except OSError as e:
                added = OSError(f"Server fail. Perhaps one is already running? {self}")
                raise added from e
            self._save_to_disk()
            result = stop.result()
            logger.info(f"Server stop {result=} {self}")
            return result
        finally:
            # Always clear the future, otherwise a failed run would leave the
            # server permanently flagged as "already running".
            self._stop = None

    def shutdown(self, result: int = 0, /):
        """Stop the server.

        The *result* is passed as the return value (exit code) for `Server.async_run`.
        """
        if self._stop and not self._stop.done():
            self._stop.set_result(result)

    def delete_user(self, username: str):
        """Disconnect and delete a given username from the server.

        A connected user is only flagged for deletion here; the actual deletion
        happens when their connection is closed.
        """
        if username == ADMIN_USERNAME:
            logger.warning("Cannot delete admin.")
            return
        if username in self._connections:
            self._deleted_users.add(username)
        else:
            self._delete_user(username)

    @property
    def pubkey(self) -> str:
        """Public key used for end to end encryption."""
        return self._key.pubkey

    async def _listening_loop(self, stop_future: asyncio.Future):
        """Autosave and update games on an interval until *stop_future* is done."""
        next_autosave = arrow.now().shift(seconds=AUTOSAVE_INTERVAL)
        next_interval = arrow.now().shift(seconds=GAME_UPDATE_INTERVAL)
        while not stop_future.done():
            await asyncio.sleep(0.1)
            if arrow.now() >= next_autosave:
                self._save_to_disk()
                next_autosave = arrow.now().shift(seconds=AUTOSAVE_INTERVAL)
            if arrow.now() >= next_interval:
                for game in self._games.values():
                    game.update()
                next_interval = arrow.now().shift(seconds=GAME_UPDATE_INTERVAL)

    async def _connection_handler(self, websocket: ServerWebSocket):
        """Handle new connections.

        Allows the handshake to fully populate a UserConnection, which may then
        be handled as a logged in user.
        """
        connection = UserConnection(websocket)
        logger.info(f"New connection: {connection}")
        try:
            await self._handle_handshake(connection)
            self._add_user_connection(connection)
            logger.info(f"User logged in: {connection}")
            await self._handle_user(connection)
        except DisconnectedError as e:
            logger.debug(f"{e=}")
        finally:
            logger.info(f"Closed connection: {connection}")
            self._remove_user_connection(connection)
            username = connection.username
            pending_delete = (
                username in self._users and username in self._deleted_users
            )
            # Only delete once no connection is registered for the username
            # (another connection may still be registered under the same name).
            if pending_delete and username not in self._connections:
                self._delete_user(username)

    async def _handle_handshake(self, connection: UserConnection):
        """Handle a new connection's handshake sequence. Modifies the connection object.

        First trade public keys and assign the connection's `tunnel`. Then authenticate
        and assign the connection's `username`.

        Raises:
            DisconnectedError: if the public key is missing or authentication failed.
        """
        # Trade public keys
        packet = await connection.recv()
        pubkey = packet.payload.get("pubkey")
        if not pubkey or not isinstance(pubkey, str):
            response = Response(
                "Missing public key string.",
                status=Status.BAD,
                disconnecting=True,
            )
            await connection.send(response)
            raise DisconnectedError("Incompatible protocol: missing pubkey.")
        response = Response("key_trade", dict(pubkey=self.pubkey))
        await connection.send(response)
        connection.tunnel = self._key.get_tunnel(pubkey)
        logger.debug(f"Assigned tunnel: {connection}")
        # Authenticate
        packet = await connection.recv()
        username = packet.payload.get("username")
        password = packet.payload.get("password", "")
        invite_code = packet.payload.get("invite_code", "")
        fail = self._check_auth(username, password, invite_code)
        if fail:
            # Respond with problem and disconnect
            response = Response(fail, status=Status.BAD, disconnecting=True)
            await connection.send(response)
            raise DisconnectedError("Failed to authenticate.")
        connection.username = username
        logger.debug(f"Assigned username: {connection}")
        if username == ADMIN_USERNAME:
            logger.warning(f"Authenticated as admin: {connection}")
        await connection.send(Response("Authenticated."))

    def _check_auth(
        self,
        username: str,
        password: str,
        invite_code: str,
    ) -> Optional[str]:
        """Return failure reason or None.

        The arguments come straight from a client payload, so their types are
        validated here before they are used as dictionary keys or strings.
        """
        if not username or not isinstance(username, str):
            return "Missing non-empty username."
        if not isinstance(password, str) or not isinstance(invite_code, str):
            return "Password and invite code must be strings."
        if username in self._deleted_users:
            return "User deleted."
        if username in self._connections:
            return "Username already connected."
        if username not in self._users:
            return self._try_register_user(username, password, invite_code)
        user = self._users[username]
        if not user.compare_password(password):
            return "Incorrect password."
        return None

    def _try_register_user(
        self,
        username: str,
        password: str,
        invite_code: str,
    ) -> Optional[str]:
        """Return failure reason or None.

        An invite code is consumed when it is used to register successfully.
        """
        if invite_code:
            invite_username = self._invite_codes.get(invite_code)
            wrong_code = invite_username is None
            # An invite with an empty username is valid for any username
            invite_valid = username == invite_username or invite_username == ""
            if wrong_code or not invite_valid:
                return "Incorrect username or invite code."
        if not (self.registration_enabled or invite_code):
            return "Registration blocked."
        if self._require_user_password and not password:
            return "User password required."
        if not is_username_allowed(username):
            return "Username not allowed."
        self._register_user(username, password)
        if invite_code:
            del self._invite_codes[invite_code]
        return None

    def _register_user(self, username: str, password: str, /):
        """Register new user.

        Raises:
            ValueError: if the server requires user passwords and *password* is empty.
        """
        assert username not in self._users
        if self._require_user_password and not password:
            raise ValueError("Server requires password for users.")
        user = User.from_name_password(username, password)
        self._users[username] = user
        logger.info(f"Registered {username=}")

    def _add_user_connection(self, connection: UserConnection):
        """Add the connection to connected users table.

        Raises:
            DisconnectedError: if the username is already connected. This can happen
                when two handshakes for the same username complete concurrently.
        """
        username = connection.username
        if username in self._connections:
            raise DisconnectedError(f"Username already connected: {username!r}")
        self._connections[username] = connection
        if self.on_connection:
            self.on_connection(username, True)

    def _remove_user_connection(self, connection: UserConnection):
        """Remove the connection from connected users table if exists."""
        username = connection.username
        # Compare by identity: a connection that was never added to the table must
        # not evict another connection registered under the same username.
        if self._connections.get(username) is not connection:
            return
        self._remove_user_from_game(username)
        del self._connections[username]
        if self.on_connection:
            self.on_connection(username, False)

    async def _handle_user(self, connection: UserConnection):
        """Handle a logged in user connection - handle packets and return responses.

        Raises:
            DisconnectedError: when a response is flagged as disconnecting.
        """
        username = connection.username
        while True:
            # Wait for packet from user
            packet = await connection.recv(timeout=3600.0)
            # Important! We must set the packet's authenticated username.
            packet.username = username
            do_log = self.verbose_logging
            if do_log:
                logger.debug(packet)
            if username in self._deleted_users:
                response = Response("User deleted.", disconnecting=True)
            else:
                response: Response = self._handle_packet(packet)
            if do_log:
                logger.debug(f"--> {response}")
            assert isinstance(response, Response)
            # Also important, to set the game of the response for the client.
            response.game = self._connections[username].game
            await connection.send(response)
            # The packet handler may have determined we are disconnecting
            if response.disconnecting:
                raise DisconnectedError(response.message)

    def _handle_packet(self, packet: Packet) -> Response:
        """Handle a packet from a logged in user."""
        # Find builtin handler
        request_handler = self._request_handlers.get(packet.message)
        if request_handler:
            return request_handler(self, packet)
        # Find game handler
        game_name: Optional[str] = self._connections[packet.username].game
        if game_name:
            return self._handle_game_packet(packet, game_name)
        # No handler found - not in game and not a builtin request
        return Response(
            "Please create/join a game.",
            self._canned_response_payload | dict(packet=packet.debug_repr),
            status=Status.UNEXPECTED,
        )

    def _remove_user_from_game(self, username: str):
        """Remove user from game and delete the game if expired."""
        connection = self._connections[username]
        game = self._games.get(connection.game)
        if not game:
            return
        connection.game = None
        game.remove_user(username)
        if game.expired:
            del self._games[game.name]
        logger.debug(f"User {username!r} removed from {game}")

    def _delete_user(self, username: str):
        """Delete a registered, disconnected user and clear their deletion flag."""
        assert username in self._users and username not in self._connections
        del self._users[username]
        if username in self._deleted_users:
            self._deleted_users.remove(username)
        logger.info(f"Deleted {username=}")

    @_user_packet_handler()
    def _handle_game_dir(self, packet: Packet) -> Response:
        """Create a Response with dictionary of games details."""
        games_dict = {}
        for game in self._games.values():
            games_dict[game.name] = dict(
                name=game.name,
                users=len(game.connected_users),
                password_protected=game.password_protected,
                info=game.get_lobby_info(),
            )
        return Response("See payload for games directory.", dict(games=games_dict))

    def _create_game(
        self,
        name: str,
        password: str = "",
        game_data: Optional[str] = None,
    ) -> LobbyGame:
        """Create a new game."""
        assert name not in self._games
        game = self._game_cls(name, save_string=game_data)
        lobbygame = LobbyGame(game, name, password)
        self._games[name] = lobbygame
        logger.debug(f"Created game: {lobbygame}")
        return lobbygame

    def _destroy_game(self, game_name: str):
        """Destroy an existing game."""
        game = self._games[game_name]
        while game.connected_users:
            self._remove_user_from_game(next(iter(game.connected_users)))
        # The game may have already been deleted when its last user was removed
        if game_name in self._games:
            del self._games[game_name]
        logger.debug(f"Destroyed game: {game_name!r}")

    @_user_packet_handler()
    def _handle_join_game(
        self,
        packet: Packet,
        /,
        *,
        name: str = "",
        password: str = "",
    ) -> Response:
        """Handle a request to join a game.

        If no game by that name exists, the request is relayed as a request to
        create the game.
        """
        # NOTE: *password* must be part of the signature, otherwise the packet
        # handler decorator rejects any payload that contains a password.
        game_name = name
        connection = self._connections[packet.username]
        current_name: Optional[str] = connection.game
        if current_name:
            return Response("Must leave game first.", status=Status.UNEXPECTED)
        if not game_name:
            return Response("Please specify a game name.", status=Status.UNEXPECTED)
        game = self._games.get(game_name)
        if not game:
            return self._handle_create_game(packet)
        fail = game.add_user(packet.username, password)
        if fail:
            return Response(f"Failed to join game: {fail}", status=Status.UNEXPECTED)
        connection.game = game_name
        logger.debug(f"User {packet.username!r} joined: {game}")
        return Response(
            f"Joined game: {game_name!r}.",
            dict(heartbeat_rate=game.heartbeat_rate),
        )

    @_user_packet_handler()
    def _handle_leave_game(self, packet: Packet) -> Response:
        """Handle a request to leave the game."""
        game_name: Optional[str] = self._connections[packet.username].game
        if not game_name:
            return Response("Not in game.", status=Status.UNEXPECTED)
        self._remove_user_from_game(packet.username)
        logger.debug(f"User {packet.username!r} left game: {game_name!r}")
        return Response(f"Left game: {game_name!r}.")

    @_user_packet_handler()
    def _handle_create_game(
        self,
        packet: Packet,
        /,
        *,
        name: str = "",
        password: str = "",
    ) -> Response:
        """Handle request to create a new game specified in the payload."""
        game_name = name
        connection = self._connections[packet.username]
        current_game = connection.game
        if current_game:
            return Response("Must leave game first.", status=Status.UNEXPECTED)
        if not is_gamename_allowed(game_name):
            return Response("Game name not allowed.", status=Status.UNEXPECTED)
        if game_name in self._games:
            return Response("Game name already exists.", status=Status.UNEXPECTED)
        game = self._create_game(game_name, password)
        fail = game.add_user(packet.username, password)
        assert not fail
        connection.game = game_name
        logger.debug(f"User {packet.username!r} created game: {game}")
        return Response(
            f"Created new game: {game_name!r}.",
            dict(heartbeat_rate=game.heartbeat_rate),
        )

    def _handle_game_packet(self, packet: Packet, game_name: str) -> Response:
        """Routes a packet from a logged in user to the game's packet handler.

        Will use the response's `disconnecting` attribute to remove the user
        from the game, and then clear the attribute.
        """
        game = self._games[game_name]
        response: Response = game.handle_packet(packet)
        assert isinstance(response, Response)
        if response.disconnecting:
            self._remove_user_from_game(packet.username)
            response.disconnecting = False
        return response

    @_user_packet_handler()
    def _handle_help(self, packet: Packet) -> Response:
        """Create a Response listing builtin requests and their argument types."""
        requests = dict()
        for request_name, f in self._request_handlers.items():
            requests[request_name] = {
                param_name: param.annotation.__name__
                for param_name, param in _get_packet_handler_params(f).items()
            }
        return Response("See payload for requests.", requests)

    # Admin commands
    @_user_packet_handler(admin=True)
    def _admin_shutdown(self, packet: Packet) -> Response:
        """Shutdown the server."""
        self.shutdown()
        return Response("Shutting down...")

    @_user_packet_handler(admin=True)
    def _admin_create_invite(
        self,
        packet: Packet,
        /,
        *,
        username: str = "",
    ) -> Response:
        """Create an invite code. Can optionally be for a specific username."""
        # NOTE(review): 2 random bytes give only 65536 possible codes - consider
        # longer codes if invites are used to gate registration.
        code = os.urandom(2).hex()
        self._invite_codes[code] = username
        return Response(f"Created invite code: {code}")

    @_user_packet_handler(admin=True)
    def _admin_register(
        self,
        packet: Packet,
        /,
        *,
        set_as: bool = False,
    ) -> Response:
        """Set user registration."""
        self.registration_enabled = set_as
        return Response(f"Registration enabled: {set_as}")

    @_user_packet_handler(admin=True)
    def _admin_delete_user(
        self,
        packet: Packet,
        /,
        *,
        username: str = ""
    ) -> Response:
        """Delete a user by name."""
        if username not in self._users:
            return Response(f"No such username {username!r}")
        self.delete_user(username)
        return Response(f"Requested delete user {username!r}")

    @_user_packet_handler(admin=True)
    def _admin_destroy_game(
        self,
        packet: Packet,
        /,
        *,
        name: str = "",
    ) -> Response:
        """Destroy a game by name."""
        game_name = name
        if game_name not in self._games:
            return Response(f"No such game: {game_name!r}", status=Status.UNEXPECTED)
        self._destroy_game(game_name)
        return Response(f"Destroyed game: {game_name!r}")

    @_user_packet_handler(admin=True)
    def _admin_save(self, packet: Packet) -> Response:
        """Save all server data to file."""
        success = self._save_to_disk()
        return Response(f"Saved {success=} server data to disk: {self._save_file}")

    @_user_packet_handler(admin=True)
    def _admin_verbose(
        self,
        packet: Packet,
        /,
        *,
        set_as: bool = False,
    ) -> Response:
        """Set verbose logging."""
        self.verbose_logging = set_as
        return Response(f"Verbose logging enabled: {set_as}")

    @_user_packet_handler(admin=True)
    def _admin_debug(self, packet: Packet) -> Response:
        """Return debugging info."""
        games = [str(game) for name, game in sorted(self._games.items())]
        connected_users = [str(conn) for u, conn in sorted(self._connections.items())]
        all_users = sorted(self._users.keys())
        payload = dict(
            packet=packet.debug_repr,
            pubkey=self.pubkey,
            games=games,
            connected_users=connected_users,
            all_users=all_users,
            registration=self.registration_enabled,
            invite_codes=self._invite_codes,
            deleted_users=list(self._deleted_users),
            verbose=self.verbose_logging,
        )
        return Response("Debug", payload)

    @_user_packet_handler(admin=True)
    def _admin_sleep(self, packet: Packet, /, *, seconds: float = 1) -> Response:
        """Simulate slow response by blocking for the time specified in payload.

        Warning: this actually blocks the entire server. Time is capped at 5 seconds.
        """
        max_sleep = 5
        # Clamp to [0, max_sleep]: `time.sleep` raises ValueError on negatives
        seconds = max(0, min(max_sleep, seconds))
        time.sleep(seconds)
        return Response(f"Slept for {seconds} seconds")

    def _save_to_disk(self) -> bool:
        """Save all data to disk.

        Returns:
            False if no save file is configured, otherwise True after writing.
        """
        if not self._save_file:
            return False
        game_data = []
        for game in self._games.values():
            save_string = game.get_save_string()
            if not save_string:
                continue
            game_data.append(dict(
                name=game.name,
                password=game.password,
                data=save_string,
            ))
        # The admin user is not saved - it is registered on initialization
        users = [
            dict(name=u.name, salt=u.salt, password=u.hashed_password)
            for u in self._users.values() if u.name != ADMIN_USERNAME
        ]
        data = dict(
            users=users,
            games=game_data,
            registration=self.registration_enabled,
            invite_codes=self._invite_codes,
        )
        dumped = json.dumps(data, indent=4)
        self._save_file.parent.mkdir(parents=True, exist_ok=True)
        with open(self._save_file, "w", encoding="utf-8") as f:
            f.write(dumped)
        logger.debug(
            f"Saved server data to {self._save_file}"
            f" ({len(users)} users and {len(game_data)} games)"
        )
        return True

    def _load_from_disk(self):
        """Load users, games, invite codes and registration from the save file.

        Does nothing if no save file is configured or the file does not exist.
        Disallowed user and game names are logged but still loaded.
        """
        if not self._save_file or not self._save_file.is_file():
            return
        logger.info(f"Loading server data from {self._save_file}")
        with open(self._save_file, encoding="utf-8") as f:
            data = f.read()
        data = json.loads(data)
        for user in data["users"]:
            username = user["name"]
            if username == ADMIN_USERNAME:
                continue
            if not is_username_allowed(username):
                logger.warning(f"Loaded disallowed {username=}")
            self._users[username] = u = User(username, user["salt"], user["password"])
            logger.debug(f"Loaded username: {u!r}")
        for game in data["games"]:
            game_name = game["name"]
            if not is_gamename_allowed(game_name):
                logger.warning(f"Loaded disallowed {game_name=}")
            self._create_game(game_name, game["password"], game["data"])
            logger.debug(f"Loaded game: {self._games[game_name]!r}")
        self._invite_codes |= data["invite_codes"]
        self.registration_enabled = data["registration"]
        logger.debug("Loading disk data complete.")

    def __repr__(self):
        """Object repr."""
        address = self._address or "public"
        return (
            f"<{self.__class__.__qualname__}"
            f" serving {address}:{self._port}"
            f" @ {id(self):x}>"
        )

    # Builtin requests and their handlers, looked up by packet message
    _request_handlers = {
        Request.HELP: _handle_help,
        Request.GAME_DIR: _handle_game_dir,
        Request.CREATE_GAME: _handle_create_game,
        Request.JOIN_GAME: _handle_join_game,
        Request.LEAVE_GAME: _handle_leave_game,
        Request.DEBUG: _admin_debug,
        Request.SAVE: _admin_save,
        Request.CREATE_INVITE: _admin_create_invite,
        Request.DESTROY_GAME: _admin_destroy_game,
        Request.DELETE_USER: _admin_delete_user,
        Request.REGISTRATION: _admin_register,
        Request.VERBOSE: _admin_verbose,
        Request.SLEEP: _admin_sleep,
        Request.SHUTDOWN: _admin_shutdown,
    }
    _canned_response_payload = dict(commands=list(_request_handlers.keys()))


__all__ = (
    "Server",
    "LobbyGame",
    "User",
    "UserConnection",
    "is_username_allowed",
    "is_gamename_allowed",
)
class Server:
    """The server that hosts games.

    Subclass from `pgnet.Game` and pass it as the *`game`* argument for the server.
    Then, use the `Server.async_run` coroutine to start the server.

    By default, the server is configured to listen on localhost. To listen
    globally, set *`listen_globally`* and *`admin_password`*.

    For games to save and load, *`save_file`* must be set (see also:
    `pgnet.Game.get_save_string`).

    .. note:: Most home networks require port forwarding to be discoverable by remote
        clients.
    """

    def __init__(
        self,
        game: Type[Game],
        /,
        *,
        listen_globally: bool = False,
        port: int = DEFAULT_PORT,
        admin_password: Optional[str] = None,
        registration_enabled: bool = True,
        require_user_password: bool = False,
        on_connection: Optional[Callable[[str, bool], Any]] = None,
        verbose_logging: bool = False,
        save_file: Optional[str | Path] = None,
    ):
        """Initialize the server.

        Args:
            listen_globally: Listen globally instead of localhost only.
                A warning is logged if *`admin_password`* is not set.
            port: Port number to listen on.
            admin_password: Password for admin user with elevated privileges.
                Falls back to `DEFAULT_ADMIN_PASSWORD` when empty.
            registration_enabled: Allow new users to register.
            require_user_password: Require that users have non-empty passwords.
            on_connection: Callback for when a username connects or disconnects.
            verbose_logging: Log *all* packets and responses.
            save_file: Location of file to save and load server sessions.
        """
        if listen_globally and not admin_password:
            logger.warning(
                "Created server that listens globally without admin password."
            )
        admin_password = admin_password or DEFAULT_ADMIN_PASSWORD
        self._key: Key = Key()
        self._stop: Optional[asyncio.Future] = None
        self._require_user_password = require_user_password
        self._users: dict[str, User] = dict()
        self._register_user(ADMIN_USERNAME, admin_password)
        self._games: dict[str, LobbyGame] = {}
        self._connections: dict[str, Optional[UserConnection]] = {}
        # Connected users pending deletion; deleted once they disconnect.
        self._deleted_users: set[str] = set()
        # Maps invite code to the username it is reserved for ("" means anyone).
        self._invite_codes: dict[str, str] = {}
        self._game_cls: Type[Game] = game
        self._save_file: Optional[Path] = None if save_file is None else Path(save_file)
        # An empty address is used for listening globally.
        self._address: str = "" if listen_globally else "localhost"
        self._port: int = port
        self.registration_enabled: bool = registration_enabled
        self.on_connection: Optional[Callable[[str, bool], Any]] = on_connection
        self.verbose_logging: bool = verbose_logging
        self._load_from_disk()
        logger.debug(f"{self._save_file=}")
        logger.debug(f"{self._game_cls=}")
        logger.debug(f"{self._key=}")
        print(f"{admin_password=}")  # Use print instead of logger for password

    async def async_run(self, *, on_start: Optional[Callable] = None) -> int:
        """Start the server.

        The server will listen for connections and pass them off to the
        connection handler.

        Args:
            on_start: Callback for when the server is online and handling messages.

        Returns:
            Exit code as given by the `shutdown` command. A value of -1 indicates a
                request to reboot.

        Raises:
            RuntimeError: If the server is already running.
            OSError: If serving fails (chained from the original error).
        """
        if self._stop:
            raise RuntimeError("Cannot run the server more than once concurrently.")
        stop = asyncio.Future()
        self._stop = stop
        serving_args = (self._connection_handler, self._address, self._port)
        try:
            try:
                async with websockets.serve(*serving_args):
                    logger.info(f"Handling messages {self}")
                    if on_start:
                        on_start()
                    await self._listening_loop(stop)
            except OSError as e:
                added = OSError(f"Server fail. Perhaps one is already running? {self}")
                raise added from e
            self._save_to_disk()
            result = stop.result()
            logger.info(f"Server stop {result=} {self}")
            return result
        finally:
            # Always clear the stop future, otherwise a failed run would make
            # every later call raise the "more than once" RuntimeError.
            self._stop = None

    def shutdown(self, result: int = 0, /):
        """Stop the server.

        The *result* is passed as the return value (exit code) for `Server.async_run`.
        """
        if self._stop and not self._stop.done():
            self._stop.set_result(result)

    def delete_user(self, username: str):
        """Disconnect and delete a given username from the server."""
        if username == ADMIN_USERNAME:
            logger.warning("Cannot delete admin.")
            return
        if username in self._connections:
            # Deletion is deferred: the user is deleted when their connection
            # closes (see `_connection_handler`).
            self._deleted_users.add(username)
        else:
            self._delete_user(username)

    @property
    def pubkey(self) -> str:
        """Public key used for end to end encryption."""
        return self._key.pubkey

    async def _listening_loop(self, stop_future: asyncio.Future):
        """Autosave and update games on intervals until *stop_future* is done."""
        next_autosave = arrow.now().shift(seconds=AUTOSAVE_INTERVAL)
        next_interval = arrow.now().shift(seconds=GAME_UPDATE_INTERVAL)
        while not stop_future.done():
            await asyncio.sleep(0.1)
            if arrow.now() >= next_autosave:
                self._save_to_disk()
                next_autosave = arrow.now().shift(seconds=AUTOSAVE_INTERVAL)
            if arrow.now() >= next_interval:
                for game in self._games.values():
                    game.update()
                next_interval = arrow.now().shift(seconds=GAME_UPDATE_INTERVAL)

    async def _connection_handler(self, websocket: ServerWebSocket):
        """Handle new connections.

        Allows the handshake to fully populate a UserConnection, which may then
        be handled as a logged in user.
        """
        connection = UserConnection(websocket)
        logger.info(f"New connection: {connection}")
        try:
            await self._handle_handshake(connection)
            self._add_user_connection(connection)
            logger.info(f"User logged in: {connection}")
            await self._handle_user(connection)
        except DisconnectedError as e:
            logger.debug(f"{e=}")
        finally:
            logger.info(f"Closed connection: {connection}")
            self._remove_user_connection(connection)
            username = connection.username
            # Complete a deletion that was deferred while the user was connected.
            if username in self._users and username in self._deleted_users:
                self._delete_user(connection.username)

    async def _handle_handshake(self, connection: UserConnection):
        """Handle a new connection's handshake sequence. Modifies the connection object.

        First trade public keys and assign the connection's `tunnel`. Then authenticate
        and assign the connection's `username`.

        Raises:
            DisconnectedError: If the public key is missing or authentication fails.
        """
        # Trade public keys
        packet = await connection.recv()
        pubkey = packet.payload.get("pubkey")
        if not pubkey or not isinstance(pubkey, str):
            response = Response(
                "Missing public key string.",
                status=Status.BAD,
                disconnecting=True,
            )
            await connection.send(response)
            raise DisconnectedError("Incompatible protocol: missing pubkey.")
        response = Response("key_trade", dict(pubkey=self.pubkey))
        await connection.send(response)
        connection.tunnel = self._key.get_tunnel(pubkey)
        logger.debug(f"Assigned tunnel: {connection}")
        # Authenticate
        packet = await connection.recv()
        username = packet.payload.get("username")
        password = packet.payload.get("password", "")
        invite_code = packet.payload.get("invite_code", "")
        fail = self._check_auth(username, password, invite_code)
        if fail:
            # Respond with problem and disconnect
            response = Response(fail, status=Status.BAD, disconnecting=True)
            await connection.send(response)
            raise DisconnectedError("Failed to authenticate.")
        connection.username = username
        logger.debug(f"Assigned username: {connection}")
        if username == ADMIN_USERNAME:
            logger.warning(f"Authenticated as admin: {connection}")
        await connection.send(Response("Authenticated."))

    def _check_auth(
        self,
        username: str,
        password: str,
        invite_code: str,
    ) -> Optional[str]:
        """Return failure reason or None.

        Unknown usernames are passed on to `_try_register_user`.
        """
        # The values come straight from the client's payload, so they may be of
        # any JSON type. Reject non-strings early (an unhashable username would
        # otherwise raise TypeError in the membership checks below).
        if not isinstance(username, str) or not username:
            return "Missing non-empty username."
        if not isinstance(password, str) or not isinstance(invite_code, str):
            return "Password and invite code must be strings."
        if username in self._deleted_users:
            return "User deleted."
        if username in self._connections:
            return "Username already connected."
        if username not in self._users:
            return self._try_register_user(username, password, invite_code)
        user = self._users[username]
        if not user.compare_password(password):
            return "Incorrect password."
        return None

    def _try_register_user(
        self,
        username: str,
        password: str,
        invite_code: str,
    ) -> Optional[str]:
        """Return failure reason or None.

        A valid invite code allows registration even when registration is disabled,
        and is consumed on success.
        """
        if invite_code:
            invite_username = self._invite_codes.get(invite_code)
            wrong_code = invite_username is None
            # An invite for the empty username may be used by any username.
            invite_valid = username == invite_username or invite_username == ""
            if wrong_code or not invite_valid:
                return "Incorrect username or invite code."
        if not (self.registration_enabled or invite_code):
            return "Registration blocked."
        if self._require_user_password and not password:
            return "User password required."
        if not is_username_allowed(username):
            return "Username not allowed."
        self._register_user(username, password)
        if invite_code:
            del self._invite_codes[invite_code]
        return None

    def _register_user(self, username: str, password: str, /):
        """Register new user.

        Raises:
            ValueError: If the server requires passwords and *password* is empty.
        """
        assert username not in self._users
        if self._require_user_password and not password:
            raise ValueError("Server requires password for users.")
        user = User.from_name_password(username, password)
        self._users[username] = user
        logger.info(f"Registered {username=}")

    def _add_user_connection(self, connection: UserConnection):
        """Add the connection to connected users table."""
        username = connection.username
        assert username not in self._connections
        self._connections[username] = connection
        if self.on_connection:
            self.on_connection(username, True)

    def _remove_user_connection(self, connection: UserConnection):
        """Remove the connection from connected users table if exists."""
        username = connection.username
        if username not in self._connections:
            return
        self._remove_user_from_game(connection.username)
        del self._connections[username]
        if self.on_connection:
            self.on_connection(username, False)

    async def _handle_user(self, connection: UserConnection):
        """Handle a logged in user connection - handle packets and return responses.

        Raises:
            DisconnectedError: When a response is marked as disconnecting.
        """
        username = connection.username
        while True:
            # Wait for packet from user
            packet = await connection.recv(timeout=3600.0)
            # Important! We must set the packet's authenticated username.
            packet.username = username
            do_log = self.verbose_logging
            if do_log:
                logger.debug(packet)
            if username in self._deleted_users:
                response = Response("User deleted.", disconnecting=True)
            else:
                response: Response = self._handle_packet(packet)
            if do_log:
                logger.debug(f"--> {response}")
            assert isinstance(response, Response)
            # Also important, to set the game of the response for the client.
            response.game = self._connections[username].game
            await connection.send(response)
            # The packet handler may have determined we are disconnecting
            if response.disconnecting:
                raise DisconnectedError(response.message)

    def _handle_packet(self, packet: Packet) -> Response:
        """Handle a packet from a logged in user."""
        # Find builtin handler
        request_handler = self._request_handlers.get(packet.message)
        if request_handler:
            return request_handler(self, packet)
        # Find game handler
        game_name: Optional[str] = self._connections[packet.username].game
        if game_name:
            return self._handle_game_packet(packet, game_name)
        # No handler found - not in game and not a builtin request
        return Response(
            "Please create/join a game.",
            self._canned_response_payload | dict(packet=packet.debug_repr),
            status=Status.UNEXPECTED,
        )

    def _remove_user_from_game(self, username: str):
        """Remove user from game and delete the game if expired."""
        connection = self._connections[username]
        game = self._games.get(connection.game)
        if not game:
            return
        connection.game = None
        game.remove_user(username)
        if game.expired:
            del self._games[game.name]
        logger.debug(f"User {username!r} removed from {game}")

    def _delete_user(self, username: str):
        """Delete a registered user that is not connected."""
        assert username in self._users and username not in self._connections
        del self._users[username]
        if username in self._deleted_users:
            self._deleted_users.remove(username)
        logger.info(f"Deleted {username=}")

    @_user_packet_handler()
    def _handle_game_dir(self, packet: Packet) -> Response:
        """Create a Response with dictionary of games details."""
        games_dict = {}
        for game in self._games.values():
            games_dict[game.name] = dict(
                name=game.name,
                users=len(game.connected_users),
                password_protected=game.password_protected,
                info=game.get_lobby_info(),
            )
        return Response("See payload for games directory.", dict(games=games_dict))

    def _create_game(
        self,
        name: str,
        password: str = "",
        game_data: Optional[str] = None,
    ) -> LobbyGame:
        """Create a new game.

        Args:
            name: Name of the game, must not already exist.
            password: Password required of users joining the game.
            game_data: Passed to the game class as its `save_string`.
        """
        assert name not in self._games
        game = self._game_cls(name, save_string=game_data)
        lobbygame = LobbyGame(game, name, password)
        self._games[name] = lobbygame
        logger.debug(f"Created game: {lobbygame}")
        return lobbygame

    def _destroy_game(self, game_name: str):
        """Destroy an existing game."""
        game = self._games[game_name]
        while game.connected_users:
            self._remove_user_from_game(list(game.connected_users)[0])
        # Removing the last user may have already deleted an expired game.
        if game_name in self._games:
            del self._games[game_name]
        logger.debug(f"Destroyed game: {game_name!r}")

    @_user_packet_handler()
    def _handle_join_game(
        self,
        packet: Packet,
        /,
        *,
        name: str = "",
        password: str = "",
    ) -> Response:
        """Handle a request to join a game.

        If the game does not exist, the request is treated as a request to create it.
        """
        # NOTE: *password* must be declared in the signature, otherwise the
        # packet handler decorator rejects it as an unexpected argument and
        # password protected games cannot be joined.
        game_name = name
        connection = self._connections[packet.username]
        current_name: Optional[str] = connection.game
        if current_name:
            return Response("Must leave game first.", status=Status.UNEXPECTED)
        if not game_name:
            return Response("Please specify a game name.", status=Status.UNEXPECTED)
        game = self._games.get(game_name)
        if not game:
            return self._handle_create_game(packet)
        fail = game.add_user(packet.username, password)
        if fail:
            return Response(f"Failed to join game: {fail}", status=Status.UNEXPECTED)
        connection.game = game_name
        logger.debug(f"User {packet.username!r} joined: {game}")
        return Response(
            f"Joined game: {game_name!r}.",
            dict(heartbeat_rate=game.heartbeat_rate),
        )

    @_user_packet_handler()
    def _handle_leave_game(self, packet: Packet) -> Response:
        """Handle a request to leave the game."""
        game_name: Optional[str] = self._connections[packet.username].game
        if not game_name:
            return Response("Not in game.", status=Status.UNEXPECTED)
        self._remove_user_from_game(packet.username)
        logger.debug(f"User {packet.username!r} left game: {game_name!r}")
        return Response(f"Left game: {game_name!r}.")

    @_user_packet_handler()
    def _handle_create_game(
        self,
        packet: Packet,
        /,
        *,
        name: str = "",
        password: str = "",
    ) -> Response:
        """Handle request to create a new game specified in the payload."""
        game_name = name
        connection = self._connections[packet.username]
        current_game = connection.game
        if current_game:
            return Response("Must leave game first.", status=Status.UNEXPECTED)
        if not is_gamename_allowed(game_name):
            return Response("Game name not allowed.", status=Status.UNEXPECTED)
        if game_name in self._games:
            return Response("Game name already exists.", status=Status.UNEXPECTED)
        game = self._create_game(game_name, password)
        fail = game.add_user(packet.username, password)
        assert not fail
        connection.game = game_name
        logger.debug(f"User {packet.username!r} created game: {game}")
        return Response(
            f"Created new game: {game_name!r}.",
            dict(heartbeat_rate=game.heartbeat_rate),
        )

    def _handle_game_packet(self, packet: Packet, game_name: str) -> Response:
        """Routes a packet from a logged in user to the game's packet handler.

        Will use the response's `disconnecting` attribute to remove the user
        from the game, and then clear the attribute.
        """
        game = self._games[game_name]
        response: Response = game.handle_packet(packet)
        assert isinstance(response, Response)
        if response.disconnecting:
            self._remove_user_from_game(packet.username)
            response.disconnecting = False
        return response

    @_user_packet_handler()
    def _handle_help(self, packet: Packet) -> Response:
        """Create a Response listing builtin requests and their parameter types."""
        requests = dict()
        for request_name, f in self._request_handlers.items():
            requests[request_name] = {
                param_name: param.annotation.__name__
                for param_name, param in _get_packet_handler_params(f).items()
            }
        return Response("See payload for requests.", requests)

    # Admin commands
    @_user_packet_handler(admin=True)
    def _admin_shutdown(self, packet: Packet) -> Response:
        """Shutdown the server."""
        self.shutdown()
        return Response("Shutting down...")

    @_user_packet_handler(admin=True)
    def _admin_create_invite(
        self,
        packet: Packet,
        /,
        *,
        username: str = "",
    ) -> Response:
        """Create an invite code. Can optionally be for a specific username."""
        code = os.urandom(2).hex()
        self._invite_codes[code] = username
        return Response(f"Created invite code: {code}")

    @_user_packet_handler(admin=True)
    def _admin_register(
        self,
        packet: Packet,
        /,
        *,
        set_as: bool = False,
    ) -> Response:
        """Set user registration."""
        self.registration_enabled = set_as
        return Response(f"Registration enabled: {set_as}")

    @_user_packet_handler(admin=True)
    def _admin_delete_user(
        self,
        packet: Packet,
        /,
        *,
        username: str = ""
    ) -> Response:
        """Delete a user by name."""
        if username not in self._users:
            return Response(f"No such username {username!r}")
        self.delete_user(username)
        return Response(f"Requested delete user {username!r}")

    @_user_packet_handler(admin=True)
    def _admin_destroy_game(
        self,
        packet: Packet,
        /,
        *,
        name: str = "",
    ) -> Response:
        """Destroy a game by name."""
        game_name = name
        if game_name not in self._games:
            return Response(f"No such game: {game_name!r}", status=Status.UNEXPECTED)
        self._destroy_game(game_name)
        return Response(f"Destroyed game: {game_name!r}")

    @_user_packet_handler(admin=True)
    def _admin_save(self, packet: Packet) -> Response:
        """Save all server data to file."""
        success = self._save_to_disk()
        return Response(f"Saved {success=} server data to disk: {self._save_file}")

    @_user_packet_handler(admin=True)
    def _admin_verbose(
        self,
        packet: Packet,
        /,
        *,
        set_as: bool = False,
    ) -> Response:
        """Set verbose logging."""
        self.verbose_logging = set_as
        return Response(f"Verbose logging enabled: {set_as}")

    @_user_packet_handler(admin=True)
    def _admin_debug(self, packet: Packet) -> Response:
        """Return debugging info."""
        games = [str(game) for name, game in sorted(self._games.items())]
        connected_users = [str(conn) for u, conn in sorted(self._connections.items())]
        all_users = sorted(self._users.keys())
        payload = dict(
            packet=packet.debug_repr,
            pubkey=self.pubkey,
            games=games,
            connected_users=connected_users,
            all_users=all_users,
            registration=self.registration_enabled,
            invite_codes=self._invite_codes,
            deleted_users=list(self._deleted_users),
            verbose=self.verbose_logging,
        )
        return Response("Debug", payload)

    @_user_packet_handler(admin=True)
    def _admin_sleep(self, packet: Packet, /, *, seconds: float = 1) -> Response:
        """Simulate slow response by blocking for the time specified in payload.

        Warning: this actually blocks the entire server. Time is capped at 5 seconds.
        """
        max_sleep = 5
        # Clamp from below as well: `time.sleep` raises ValueError on negatives.
        seconds = max(0, min(max_sleep, seconds))
        time.sleep(seconds)
        return Response(f"Slept for {seconds} seconds")

    def _save_to_disk(self) -> bool:
        """Save all data to disk.

        Returns:
            False if no save file is configured, otherwise True.
        """
        if not self._save_file:
            return False
        game_data = []
        for game in self._games.values():
            save_string = game.get_save_string()
            # Games without a save string are not saved.
            if not save_string:
                continue
            game_data.append(dict(
                name=game.name,
                password=game.password,
                data=save_string,
            ))
        # The admin user is not saved, it is registered on initialization.
        users = [
            dict(name=u.name, salt=u.salt, password=u.hashed_password)
            for u in self._users.values() if u.name != ADMIN_USERNAME
        ]
        data = dict(
            users=users,
            games=game_data,
            registration=self.registration_enabled,
            invite_codes=self._invite_codes,
        )
        dumped = json.dumps(data, indent=4)
        self._save_file.parent.mkdir(parents=True, exist_ok=True)
        with open(self._save_file, "w") as f:
            f.write(dumped)
        logger.debug(
            f"Saved server data to {self._save_file}"
            f" ({len(users)} users and {len(game_data)} games)"
        )
        return True

    def _load_from_disk(self):
        """Load users, games, invite codes and registration from the save file.

        Does nothing if no save file is configured or the file does not exist.
        """
        if not self._save_file or not self._save_file.is_file():
            return
        logger.info(f"Loading server data from {self._save_file}")
        with open(self._save_file) as f:
            data = f.read()
        data = json.loads(data)
        for user in data["users"]:
            username = user["name"]
            if username == ADMIN_USERNAME:
                continue
            # Disallowed names are only warned about, they are still loaded.
            if not is_username_allowed(username):
                logger.warning(f"Loaded disallowed {username=}")
            self._users[username] = u = User(username, user["salt"], user["password"])
            logger.debug(f"Loaded username: {u!r}")
        for game in data["games"]:
            game_name = game["name"]
            if not is_gamename_allowed(game_name):
                logger.warning(f"Loaded disallowed {game_name=}")
            self._create_game(game_name, game["password"], game["data"])
            logger.debug(f"Loaded game: {self._games[game_name]!r}")
        self._invite_codes |= data["invite_codes"]
        self.registration_enabled = data["registration"]
        logger.debug("Loading disk data complete.")

    def __repr__(self):
        """Object repr."""
        address = self._address or "public"
        return (
            f"<{self.__class__.__qualname__}"
            f" serving {address}:{self._port}"
            f" @ {id(self):x}>"
        )

    # Maps builtin requests to their handlers (see `_handle_packet`).
    _request_handlers = {
        Request.HELP: _handle_help,
        Request.GAME_DIR: _handle_game_dir,
        Request.CREATE_GAME: _handle_create_game,
        Request.JOIN_GAME: _handle_join_game,
        Request.LEAVE_GAME: _handle_leave_game,
        Request.DEBUG: _admin_debug,
        Request.SAVE: _admin_save,
        Request.CREATE_INVITE: _admin_create_invite,
        Request.DESTROY_GAME: _admin_destroy_game,
        Request.DELETE_USER: _admin_delete_user,
        Request.REGISTRATION: _admin_register,
        Request.VERBOSE: _admin_verbose,
        Request.SLEEP: _admin_sleep,
        Request.SHUTDOWN: _admin_shutdown,
    }
    _canned_response_payload = dict(commands=list(_request_handlers.keys()))
The server that hosts games.
Subclass from `pgnet.Game` and pass it as the `game` argument for the server. Then, use the `Server.async_run` coroutine to start the server.
By default, the server is configured to listen on localhost. To listen globally, set `listen_globally` and `admin_password`.
For games to save and load, `save_file` must be set (see also: `pgnet.Game.get_save_string`).
Most home networks require port forwarding to be discoverable by remote
clients.
def __init__(
    self,
    game: Type[Game],
    /,
    *,
    listen_globally: bool = False,
    port: int = DEFAULT_PORT,
    admin_password: Optional[str] = None,
    registration_enabled: bool = True,
    require_user_password: bool = False,
    on_connection: Optional[Callable[[str, bool], Any]] = None,
    verbose_logging: bool = False,
    save_file: Optional[str | Path] = None,
):
    """Set up a new server instance.

    Args:
        game: The game class that the server instantiates for new games.
        listen_globally: Listen globally instead of localhost only.
            A warning is logged if *`admin_password`* is not set.
        port: Port number to listen on.
        admin_password: Password for admin user with elevated privileges.
            Falls back to `DEFAULT_ADMIN_PASSWORD` when empty.
        registration_enabled: Allow new users to register.
        require_user_password: Require that users have non-empty passwords.
        on_connection: Callback for when a username connects or disconnects.
        verbose_logging: Log *all* packets and responses.
        save_file: Location of file to save and load server sessions.
    """
    if listen_globally and not admin_password:
        logger.warning("Created server that listens globally without admin password.")
    if not admin_password:
        admin_password = DEFAULT_ADMIN_PASSWORD
    # Encryption key and run state
    self._key: Key = Key()
    self._stop: Optional[asyncio.Future] = None
    # Users (the admin user is always registered first)
    self._require_user_password = require_user_password
    self._users: dict[str, User] = {}
    self._register_user(ADMIN_USERNAME, admin_password)
    # Lobby state
    self._games: dict[str, LobbyGame] = dict()
    self._connections: dict[str, Optional[UserConnection]] = dict()
    self._deleted_users: set[str] = set()
    self._invite_codes: dict[str, str] = dict()
    self._game_cls: Type[Game] = game
    # Persistence and networking
    if save_file is None:
        self._save_file: Optional[Path] = None
    else:
        self._save_file = Path(save_file)
    self._address: str = "localhost" if not listen_globally else ""
    self._port: int = port
    # Public configuration
    self.registration_enabled: bool = registration_enabled
    self.on_connection: Optional[Callable[[str, bool], Any]] = on_connection
    self.verbose_logging: bool = verbose_logging
    # Load saved data only after all state has been initialized
    self._load_from_disk()
    for debug_line in (
        f"{self._save_file=}",
        f"{self._game_cls=}",
        f"{self._key=}",
    ):
        logger.debug(debug_line)
    print(f"{admin_password=}")  # Use print instead of logger for password
Initialize the server.
Arguments:
- listen_globally: Listen globally instead of localhost only. Requires that `admin_password` be set.
- port: Port number to listen on.
- admin_password: Password for admin user with elevated privileges.
- registration_enabled: Allow new users to register.
- require_user_password: Require that users have non-empty passwords.
- on_connection: Callback for when a username connects or disconnects.
- verbose_logging: Log all packets and responses.
- save_file: Location of file to save and load server sessions.
async def async_run(self, *, on_start: Optional[Callable] = None) -> int:
    """Start the server.

    The server will listen for connections and pass them off to the
    connection handler.

    Args:
        on_start: Callback for when the server is online and handling messages.

    Returns:
        Exit code as given by the `shutdown` command. A value of -1 indicates a
            request to reboot.

    Raises:
        RuntimeError: If the server is already running.
        OSError: If serving fails (chained from the original error).
    """
    if self._stop:
        raise RuntimeError("Cannot run the server more than once concurrently.")
    stop = asyncio.Future()
    self._stop = stop
    serving_args = (self._connection_handler, self._address, self._port)
    try:
        try:
            async with websockets.serve(*serving_args):
                logger.info(f"Handling messages {self}")
                if on_start:
                    on_start()
                await self._listening_loop(stop)
        except OSError as e:
            added = OSError(f"Server fail. Perhaps one is already running? {self}")
            raise added from e
        self._save_to_disk()
        result = stop.result()
        logger.info(f"Server stop {result=} {self}")
        return result
    finally:
        # Always clear the stop future, otherwise a failed run would make
        # every later call raise the "more than once" RuntimeError.
        self._stop = None
Start the server.
The server will listen for connections and pass them off to the connection handler.
Arguments:
- on_start: Callback for when the server is online and handling messages.
Returns:
Exit code as given by the `shutdown` command. A value of -1 indicates a request to reboot.
def shutdown(self, result: int = 0, /):
    """Request the server to stop.

    The *result* becomes the return value (exit code) of `Server.async_run`.
    Does nothing if the server is not running or was already stopped.
    """
    pending = self._stop
    if not pending or pending.done():
        return
    pending.set_result(result)
Stop the server.
The result is passed as the return value (exit code) for `Server.async_run`.
def delete_user(self, username: str):
    """Delete a username from the server, disconnecting it if necessary.

    The admin user cannot be deleted. A connected user is only marked for
    deletion; a user that is not connected is deleted immediately.
    """
    if username == ADMIN_USERNAME:
        logger.warning("Cannot delete admin.")
        return
    if username not in self._connections:
        self._delete_user(username)
        return
    self._deleted_users.add(username)
Disconnect and delete a given username from the server.
@dataclass
class LobbyGame:
    """Server-side wrapper that manages a `pgnet.Game` instance."""

    game: Game = field(repr=False)
    name: str
    password: str
    connected_users: set[str] = field(default_factory=set)

    @property
    def heartbeat_rate(self) -> float:
        """Updates per second, as given by the game."""
        rate = self.game.heartbeat_rate
        return rate

    @property
    def password_protected(self) -> bool:
        """Whether a non-empty password is set for the game."""
        return True if self.password else False

    def add_user(self, username: str, password: str) -> Optional[str]:
        """Add a user to the game, returning the reason if unsuccessful."""
        if password == self.password:
            self.connected_users.add(username)
            self.game.user_joined(username)
            return None
        return "Incorrect password."

    def remove_user(self, username):
        """Remove a user from the game, if they are in it."""
        if username not in self.connected_users:
            return
        self.connected_users.remove(username)
        self.game.user_left(username)

    def get_save_string(self) -> Optional[str]:
        """Return the game's save string, or None if it is not persistent."""
        return self.game.get_save_string() if self.game.persistent else None

    def get_lobby_info(self) -> str:
        """Return the game's lobby info."""
        info = self.game.get_lobby_info()
        return info

    @property
    def expired(self) -> bool:
        """Whether the game has no connected users and is not persistent."""
        return not (self.connected_users or self.game.persistent)

    def handle_packet(self, packet: Packet) -> Response:
        """Pass the packet to the game instance and return its response."""
        return self.game.handle_packet(packet)

    def update(self):
        """Pass the server's interval update on to the game instance."""
        self.game.update()
`pgnet.Game` instance wrapper for management by server.
def add_user(self, username: str, password: str) -> Optional[str]:
    """Add a user to the game, returning the reason if unsuccessful."""
    if password == self.password:
        self.connected_users.add(username)
        self.game.user_joined(username)
        return None
    return "Incorrect password."
Return reason if user was not successfully added.
def remove_user(self, username):
    """Remove a user from the game, if they are in it."""
    if username not in self.connected_users:
        return
    self.connected_users.remove(username)
    self.game.user_left(username)
Remove user from game.
def get_save_string(self) -> Optional[str]:
    """Return the game's save string, or None if it is not persistent."""
    return self.game.get_save_string() if self.game.persistent else None
Called by the server when shutting down.
def get_lobby_info(self) -> str:
    """Return the game's lobby info."""
    info = self.game.get_lobby_info()
    return info
Called by the server to get game info.
@dataclass
class User:
    """User authentication info."""

    name: str
    salt: str
    hashed_password: str

    @classmethod
    def from_name_password(cls, name: str, password: str):
        """Create a new user from a raw (unsalted/unhashed) password.

        A new salt is generated for the user.
        """
        salt = cls._generate_salt()
        hashed_password = cls._hash_password(password, salt)
        return cls(name, salt=salt, hashed_password=hashed_password)

    def compare_password(self, password: str) -> bool:
        """Compare a raw (unsalted/unhashed) password to our password.

        Returns:
            True if *password* hashes (with our salt) to our hashed password.
        """
        import hmac

        candidate = self._hash_password(password, self.salt)
        # Constant-time comparison, to avoid leaking how many leading
        # characters of the digest matched. Compare as bytes since
        # `compare_digest` only accepts ASCII strings.
        return hmac.compare_digest(
            candidate.encode(),
            self.hashed_password.encode(),
        )

    @staticmethod
    def _generate_salt() -> str:
        """Generate a random salt as a hex string."""
        return os.urandom(SALT_SIZE).hex()

    @staticmethod
    def _hash_password(password: str, salt: str) -> str:
        """Hash a string using Python's hashlib.

        Returns:
            Hex digest of the SHA-256 hash of the salt followed by the password.
        """
        return hashlib.sha256(f"{salt}{password}".encode()).hexdigest()
User authentication info.
@classmethod
def from_name_password(cls, name: str, password: str):
    """Build a user from a name and a raw (unsalted/unhashed) password.

    A new salt is generated and used to hash the password.
    """
    new_salt = cls._generate_salt()
    return cls(
        name,
        salt=new_salt,
        hashed_password=cls._hash_password(password, new_salt),
    )
Create a new user from a raw (unsalted/unhashed) password.
@dataclass
class UserConnection(Connection):
    """Thin wrapper for `pgnet.util.Connection`.

    Serializes outgoing responses and deserializes incoming packets.
    """

    username: Optional[str] = None
    game: Optional[str] = None

    async def send(self, response: Response, *args, **kwargs):
        """Serialize the response and send it using the base method."""
        serialized = response.serialize()
        await super().send(serialized, *args, **kwargs)

    async def recv(self, *args, **kwargs) -> Packet:
        """Receive using the base method and deserialize into a packet."""
        return Packet.deserialize(await super().recv(*args, **kwargs))
Thin wrapper for `pgnet.util.Connection`.
Provides serialization and deserialization.
async def send(self, response: Response, *args, **kwargs):
    """Serialize the response and send it using the base method."""
    serialized = response.serialize()
    await super().send(serialized, *args, **kwargs)
Override base method to serialize response.
async def recv(self, *args, **kwargs) -> Packet:
    """Receive using the base method and deserialize into a packet."""
    return Packet.deserialize(await super().recv(*args, **kwargs))
Override base method to deserialize packet.
Inherited Members
def is_username_allowed(name: str, /) -> bool:
    """Check whether *name* may be used as a username.

    Usernames are limited to 20 characters, alphanumeric only, no whitespace.
    """
    criteria = dict(max_len=20, allow_whitespace=False, alnum_only=True)
    return _check_name(name, **criteria)
If a username is allowed.
def is_gamename_allowed(name: str, /) -> bool:
    """Check whether *name* may be used as a game name.

    Game names are limited to 50 characters, alphanumeric only, and may
    contain whitespace.
    """
    criteria = dict(max_len=50, allow_whitespace=True, alnum_only=True)
    return _check_name(name, **criteria)
If a game name is allowed.