[MouseFox logo]
The MouseFox Project
Join The Community Discord Server
[Discord logo]
Edit on GitHub

pgnet.server

Home of the Server class.

  1"""Home of the `Server` class."""
  2
  3from loguru import logger
  4from typing import Optional, Callable, Type, Any
  5import arrow
  6import asyncio
  7import functools
  8import json
  9from pathlib import Path
 10import inspect
 11import re
 12import os
 13import time
 14import websockets
 15from dataclasses import dataclass, field
 16from websockets.server import WebSocketServerProtocol as ServerWebSocket
 17import hashlib
 18from .util import (
 19    Packet,
 20    Response,
 21    DisconnectedError,
 22    Key,
 23    Connection,
 24    Game,
 25    DEFAULT_PORT,
 26    Request,
 27    DEFAULT_ADMIN_PASSWORD,
 28    ADMIN_USERNAME,
 29    Status,
 30)
 31
 32
 33GAME_UPDATE_INTERVAL = 0.1
 34AUTOSAVE_INTERVAL = 300  # 5 minutes
 35SALT_SIZE = 20
 36
 37_re_whitespace = re.compile(r"\W")
 38_re_non_alnum = re.compile(r"[^a-zA-Z\d\s]")
 39_re_start_whitespace = re.compile(r"^\W")
 40_re_end_whitespace = re.compile(r"\W$")
 41
 42
 43def _get_packet_handler_params(f: Callable) -> set[str]:
 44    return {
 45        name: param
 46        for name, param in inspect.signature(f).parameters.items()
 47        if name not in {"self", "packet"}
 48    }
 49
 50
 51def _user_packet_handler(*, admin: bool = False):
 52    """Decorator that unpacks a packet payload into keyword arguments.
 53
 54    Checks that payload keys exist in arguments and values match the annotations. If
 55    *admin* is True, will check that the packet is from the admin user.
 56    """
 57    def wrapper(f: Callable):
 58        params = _get_packet_handler_params(f)
 59        for name, param in params.items():
 60            if param.annotation not in {int, str, bool, float}:
 61                raise AssertionError(
 62                    f"{name!r} of {f} must be of JSON-able type,"
 63                    f" instead got: {param.annotation}"
 64                )
 65
 66        @functools.wraps(f)
 67        def inner(server: "Server", packet: Packet):
 68            # Check admin
 69            if admin and packet.username != ADMIN_USERNAME:
 70                return Response(
 71                    f"{packet.message!r} requires admin privileges.",
 72                    status=Status.UNEXPECTED,
 73                )
 74            # Compare payload to signature
 75            for arg, value in packet.payload.items():
 76                if arg not in params:
 77                    return Response(
 78                        f"Unexpected argument {arg!r} for request {packet.message!r}",
 79                        status=Status.UNEXPECTED,
 80                    )
 81                expected_type = params[arg].annotation
 82                if type(value) is not expected_type:
 83                    m = (
 84                        f"Expected argument type {expected_type} for argument {arg!r}."
 85                        f" Instead got: {type(value)} {value!r}"
 86                    )
 87                    return Response(m, status=Status.UNEXPECTED)
 88            # Finally call wrapped function
 89            return f(server, packet, **packet.payload)
 90        return inner
 91    return wrapper
 92
 93
 94def _check_name(
 95    name: str,
 96    /,
 97    *,
 98    min_len: int = 3,
 99    max_len: int = 20,
100    allow_whitespace: bool = False,
101    alnum_only: bool = True,
102    allow_lead_trail_whitespace: bool = False,
103) -> bool:
104    """If a string matches criteria of arguments."""
105    if ADMIN_USERNAME.lower() in name.lower():
106        return False
107    if len(name) > max_len:
108        return False
109    if len(name) < min_len:
110        return False
111    if not allow_whitespace and bool(_re_whitespace.search(name)):
112        return False
113    if alnum_only and bool(_re_non_alnum.search(name)):
114        return False
115    if not allow_lead_trail_whitespace:
116        if bool(_re_start_whitespace.search(name)):
117            return False
118        if bool(_re_end_whitespace.search(name)):
119            return False
120    return True
121
122
def is_username_allowed(name: str, /) -> bool:
    """Return True if *name* may be used as a username.

    Delegates to `_check_name` with a 20 character limit, no whitespace and
    alphanumeric characters only.
    """
    allowed = _check_name(name, max_len=20, allow_whitespace=False, alnum_only=True)
    return allowed
131
132
def is_gamename_allowed(name: str, /) -> bool:
    """Return True if *name* may be used as a game name.

    Delegates to `_check_name` with a 50 character limit, whitespace allowed
    and otherwise alphanumeric characters only.
    """
    allowed = _check_name(name, max_len=50, allow_whitespace=True, alnum_only=True)
    return allowed
141
142
@dataclass
class User:
    """User authentication info.

    Holds the user's name, a per-user salt and the hex digest of the salted
    password. The raw password is never stored.
    """

    name: str
    salt: str
    hashed_password: str

    @classmethod
    def from_name_password(cls, name: str, password: str) -> "User":
        """Create a new user from a raw (unsalted/unhashed) password."""
        salt = cls._generate_salt()
        hashed_password = cls._hash_password(password, salt)
        return cls(name, salt=salt, hashed_password=hashed_password)

    def compare_password(self, password: str) -> bool:
        """Compare a raw (unsalted/unhashed) password to our password.

        Returns:
            True if hashing *password* with our salt yields our stored digest.
        """
        # Imported locally so this class does not depend on a module-level import.
        import hmac

        candidate = self._hash_password(password, self.salt)
        # Constant-time comparison, so timing does not reveal how much of the
        # digest matched. Bytes are compared since `compare_digest` only
        # accepts ASCII-only strings.
        return hmac.compare_digest(
            candidate.encode(),
            self.hashed_password.encode(),
        )

    @staticmethod
    def _generate_salt() -> str:
        """Return `SALT_SIZE` random bytes as a hex string."""
        return os.urandom(SALT_SIZE).hex()

    @staticmethod
    def _hash_password(password: str, salt: str) -> str:
        """Return the SHA-256 hex digest of *salt* followed by *password*."""
        return hashlib.sha256(f"{salt}{password}".encode()).hexdigest()
170
171
@dataclass
class UserConnection(Connection):
    """Thin wrapper for `pgnet.util.Connection`.

    Adds the authenticated username and current game name, and converts
    between `Response`/`Packet` objects and the base class' messages.
    """

    username: Optional[str] = None
    game: Optional[str] = None

    async def send(self, response: Response, *args, **kwargs):
        """Serialize *response* and send it using the base method."""
        serialized = response.serialize()
        await super().send(serialized, *args, **kwargs)

    async def recv(self, *args, **kwargs) -> Packet:
        """Receive a message using the base method and deserialize it."""
        raw = await super().recv(*args, **kwargs)
        return Packet.deserialize(raw)
190
191
@dataclass
class LobbyGame:
    """`pgnet.Game` instance wrapper for management by server.

    Tracks the game's name, password and the set of connected usernames, and
    relays calls from the server to the wrapped game instance.
    """

    game: Game = field(repr=False)
    name: str
    password: str
    connected_users: set[str] = field(default_factory=set)

    @property
    def heartbeat_rate(self) -> float:
        """The wrapped game's heartbeat rate (updates per second)."""
        return self.game.heartbeat_rate

    @property
    def password_protected(self) -> bool:
        """True if the game has a non-empty password."""
        return bool(self.password)

    def add_user(self, username: str, password: str) -> Optional[str]:
        """Add a user to the game.

        Returns:
            None on success, otherwise the reason the user was not added.
        """
        if password == self.password:
            self.connected_users.add(username)
            self.game.user_joined(username)
            return None
        return "Incorrect password."

    def remove_user(self, username):
        """Remove a user from the game, if connected, and notify the game."""
        if username not in self.connected_users:
            return
        self.connected_users.remove(username)
        self.game.user_left(username)

    def get_save_string(self) -> Optional[str]:
        """Return the game's save string if it is persistent, otherwise None."""
        return self.game.get_save_string() if self.game.persistent else None

    def get_lobby_info(self) -> str:
        """Return the game's lobby info string."""
        return self.game.get_lobby_info()

    @property
    def expired(self) -> bool:
        """True if the game has no connected users and is not persistent."""
        return not (self.connected_users or self.game.persistent)

    def handle_packet(self, packet: Packet) -> Response:
        """Pass *packet* to the game instance and return its response."""
        return self.game.handle_packet(packet)

    def update(self):
        """Call the game's `update`; invoked on an interval by the server."""
        self.game.update()
248
249
250class Server:
251    """The server that hosts games.
252
253    Subclass from `pgnet.Game` and pass it as the *`game`* argument for the server.
254    Then, use the `Server.async_run` coroutine to start the server.
255
256    By default, the server is configured to listen on localhost. To listen
257    globally, set *`listen_globally`* and *`admin_password`*.
258
259    For games to save and load, *`save_file`* must be set (see also:
260    `pgnet.Game.get_save_string`).
261
262    .. note:: Most home networks require port forwarding to be discoverable by remote
263        clients.
264    """
265
266    def __init__(
267        self,
268        game: Type[Game],
269        /,
270        *,
271        listen_globally: bool = False,
272        port: int = DEFAULT_PORT,
273        admin_password: Optional[str] = None,
274        registration_enabled: bool = True,
275        require_user_password: bool = False,
276        on_connection: Optional[Callable[[str, bool], Any]] = None,
277        verbose_logging: bool = False,
278        save_file: Optional[str | Path] = None,
279    ):
280        """Initialize the server.
281
282        Args:
283            listen_globally: Listen globally instead of localhost only.
284                Requires that *`admin_password`* must be set.
285            port: Port number to listen on.
286            admin_password: Password for admin user with elevated priviliges.
287            registration_enabled: Allow new users to register.
288            require_user_password: Require that users have non-empty passwords.
289            on_connection: Callback for when a username connects or disconnects.
290            verbose_logging: Log *all* packets and responses.
291            save_file: Location of file to save and load server sessions.
292        """
293        if listen_globally and not admin_password:
294            logger.warning(
295                "Created server that listens globally without admin password."
296            )
297        admin_password = admin_password or DEFAULT_ADMIN_PASSWORD
298        self._key: Key = Key()
299        self._stop: Optional[asyncio.Future] = None
300        self._require_user_password = require_user_password
301        self._users: dict[str, User] = dict()
302        self._register_user(ADMIN_USERNAME, admin_password)
303        self._games: dict[str, LobbyGame] = {}
304        self._connections: dict[str, Optional[UserConnection]] = {}
305        self._deleted_users: set[str] = set()
306        self._invite_codes: dict[str, str] = {}
307        self._game_cls: Type[Game] = game
308        self._save_file: Optional[Path] = None if save_file is None else Path(save_file)
309        self._address: str = "" if listen_globally else "localhost"
310        self._port: int = port
311        self.registration_enabled: bool = registration_enabled
312        self.on_connection: Optional[Callable[[str, bool], Any]] = on_connection
313        self.verbose_logging: bool = verbose_logging
314        self._load_from_disk()
315        logger.debug(f"{self._save_file=}")
316        logger.debug(f"{self._game_cls=}")
317        logger.debug(f"{self._key=}")
318        print(f"{admin_password=}")  # Use print instead of logger for password
319
320    async def async_run(self, *, on_start: Optional[Callable] = None) -> int:
321        """Start the server.
322
323        The server will listen for connections and pass them off to the
324        connection handler.
325
326        Args:
327            on_start: Callback for when the server is online and handling messages.
328
329        Returns:
330            Exit code as given by the `shutdown` command. A value of -1 indicates a
331                request to reboot.
332        """
333        if self._stop:
334            raise RuntimeError("Cannot run the server more than once concurrently.")
335        self._stop = asyncio.Future()
336        serving_args = (self._connection_handler, self._address, self._port)
337        try:
338            async with websockets.serve(*serving_args):
339                logger.info(f"Handling messages {self}")
340                if on_start:
341                    on_start()
342                await self._listening_loop(self._stop)
343        except OSError as e:
344            added = OSError(f"Server fail. Perhaps one is already running? {self}")
345            raise added from e
346        self._save_to_disk()
347        result = self._stop.result()
348        logger.info(f"Server stop {result=} {self}")
349        self._stop = None
350        return result
351
352    def shutdown(self, result: int = 0, /):
353        """Stop the server.
354
355        The *result* is passed as the return value (exit code) for `Server.async_run`.
356        """
357        if self._stop and not self._stop.done():
358            self._stop.set_result(result)
359
360    def delete_user(self, username: str):
361        """Disconnect and delete a given username from the server."""
362        if username == ADMIN_USERNAME:
363            logger.warning("Cannot delete admin.")
364            return
365        if username in self._connections:
366            self._deleted_users.add(username)
367        else:
368            self._delete_user(username)
369
370    @property
371    def pubkey(self) -> str:
372        """Public key used for end to end encryption."""
373        return self._key.pubkey
374
375    async def _listening_loop(self, stop_future: asyncio.Future):
376        next_autosave = arrow.now().shift(seconds=AUTOSAVE_INTERVAL)
377        next_interval = arrow.now().shift(seconds=GAME_UPDATE_INTERVAL)
378        while not stop_future.done():
379            await asyncio.sleep(0.1)
380            if arrow.now() >= next_autosave:
381                self._save_to_disk()
382                next_autosave = arrow.now().shift(seconds=AUTOSAVE_INTERVAL)
383            if arrow.now() >= next_interval:
384                for game in self._games.values():
385                    game.update()
386                next_interval = arrow.now().shift(seconds=GAME_UPDATE_INTERVAL)
387
388    async def _connection_handler(self, websocket: ServerWebSocket):
389        """Handle new connections.
390
391        Allows the handshake to fully populate a UserConnection, which may then
392        be handled as a logged in user.
393        """
394        connection = UserConnection(websocket)
395        logger.info(f"New connection: {connection}")
396        try:
397            await self._handle_handshake(connection)
398            self._add_user_connection(connection)
399            logger.info(f"User logged in: {connection}")
400            await self._handle_user(connection)
401        except DisconnectedError as e:
402            logger.debug(f"{e=}")
403        finally:
404            logger.info(f"Closed connection: {connection}")
405            self._remove_user_connection(connection)
406            username = connection.username
407            if username in self._users and username in self._deleted_users:
408                self._delete_user(connection.username)
409
410    async def _handle_handshake(self, connection: UserConnection):
411        """Handle a new connection's handshake sequence. Modifies the connection object.
412
413        First trade public keys and assign the connection's `tunnel`. Then authenticate
414        and assign the connection's `username`.
415        """
416        # Trade public keys
417        packet = await connection.recv()
418        pubkey = packet.payload.get("pubkey")
419        if not pubkey or not isinstance(pubkey, str):
420            response = Response(
421                "Missing public key string.",
422                status=Status.BAD,
423                disconnecting=True,
424            )
425            await connection.send(response)
426            raise DisconnectedError("Incompatible protocol: missing pubkey.")
427        response = Response("key_trade", dict(pubkey=self.pubkey))
428        await connection.send(response)
429        connection.tunnel = self._key.get_tunnel(pubkey)
430        logger.debug(f"Assigned tunnel: {connection}")
431        # Authenticate
432        packet = await connection.recv()
433        username = packet.payload.get("username")
434        password = packet.payload.get("password", "")
435        invite_code = packet.payload.get("invite_code", "")
436        fail = self._check_auth(username, password, invite_code)
437        if fail:
438            # Respond with problem and disconnect
439            response = Response(fail, status=Status.BAD, disconnecting=True)
440            await connection.send(response)
441            raise DisconnectedError("Failed to authenticate.")
442        connection.username = username
443        logger.debug(f"Assigned username: {connection}")
444        if username == ADMIN_USERNAME:
445            logger.warning(f"Authenticated as admin: {connection}")
446        await connection.send(Response("Authenticated."))
447
448    def _check_auth(
449        self,
450        username: str,
451        password: str,
452        invite_code: str,
453    ) -> Optional[str]:
454        """Return failure reason or None."""
455        if not username:
456            return "Missing non-empty username."
457        if username in self._deleted_users:
458            return "User deleted."
459        if username in self._connections:
460            return "Username already connected."
461        if username not in self._users:
462            return self._try_register_user(username, password, invite_code)
463        user = self._users[username]
464        if not user.compare_password(password):
465            return "Incorrect password."
466        return None
467
468    def _try_register_user(
469        self,
470        username: str,
471        password: str,
472        invite_code: str,
473    ) -> Optional[str]:
474        """Return failure reason or None."""
475        if invite_code:
476            invite_username = self._invite_codes.get(invite_code)
477            wrong_code = invite_username is None
478            invite_valid = username == invite_username or invite_username == ""
479            if wrong_code or not invite_valid:
480                return "Incorrect username or invite code."
481        if not (self.registration_enabled or invite_code):
482            return "Registration blocked."
483        if self._require_user_password and not password:
484            return "User password required."
485        if not is_username_allowed(username):
486            return "Username not allowed."
487        self._register_user(username, password)
488        if invite_code:
489            del self._invite_codes[invite_code]
490        return None
491
492    def _register_user(self, username: str, password: str, /):
493        """Register new user."""
494        assert username not in self._users
495        if self._require_user_password and not password:
496            raise ValueError("Server requires password for users.")
497        user = User.from_name_password(username, password)
498        self._users[username] = user
499        logger.info(f"Registered {username=}")
500
501    def _add_user_connection(self, connection: UserConnection):
502        """Add the connection to connected users table."""
503        username = connection.username
504        assert username not in self._connections
505        self._connections[username] = connection
506        if self.on_connection:
507            self.on_connection(username, True)
508
509    def _remove_user_connection(self, connection: UserConnection):
510        """Remove the connection from connected users table if exists."""
511        username = connection.username
512        if username not in self._connections:
513            return
514        self._remove_user_from_game(connection.username)
515        del self._connections[username]
516        if self.on_connection:
517            self.on_connection(username, False)
518
519    async def _handle_user(self, connection: UserConnection):
520        """Handle a logged in user connection - handle packets and return responses."""
521        username = connection.username
522        while True:
523            # Wait for packet from user
524            packet = await connection.recv(timeout=3600.0)
525            # Important! We must set the packet's authenticated username.
526            packet.username = username
527            do_log = self.verbose_logging
528            if do_log:
529                logger.debug(packet)
530            if username in self._deleted_users:
531                response = Response("User deleted.", disconnecting=True)
532            else:
533                response: Response = self._handle_packet(packet)
534            if do_log:
535                logger.debug(f"--> {response}")
536            assert isinstance(response, Response)
537            # Also important, to set the game of the response for the client.
538            response.game = self._connections[username].game
539            await connection.send(response)
540            # The packet handler may have determined we are disconnecting
541            if response.disconnecting:
542                raise DisconnectedError(response.message)
543
544    def _handle_packet(self, packet: Packet) -> Response:
545        """Handle a packet from a logged in user."""
546        # Find builtin handler
547        request_handler = self._request_handlers.get(packet.message)
548        if request_handler:
549            return request_handler(self, packet)
550        # Find game handler
551        game_name: Optional[str] = self._connections[packet.username].game
552        if game_name:
553            return self._handle_game_packet(packet, game_name)
554        # No handler found - not in game and not a builtin request
555        return Response(
556            "Please create/join a game.",
557            self._canned_response_payload | dict(packet=packet.debug_repr),
558            status=Status.UNEXPECTED,
559        )
560
561    def _remove_user_from_game(self, username: str):
562        """Remove user from game and delete the game if expired."""
563        connection = self._connections[username]
564        game = self._games.get(connection.game)
565        if not game:
566            return
567        connection.game = None
568        game.remove_user(username)
569        if game.expired:
570            del self._games[game.name]
571        logger.debug(f"User {username!r} removed from {game}")
572
573    def _delete_user(self, username: str):
574        assert username in self._users and username not in self._connections
575        del self._users[username]
576        if username in self._deleted_users:
577            self._deleted_users.remove(username)
578        logger.info(f"Deleted {username=}")
579
580    @_user_packet_handler()
581    def _handle_game_dir(self, packet: Packet) -> Response:
582        """Create a Response with dictionary of games details."""
583        games_dict = {}
584        for name, game in self._games.items():
585            games_dict[game.name] = dict(
586                name=game.name,
587                users=len(game.connected_users),
588                password_protected=game.password_protected,
589                info=game.get_lobby_info(),
590            )
591        return Response("See payload for games directory.", dict(games=games_dict))
592
593    def _create_game(
594        self,
595        name: str,
596        password: str = "",
597        game_data: Optional[str] = None,
598    ) -> LobbyGame:
599        """Create a new game."""
600        assert name not in self._games
601        game = self._game_cls(name, save_string=game_data)
602        lobbygame = LobbyGame(game, name, password)
603        self._games[name] = lobbygame
604        logger.debug(f"Created game: {lobbygame}")
605        return lobbygame
606
607    def _destroy_game(self, game_name: str):
608        """Destroy an existing game."""
609        game = self._games[game_name]
610        while game.connected_users:
611            self._remove_user_from_game(list(game.connected_users)[0])
612        if game_name in self._games:
613            del self._games[game_name]
614        logger.debug(f"Destroyed game: {game_name!r}")
615
616    @_user_packet_handler()
617    def _handle_join_game(
618        self,
619        packet: Packet,
620        /,
621        *,
622        name: str = "",
623    ) -> Response:
624        """Handle a request to join a game."""
625        game_name = name
626        connection = self._connections[packet.username]
627        current_name: Optional[str] = connection.game
628        if current_name:
629            return Response("Must leave game first.", status=Status.UNEXPECTED)
630        if not game_name:
631            return Response("Please specify a game name.", status=Status.UNEXPECTED)
632        if game_name == current_name:
633            return Response("Already in game.", status=Status.UNEXPECTED)
634        game = self._games.get(game_name)
635        if not game:
636            return self._handle_create_game(packet)
637        password = packet.payload.get("password", "")
638        fail = game.add_user(packet.username, password)
639        if fail:
640            return Response(f"Failed to join game: {fail}", status=Status.UNEXPECTED)
641        connection.game = game_name
642        logger.debug(f"User {packet.username!r} joined: {game}")
643        return Response(
644            f"Joined game: {game_name!r}.",
645            dict(heartbeat_rate=game.heartbeat_rate),
646        )
647
648    @_user_packet_handler()
649    def _handle_leave_game(self, packet: Packet) -> Response:
650        """Handle a request to leave the game."""
651        game_name: Optional[str] = self._connections[packet.username].game
652        if not game_name:
653            return Response("Not in game.", status=Status.UNEXPECTED)
654        self._remove_user_from_game(packet.username)
655        logger.debug(f"User {packet.username!r} left game: {game_name!r}")
656        return Response(f"Left game: {game_name!r}.")
657
658    @_user_packet_handler()
659    def _handle_create_game(
660        self,
661        packet: Packet,
662        /,
663        *,
664        name: str = "",
665        password: str = "",
666    ) -> Response:
667        """Handle request to create a new game specified in the payload."""
668        game_name = name
669        connection = self._connections[packet.username]
670        current_game = connection.game
671        if current_game:
672            return Response("Must leave game first.", status=Status.UNEXPECTED)
673        if not is_gamename_allowed(game_name):
674            return Response("Game name not allowed.", status=Status.UNEXPECTED)
675        if game_name in self._games:
676            return Response("Game name already exists.", status=Status.UNEXPECTED)
677        game = self._create_game(game_name, password)
678        fail = game.add_user(packet.username, password)
679        assert not fail
680        connection.game = game_name
681        logger.debug(f"User {packet.username!r} created game: {game}")
682        return Response(
683            f"Created new game: {game_name!r}.",
684            dict(heartbeat_rate=game.heartbeat_rate),
685        )
686
687    def _handle_game_packet(self, packet: Packet, game_name: str) -> Response:
688        """Routes a packet from a logged in user to the game's packet handler.
689
690        Will use the response's `disconnecting` attribute to remove the user
691        from the game, and then clear the attribute.
692        """
693        game = self._games[game_name]
694        response: Response = game.handle_packet(packet)
695        assert isinstance(response, Response)
696        if response.disconnecting:
697            self._remove_user_from_game(packet.username)
698            response.disconnecting = False
699        return response
700
701    @_user_packet_handler()
702    def _handle_help(self, packet: Packet) -> Response:
703        requests = dict()
704        for name, f in self._request_handlers.items():
705            requests[name] = {
706                name: param.annotation.__name__
707                for name, param in _get_packet_handler_params(f).items()
708            }
709        return Response("See payload for requests.", requests)
710
711    # Admin commands
712    @_user_packet_handler(admin=True)
713    def _admin_shutdown(self, packet: Packet) -> Response:
714        """Shutdown the server."""
715        self.shutdown()
716        return Response("Shutting down...")
717
718    @_user_packet_handler(admin=True)
719    def _admin_create_invite(
720        self,
721        packet: Packet,
722        /,
723        *,
724        username: str = "",
725    ) -> Response:
726        """Create an invite code. Can optionally by for a specific username."""
727        code = os.urandom(2).hex()
728        self._invite_codes[code] = username
729        return Response(f"Created invite code: {code}")
730
731    @_user_packet_handler(admin=True)
732    def _admin_register(
733        self,
734        packet: Packet,
735        /,
736        *,
737        set_as: bool = False,
738    ) -> Response:
739        """Set user registration."""
740        self.registration_enabled = set_as
741        return Response(f"Registration enabled: {set_as}")
742
743    @_user_packet_handler(admin=True)
744    def _admin_delete_user(
745        self,
746        packet: Packet,
747        /,
748        *,
749        username: str = ""
750    ) -> Response:
751        """Delete a user by name."""
752        if username not in self._users:
753            return Response(f"No such username {username!r}")
754        self.delete_user(username)
755        return Response(f"Requested delete user {username!r}")
756
757    @_user_packet_handler(admin=True)
758    def _admin_destroy_game(
759        self,
760        packet: Packet,
761        /,
762        *,
763        name: str = "",
764    ) -> Response:
765        """Destroy a game by name."""
766        game_name = name
767        if game_name not in self._games:
768            return Response(f"No such game: {game_name!r}", status=Status.UNEXPECTED)
769        self._destroy_game(game_name)
770        return Response(f"Destroyed game: {game_name!r}")
771
772    @_user_packet_handler(admin=True)
773    def _admin_save(self, packet: Packet) -> Response:
774        """Save all server data to file."""
775        success = self._save_to_disk()
776        return Response(f"Saved {success=} server data to disk: {self._save_file}")
777
778    @_user_packet_handler(admin=True)
779    def _admin_verbose(
780        self,
781        packet: Packet,
782        /,
783        *,
784        set_as: bool = False,
785    ) -> Response:
786        """Set verbose logging."""
787        self.verbose_logging = set_as
788        return Response(f"Verbose logging enabled: {set_as}")
789
790    @_user_packet_handler(admin=True)
791    def _admin_debug(self, packet: Packet) -> Response:
792        """Return debugging info."""
793        games = [str(game) for name, game in sorted(self._games.items())]
794        connected_users = [str(conn) for u, conn in sorted(self._connections.items())]
795        all_users = sorted(self._users.keys())
796        payload = dict(
797            packet=packet.debug_repr,
798            pubkey=self.pubkey,
799            games=games,
800            connected_users=connected_users,
801            all_users=all_users,
802            registration=self.registration_enabled,
803            invite_codes=self._invite_codes,
804            deleted_users=list(self._deleted_users),
805            verbose=self.verbose_logging,
806        )
807        return Response("Debug", payload)
808
809    @_user_packet_handler(admin=True)
810    def _admin_sleep(self, packet: Packet, /, *, seconds: float = 1) -> Response:
811        """Simulate slow response by blocking for the time specified in payload.
812
813        Warning: this actually blocks the entire server. Time is capped at 5 seconds.
814        """
815        max_sleep = 5
816        seconds = min(max_sleep, seconds)
817        time.sleep(seconds)
818        return Response(f"Slept for {seconds} seconds")
819
820    def _save_to_disk(self) -> bool:
821        """Save all data to disk."""
822        if not self._save_file:
823            return False
824        game_data = []
825        for game in self._games.values():
826            save_string = game.get_save_string()
827            if not save_string:
828                continue
829            game_data.append(dict(
830                name=game.name,
831                password=game.password,
832                data=save_string,
833            ))
834        users = [
835            dict(name=u.name, salt=u.salt, password=u.hashed_password)
836            for u in self._users.values() if u.name != ADMIN_USERNAME
837        ]
838        data = dict(
839            users=users,
840            games=game_data,
841            registration=self.registration_enabled,
842            invite_codes=self._invite_codes,
843        )
844        dumped = json.dumps(data, indent=4)
845        self._save_file.parent.mkdir(parents=True, exist_ok=True)
846        with open(self._save_file, "w") as f:
847            f.write(dumped)
848        logger.debug(
849            f"Saved server data to {self._save_file}"
850            f" ({len(users)} users and {len(game_data)} games)"
851        )
852        return True
853
854    def _load_from_disk(self):
855        if not self._save_file or not self._save_file.is_file():
856            return
857        logger.info(f"Loading server data from {self._save_file}")
858        with open(self._save_file) as f:
859            data = f.read()
860        data = json.loads(data)
861        for user in data["users"]:
862            username = user["name"]
863            if username == ADMIN_USERNAME:
864                continue
865            if not is_username_allowed(username):
866                logger.warning(f"Loaded disallowed {username=}")
867            self._users[username] = u = User(username, user["salt"], user["password"])
868            logger.debug(f"Loaded username: {u!r}")
869        for game in data["games"]:
870            game_name = game["name"]
871            if not is_gamename_allowed(game_name):
872                logger.warning(f"Loaded disallowed {game_name=}")
873            self._create_game(game_name, game["password"], game["data"])
874            logger.debug(f"Loaded game: {self._games[game_name]!r}")
875        self._invite_codes |= data["invite_codes"]
876        self.registration_enabled = data["registration"]
877        logger.debug("Loading disk data complete.")
878
879    def __repr__(self):
880        """Object repr."""
881        address = self._address or "public"
882        return (
883            f"<{self.__class__.__qualname__}"
884            f" serving {address}:{self._port}"
885            f" @ {id(self):x}>"
886        )
887
888    _request_handlers = {
889        Request.HELP: _handle_help,
890        Request.GAME_DIR: _handle_game_dir,
891        Request.CREATE_GAME: _handle_create_game,
892        Request.JOIN_GAME: _handle_join_game,
893        Request.LEAVE_GAME: _handle_leave_game,
894        Request.DEBUG: _admin_debug,
895        Request.SAVE: _admin_save,
896        Request.CREATE_INVITE: _admin_create_invite,
897        Request.DESTROY_GAME: _admin_destroy_game,
898        Request.DELETE_USER: _admin_delete_user,
899        Request.REGISTRATION: _admin_register,
900        Request.VERBOSE: _admin_verbose,
901        Request.SLEEP: _admin_sleep,
902        Request.SHUTDOWN: _admin_shutdown,
903    }
904    _canned_response_payload = dict(commands=list(_request_handlers.keys()))
905
906
907__all__ = (
908    "Server",
909    "LobbyGame",
910    "User",
911    "UserConnection",
912    "is_username_allowed",
913    "is_gamename_allowed",
914)
class Server:
251class Server:
252    """The server that hosts games.
253
254    Subclass from `pgnet.Game` and pass it as the *`game`* argument for the server.
255    Then, use the `Server.async_run` coroutine to start the server.
256
257    By default, the server is configured to listen on localhost. To listen
258    globally, set *`listen_globally`* and *`admin_password`*.
259
260    For games to save and load, *`save_file`* must be set (see also:
261    `pgnet.Game.get_save_string`).
262
263    .. note:: Most home networks require port forwarding to be discoverable by remote
264        clients.
265    """
266
267    def __init__(
268        self,
269        game: Type[Game],
270        /,
271        *,
272        listen_globally: bool = False,
273        port: int = DEFAULT_PORT,
274        admin_password: Optional[str] = None,
275        registration_enabled: bool = True,
276        require_user_password: bool = False,
277        on_connection: Optional[Callable[[str, bool], Any]] = None,
278        verbose_logging: bool = False,
279        save_file: Optional[str | Path] = None,
280    ):
281        """Initialize the server.
282
283        Args:
284            listen_globally: Listen globally instead of localhost only.
285                Requires that *`admin_password`* must be set.
286            port: Port number to listen on.
287            admin_password: Password for admin user with elevated priviliges.
288            registration_enabled: Allow new users to register.
289            require_user_password: Require that users have non-empty passwords.
290            on_connection: Callback for when a username connects or disconnects.
291            verbose_logging: Log *all* packets and responses.
292            save_file: Location of file to save and load server sessions.
293        """
294        if listen_globally and not admin_password:
295            logger.warning(
296                "Created server that listens globally without admin password."
297            )
298        admin_password = admin_password or DEFAULT_ADMIN_PASSWORD
299        self._key: Key = Key()
300        self._stop: Optional[asyncio.Future] = None
301        self._require_user_password = require_user_password
302        self._users: dict[str, User] = dict()
303        self._register_user(ADMIN_USERNAME, admin_password)
304        self._games: dict[str, LobbyGame] = {}
305        self._connections: dict[str, Optional[UserConnection]] = {}
306        self._deleted_users: set[str] = set()
307        self._invite_codes: dict[str, str] = {}
308        self._game_cls: Type[Game] = game
309        self._save_file: Optional[Path] = None if save_file is None else Path(save_file)
310        self._address: str = "" if listen_globally else "localhost"
311        self._port: int = port
312        self.registration_enabled: bool = registration_enabled
313        self.on_connection: Optional[Callable[[str, bool], Any]] = on_connection
314        self.verbose_logging: bool = verbose_logging
315        self._load_from_disk()
316        logger.debug(f"{self._save_file=}")
317        logger.debug(f"{self._game_cls=}")
318        logger.debug(f"{self._key=}")
319        print(f"{admin_password=}")  # Use print instead of logger for password
320
321    async def async_run(self, *, on_start: Optional[Callable] = None) -> int:
322        """Start the server.
323
324        The server will listen for connections and pass them off to the
325        connection handler.
326
327        Args:
328            on_start: Callback for when the server is online and handling messages.
329
330        Returns:
331            Exit code as given by the `shutdown` command. A value of -1 indicates a
332                request to reboot.
333        """
334        if self._stop:
335            raise RuntimeError("Cannot run the server more than once concurrently.")
336        self._stop = asyncio.Future()
337        serving_args = (self._connection_handler, self._address, self._port)
338        try:
339            async with websockets.serve(*serving_args):
340                logger.info(f"Handling messages {self}")
341                if on_start:
342                    on_start()
343                await self._listening_loop(self._stop)
344        except OSError as e:
345            added = OSError(f"Server fail. Perhaps one is already running? {self}")
346            raise added from e
347        self._save_to_disk()
348        result = self._stop.result()
349        logger.info(f"Server stop {result=} {self}")
350        self._stop = None
351        return result
352
353    def shutdown(self, result: int = 0, /):
354        """Stop the server.
355
356        The *result* is passed as the return value (exit code) for `Server.async_run`.
357        """
358        if self._stop and not self._stop.done():
359            self._stop.set_result(result)
360
361    def delete_user(self, username: str):
362        """Disconnect and delete a given username from the server."""
363        if username == ADMIN_USERNAME:
364            logger.warning("Cannot delete admin.")
365            return
366        if username in self._connections:
367            self._deleted_users.add(username)
368        else:
369            self._delete_user(username)
370
371    @property
372    def pubkey(self) -> str:
373        """Public key used for end to end encryption."""
374        return self._key.pubkey
375
376    async def _listening_loop(self, stop_future: asyncio.Future):
377        next_autosave = arrow.now().shift(seconds=AUTOSAVE_INTERVAL)
378        next_interval = arrow.now().shift(seconds=GAME_UPDATE_INTERVAL)
379        while not stop_future.done():
380            await asyncio.sleep(0.1)
381            if arrow.now() >= next_autosave:
382                self._save_to_disk()
383                next_autosave = arrow.now().shift(seconds=AUTOSAVE_INTERVAL)
384            if arrow.now() >= next_interval:
385                for game in self._games.values():
386                    game.update()
387                next_interval = arrow.now().shift(seconds=GAME_UPDATE_INTERVAL)
388
389    async def _connection_handler(self, websocket: ServerWebSocket):
390        """Handle new connections.
391
392        Allows the handshake to fully populate a UserConnection, which may then
393        be handled as a logged in user.
394        """
395        connection = UserConnection(websocket)
396        logger.info(f"New connection: {connection}")
397        try:
398            await self._handle_handshake(connection)
399            self._add_user_connection(connection)
400            logger.info(f"User logged in: {connection}")
401            await self._handle_user(connection)
402        except DisconnectedError as e:
403            logger.debug(f"{e=}")
404        finally:
405            logger.info(f"Closed connection: {connection}")
406            self._remove_user_connection(connection)
407            username = connection.username
408            if username in self._users and username in self._deleted_users:
409                self._delete_user(connection.username)
410
411    async def _handle_handshake(self, connection: UserConnection):
412        """Handle a new connection's handshake sequence. Modifies the connection object.
413
414        First trade public keys and assign the connection's `tunnel`. Then authenticate
415        and assign the connection's `username`.
416        """
417        # Trade public keys
418        packet = await connection.recv()
419        pubkey = packet.payload.get("pubkey")
420        if not pubkey or not isinstance(pubkey, str):
421            response = Response(
422                "Missing public key string.",
423                status=Status.BAD,
424                disconnecting=True,
425            )
426            await connection.send(response)
427            raise DisconnectedError("Incompatible protocol: missing pubkey.")
428        response = Response("key_trade", dict(pubkey=self.pubkey))
429        await connection.send(response)
430        connection.tunnel = self._key.get_tunnel(pubkey)
431        logger.debug(f"Assigned tunnel: {connection}")
432        # Authenticate
433        packet = await connection.recv()
434        username = packet.payload.get("username")
435        password = packet.payload.get("password", "")
436        invite_code = packet.payload.get("invite_code", "")
437        fail = self._check_auth(username, password, invite_code)
438        if fail:
439            # Respond with problem and disconnect
440            response = Response(fail, status=Status.BAD, disconnecting=True)
441            await connection.send(response)
442            raise DisconnectedError("Failed to authenticate.")
443        connection.username = username
444        logger.debug(f"Assigned username: {connection}")
445        if username == ADMIN_USERNAME:
446            logger.warning(f"Authenticated as admin: {connection}")
447        await connection.send(Response("Authenticated."))
448
449    def _check_auth(
450        self,
451        username: str,
452        password: str,
453        invite_code: str,
454    ) -> Optional[str]:
455        """Return failure reason or None."""
456        if not username:
457            return "Missing non-empty username."
458        if username in self._deleted_users:
459            return "User deleted."
460        if username in self._connections:
461            return "Username already connected."
462        if username not in self._users:
463            return self._try_register_user(username, password, invite_code)
464        user = self._users[username]
465        if not user.compare_password(password):
466            return "Incorrect password."
467        return None
468
469    def _try_register_user(
470        self,
471        username: str,
472        password: str,
473        invite_code: str,
474    ) -> Optional[str]:
475        """Return failure reason or None."""
476        if invite_code:
477            invite_username = self._invite_codes.get(invite_code)
478            wrong_code = invite_username is None
479            invite_valid = username == invite_username or invite_username == ""
480            if wrong_code or not invite_valid:
481                return "Incorrect username or invite code."
482        if not (self.registration_enabled or invite_code):
483            return "Registration blocked."
484        if self._require_user_password and not password:
485            return "User password required."
486        if not is_username_allowed(username):
487            return "Username not allowed."
488        self._register_user(username, password)
489        if invite_code:
490            del self._invite_codes[invite_code]
491        return None
492
493    def _register_user(self, username: str, password: str, /):
494        """Register new user."""
495        assert username not in self._users
496        if self._require_user_password and not password:
497            raise ValueError("Server requires password for users.")
498        user = User.from_name_password(username, password)
499        self._users[username] = user
500        logger.info(f"Registered {username=}")
501
502    def _add_user_connection(self, connection: UserConnection):
503        """Add the connection to connected users table."""
504        username = connection.username
505        assert username not in self._connections
506        self._connections[username] = connection
507        if self.on_connection:
508            self.on_connection(username, True)
509
510    def _remove_user_connection(self, connection: UserConnection):
511        """Remove the connection from connected users table if exists."""
512        username = connection.username
513        if username not in self._connections:
514            return
515        self._remove_user_from_game(connection.username)
516        del self._connections[username]
517        if self.on_connection:
518            self.on_connection(username, False)
519
520    async def _handle_user(self, connection: UserConnection):
521        """Handle a logged in user connection - handle packets and return responses."""
522        username = connection.username
523        while True:
524            # Wait for packet from user
525            packet = await connection.recv(timeout=3600.0)
526            # Important! We must set the packet's authenticated username.
527            packet.username = username
528            do_log = self.verbose_logging
529            if do_log:
530                logger.debug(packet)
531            if username in self._deleted_users:
532                response = Response("User deleted.", disconnecting=True)
533            else:
534                response: Response = self._handle_packet(packet)
535            if do_log:
536                logger.debug(f"--> {response}")
537            assert isinstance(response, Response)
538            # Also important, to set the game of the response for the client.
539            response.game = self._connections[username].game
540            await connection.send(response)
541            # The packet handler may have determined we are disconnecting
542            if response.disconnecting:
543                raise DisconnectedError(response.message)
544
545    def _handle_packet(self, packet: Packet) -> Response:
546        """Handle a packet from a logged in user."""
547        # Find builtin handler
548        request_handler = self._request_handlers.get(packet.message)
549        if request_handler:
550            return request_handler(self, packet)
551        # Find game handler
552        game_name: Optional[str] = self._connections[packet.username].game
553        if game_name:
554            return self._handle_game_packet(packet, game_name)
555        # No handler found - not in game and not a builtin request
556        return Response(
557            "Please create/join a game.",
558            self._canned_response_payload | dict(packet=packet.debug_repr),
559            status=Status.UNEXPECTED,
560        )
561
562    def _remove_user_from_game(self, username: str):
563        """Remove user from game and delete the game if expired."""
564        connection = self._connections[username]
565        game = self._games.get(connection.game)
566        if not game:
567            return
568        connection.game = None
569        game.remove_user(username)
570        if game.expired:
571            del self._games[game.name]
572        logger.debug(f"User {username!r} removed from {game}")
573
574    def _delete_user(self, username: str):
575        assert username in self._users and username not in self._connections
576        del self._users[username]
577        if username in self._deleted_users:
578            self._deleted_users.remove(username)
579        logger.info(f"Deleted {username=}")
580
581    @_user_packet_handler()
582    def _handle_game_dir(self, packet: Packet) -> Response:
583        """Create a Response with dictionary of games details."""
584        games_dict = {}
585        for name, game in self._games.items():
586            games_dict[game.name] = dict(
587                name=game.name,
588                users=len(game.connected_users),
589                password_protected=game.password_protected,
590                info=game.get_lobby_info(),
591            )
592        return Response("See payload for games directory.", dict(games=games_dict))
593
594    def _create_game(
595        self,
596        name: str,
597        password: str = "",
598        game_data: Optional[str] = None,
599    ) -> LobbyGame:
600        """Create a new game."""
601        assert name not in self._games
602        game = self._game_cls(name, save_string=game_data)
603        lobbygame = LobbyGame(game, name, password)
604        self._games[name] = lobbygame
605        logger.debug(f"Created game: {lobbygame}")
606        return lobbygame
607
608    def _destroy_game(self, game_name: str):
609        """Destroy an existing game."""
610        game = self._games[game_name]
611        while game.connected_users:
612            self._remove_user_from_game(list(game.connected_users)[0])
613        if game_name in self._games:
614            del self._games[game_name]
615        logger.debug(f"Destroyed game: {game_name!r}")
616
617    @_user_packet_handler()
618    def _handle_join_game(
619        self,
620        packet: Packet,
621        /,
622        *,
623        name: str = "",
624    ) -> Response:
625        """Handle a request to join a game."""
626        game_name = name
627        connection = self._connections[packet.username]
628        current_name: Optional[str] = connection.game
629        if current_name:
630            return Response("Must leave game first.", status=Status.UNEXPECTED)
631        if not game_name:
632            return Response("Please specify a game name.", status=Status.UNEXPECTED)
633        if game_name == current_name:
634            return Response("Already in game.", status=Status.UNEXPECTED)
635        game = self._games.get(game_name)
636        if not game:
637            return self._handle_create_game(packet)
638        password = packet.payload.get("password", "")
639        fail = game.add_user(packet.username, password)
640        if fail:
641            return Response(f"Failed to join game: {fail}", status=Status.UNEXPECTED)
642        connection.game = game_name
643        logger.debug(f"User {packet.username!r} joined: {game}")
644        return Response(
645            f"Joined game: {game_name!r}.",
646            dict(heartbeat_rate=game.heartbeat_rate),
647        )
648
649    @_user_packet_handler()
650    def _handle_leave_game(self, packet: Packet) -> Response:
651        """Handle a request to leave the game."""
652        game_name: Optional[str] = self._connections[packet.username].game
653        if not game_name:
654            return Response("Not in game.", status=Status.UNEXPECTED)
655        self._remove_user_from_game(packet.username)
656        logger.debug(f"User {packet.username!r} left game: {game_name!r}")
657        return Response(f"Left game: {game_name!r}.")
658
659    @_user_packet_handler()
660    def _handle_create_game(
661        self,
662        packet: Packet,
663        /,
664        *,
665        name: str = "",
666        password: str = "",
667    ) -> Response:
668        """Handle request to create a new game specified in the payload."""
669        game_name = name
670        connection = self._connections[packet.username]
671        current_game = connection.game
672        if current_game:
673            return Response("Must leave game first.", status=Status.UNEXPECTED)
674        if not is_gamename_allowed(game_name):
675            return Response("Game name not allowed.", status=Status.UNEXPECTED)
676        if game_name in self._games:
677            return Response("Game name already exists.", status=Status.UNEXPECTED)
678        game = self._create_game(game_name, password)
679        fail = game.add_user(packet.username, password)
680        assert not fail
681        connection.game = game_name
682        logger.debug(f"User {packet.username!r} created game: {game}")
683        return Response(
684            f"Created new game: {game_name!r}.",
685            dict(heartbeat_rate=game.heartbeat_rate),
686        )
687
688    def _handle_game_packet(self, packet: Packet, game_name: str) -> Response:
689        """Routes a packet from a logged in user to the game's packet handler.
690
691        Will use the response's `disconnecting` attribute to remove the user
692        from the game, and then clear the attribute.
693        """
694        game = self._games[game_name]
695        response: Response = game.handle_packet(packet)
696        assert isinstance(response, Response)
697        if response.disconnecting:
698            self._remove_user_from_game(packet.username)
699            response.disconnecting = False
700        return response
701
702    @_user_packet_handler()
703    def _handle_help(self, packet: Packet) -> Response:
704        requests = dict()
705        for name, f in self._request_handlers.items():
706            requests[name] = {
707                name: param.annotation.__name__
708                for name, param in _get_packet_handler_params(f).items()
709            }
710        return Response("See payload for requests.", requests)
711
712    # Admin commands
713    @_user_packet_handler(admin=True)
714    def _admin_shutdown(self, packet: Packet) -> Response:
715        """Shutdown the server."""
716        self.shutdown()
717        return Response("Shutting down...")
718
719    @_user_packet_handler(admin=True)
720    def _admin_create_invite(
721        self,
722        packet: Packet,
723        /,
724        *,
725        username: str = "",
726    ) -> Response:
727        """Create an invite code. Can optionally by for a specific username."""
728        code = os.urandom(2).hex()
729        self._invite_codes[code] = username
730        return Response(f"Created invite code: {code}")
731
732    @_user_packet_handler(admin=True)
733    def _admin_register(
734        self,
735        packet: Packet,
736        /,
737        *,
738        set_as: bool = False,
739    ) -> Response:
740        """Set user registration."""
741        self.registration_enabled = set_as
742        return Response(f"Registration enabled: {set_as}")
743
744    @_user_packet_handler(admin=True)
745    def _admin_delete_user(
746        self,
747        packet: Packet,
748        /,
749        *,
750        username: str = ""
751    ) -> Response:
752        """Delete a user by name."""
753        if username not in self._users:
754            return Response(f"No such username {username!r}")
755        self.delete_user(username)
756        return Response(f"Requested delete user {username!r}")
757
758    @_user_packet_handler(admin=True)
759    def _admin_destroy_game(
760        self,
761        packet: Packet,
762        /,
763        *,
764        name: str = "",
765    ) -> Response:
766        """Destroy a game by name."""
767        game_name = name
768        if game_name not in self._games:
769            return Response(f"No such game: {game_name!r}", status=Status.UNEXPECTED)
770        self._destroy_game(game_name)
771        return Response(f"Destroyed game: {game_name!r}")
772
773    @_user_packet_handler(admin=True)
774    def _admin_save(self, packet: Packet) -> Response:
775        """Save all server data to file."""
776        success = self._save_to_disk()
777        return Response(f"Saved {success=} server data to disk: {self._save_file}")
778
779    @_user_packet_handler(admin=True)
780    def _admin_verbose(
781        self,
782        packet: Packet,
783        /,
784        *,
785        set_as: bool = False,
786    ) -> Response:
787        """Set verbose logging."""
788        self.verbose_logging = set_as
789        return Response(f"Verbose logging enabled: {set_as}")
790
791    @_user_packet_handler(admin=True)
792    def _admin_debug(self, packet: Packet) -> Response:
793        """Return debugging info."""
794        games = [str(game) for name, game in sorted(self._games.items())]
795        connected_users = [str(conn) for u, conn in sorted(self._connections.items())]
796        all_users = sorted(self._users.keys())
797        payload = dict(
798            packet=packet.debug_repr,
799            pubkey=self.pubkey,
800            games=games,
801            connected_users=connected_users,
802            all_users=all_users,
803            registration=self.registration_enabled,
804            invite_codes=self._invite_codes,
805            deleted_users=list(self._deleted_users),
806            verbose=self.verbose_logging,
807        )
808        return Response("Debug", payload)
809
810    @_user_packet_handler(admin=True)
811    def _admin_sleep(self, packet: Packet, /, *, seconds: float = 1) -> Response:
812        """Simulate slow response by blocking for the time specified in payload.
813
814        Warning: this actually blocks the entire server. Time is capped at 5 seconds.
815        """
816        max_sleep = 5
817        seconds = min(max_sleep, seconds)
818        time.sleep(seconds)
819        return Response(f"Slept for {seconds} seconds")
820
821    def _save_to_disk(self) -> bool:
822        """Save all data to disk."""
823        if not self._save_file:
824            return False
825        game_data = []
826        for game in self._games.values():
827            save_string = game.get_save_string()
828            if not save_string:
829                continue
830            game_data.append(dict(
831                name=game.name,
832                password=game.password,
833                data=save_string,
834            ))
835        users = [
836            dict(name=u.name, salt=u.salt, password=u.hashed_password)
837            for u in self._users.values() if u.name != ADMIN_USERNAME
838        ]
839        data = dict(
840            users=users,
841            games=game_data,
842            registration=self.registration_enabled,
843            invite_codes=self._invite_codes,
844        )
845        dumped = json.dumps(data, indent=4)
846        self._save_file.parent.mkdir(parents=True, exist_ok=True)
847        with open(self._save_file, "w") as f:
848            f.write(dumped)
849        logger.debug(
850            f"Saved server data to {self._save_file}"
851            f" ({len(users)} users and {len(game_data)} games)"
852        )
853        return True
854
855    def _load_from_disk(self):
856        if not self._save_file or not self._save_file.is_file():
857            return
858        logger.info(f"Loading server data from {self._save_file}")
859        with open(self._save_file) as f:
860            data = f.read()
861        data = json.loads(data)
862        for user in data["users"]:
863            username = user["name"]
864            if username == ADMIN_USERNAME:
865                continue
866            if not is_username_allowed(username):
867                logger.warning(f"Loaded disallowed {username=}")
868            self._users[username] = u = User(username, user["salt"], user["password"])
869            logger.debug(f"Loaded username: {u!r}")
870        for game in data["games"]:
871            game_name = game["name"]
872            if not is_gamename_allowed(game_name):
873                logger.warning(f"Loaded disallowed {game_name=}")
874            self._create_game(game_name, game["password"], game["data"])
875            logger.debug(f"Loaded game: {self._games[game_name]!r}")
876        self._invite_codes |= data["invite_codes"]
877        self.registration_enabled = data["registration"]
878        logger.debug("Loading disk data complete.")
879
880    def __repr__(self):
881        """Object repr."""
882        address = self._address or "public"
883        return (
884            f"<{self.__class__.__qualname__}"
885            f" serving {address}:{self._port}"
886            f" @ {id(self):x}>"
887        )
888
# Dispatch table: maps each request name to the method that handles it.
_request_handlers = {
    # Handlers named `_handle_*`.
    Request.HELP: _handle_help,
    Request.GAME_DIR: _handle_game_dir,
    Request.CREATE_GAME: _handle_create_game,
    Request.JOIN_GAME: _handle_join_game,
    Request.LEAVE_GAME: _handle_leave_game,
    # Handlers named `_admin_*`.
    Request.DEBUG: _admin_debug,
    Request.SAVE: _admin_save,
    Request.CREATE_INVITE: _admin_create_invite,
    Request.DESTROY_GAME: _admin_destroy_game,
    Request.DELETE_USER: _admin_delete_user,
    Request.REGISTRATION: _admin_register,
    Request.VERBOSE: _admin_verbose,
    Request.SLEEP: _admin_sleep,
    Request.SHUTDOWN: _admin_shutdown,
}
# Payload listing every request name that has a handler above.
_canned_response_payload = {"commands": list(_request_handlers)}

The server that hosts games.

Subclass from pgnet.Game and pass it as the game argument for the server. Then, use the Server.async_run coroutine to start the server.

By default, the server is configured to listen on localhost. To listen globally, set listen_globally and admin_password.

For games to save and load, save_file must be set (see also: pgnet.Game.get_save_string).

Most home networks require port forwarding to be discoverable by remote clients.

Server( game: Type[pgnet.util.Game], /, *, listen_globally: bool = False, port: int = 38929, admin_password: Optional[str] = None, registration_enabled: bool = True, require_user_password: bool = False, on_connection: Optional[Callable[[str, bool], Any]] = None, verbose_logging: bool = False, save_file: Union[str, pathlib.Path, NoneType] = None)
def __init__(
    self,
    game: Type[Game],
    /,
    *,
    listen_globally: bool = False,
    port: int = DEFAULT_PORT,
    admin_password: Optional[str] = None,
    registration_enabled: bool = True,
    require_user_password: bool = False,
    on_connection: Optional[Callable[[str, bool], Any]] = None,
    verbose_logging: bool = False,
    save_file: Optional[str | Path] = None,
):
    """Initialize the server.

    Args:
        game: `Game` subclass stored as the server's game class.
        listen_globally: Listen globally instead of localhost only.
            A warning is logged if *`admin_password`* is not set.
        port: Port number to listen on.
        admin_password: Password for admin user with elevated privileges.
            Falls back to `DEFAULT_ADMIN_PASSWORD` when empty.
        registration_enabled: Allow new users to register.
        require_user_password: Require that users have non-empty passwords.
        on_connection: Callback for when a username connects or disconnects.
        verbose_logging: Log *all* packets and responses.
        save_file: Location of file to save and load server sessions.
    """
    if not admin_password:
        if listen_globally:
            logger.warning(
                "Created server that listens globally without admin password."
            )
        admin_password = DEFAULT_ADMIN_PASSWORD
    self._key: Key = Key()
    self._stop: Optional[asyncio.Future] = None
    self._require_user_password = require_user_password
    self._users: dict[str, User] = {}
    # The admin account is registered before any saved data is loaded.
    self._register_user(ADMIN_USERNAME, admin_password)
    self._games: dict[str, LobbyGame] = {}
    self._connections: dict[str, Optional[UserConnection]] = {}
    self._deleted_users: set[str] = set()
    self._invite_codes: dict[str, str] = {}
    self._game_cls: Type[Game] = game
    self._save_file: Optional[Path] = (
        Path(save_file) if save_file is not None else None
    )
    self._address: str = "localhost" if not listen_globally else ""
    self._port: int = port
    self.registration_enabled: bool = registration_enabled
    self.on_connection: Optional[Callable[[str, bool], Any]] = on_connection
    self.verbose_logging: bool = verbose_logging
    self._load_from_disk()
    logger.debug(f"{self._save_file=}")
    logger.debug(f"{self._game_cls=}")
    logger.debug(f"{self._key=}")
    print(f"{admin_password=}")  # Use print instead of logger for password

Initialize the server.

Arguments:
  • listen_globally: Listen globally instead of localhost only. A warning is logged if admin_password is not set.
  • port: Port number to listen on.
  • admin_password: Password for admin user with elevated privileges.
  • registration_enabled: Allow new users to register.
  • require_user_password: Require that users have non-empty passwords.
  • on_connection: Callback for when a username connects or disconnects.
  • verbose_logging: Log all packets and responses.
  • save_file: Location of file to save and load server sessions.
async def async_run(self, *, on_start: Optional[Callable] = None) -> int:
async def async_run(self, *, on_start: Optional[Callable] = None) -> int:
    """Start the server.

    The server will listen for connections and pass them off to the
    connection handler.

    Args:
        on_start: Callback for when the server is online and handling messages.

    Returns:
        Exit code as given by the `shutdown` command. A value of -1 indicates a
            request to reboot.

    Raises:
        RuntimeError: If this server is already running.
        OSError: If serving fails with an `OSError`; the original error is
            chained as the cause.
    """
    if self._stop:
        raise RuntimeError("Cannot run the server more than once concurrently.")
    self._stop = stop = asyncio.Future()
    serving_args = (self._connection_handler, self._address, self._port)
    try:
        try:
            async with websockets.serve(*serving_args):
                logger.info(f"Handling messages {self}")
                if on_start:
                    on_start()
                await self._listening_loop(stop)
        except OSError as e:
            added = OSError(f"Server fail. Perhaps one is already running? {self}")
            raise added from e
        self._save_to_disk()
        result = stop.result()
        logger.info(f"Server stop {result=} {self}")
        return result
    finally:
        # Always clear the stop future, even when serving, the callback or
        # saving raised; otherwise every later call would be rejected as
        # "already running".
        self._stop = None

Start the server.

The server will listen for connections and pass them off to the connection handler.

Arguments:
  • on_start: Callback for when the server is online and handling messages.
Returns:

Exit code as given by the shutdown command. A value of -1 indicates a request to reboot.

def shutdown(self, result: int = 0, /):
def shutdown(self, result: int = 0, /):
    """Stop the server.

    The *result* is passed as the return value (exit code) for `Server.async_run`.
    Does nothing when there is no stop future or it is already done.
    """
    pending = self._stop
    if not pending or pending.done():
        return
    pending.set_result(result)

Stop the server.

The result is passed as the return value (exit code) for Server.async_run.

def delete_user(self, username: str):
def delete_user(self, username: str):
    """Disconnect and delete a given username from the server.

    The admin user cannot be deleted. A username present in the connections
    table is only added to `_deleted_users`; any other username is deleted
    immediately through `_delete_user`.
    """
    if username == ADMIN_USERNAME:
        logger.warning("Cannot delete admin.")
        return
    if username not in self._connections:
        self._delete_user(username)
        return
    self._deleted_users.add(username)

Disconnect and delete a given username from the server.

pubkey: str

Public key used for end to end encryption.

@dataclass
class LobbyGame:
@dataclass
class LobbyGame:
    """Server-side wrapper around a `pgnet.Game` instance.

    Holds the game's name, password and connected usernames, and relays
    server calls to the wrapped game.
    """

    game: Game = field(repr=False)
    name: str
    password: str
    connected_users: set[str] = field(default_factory=set)

    @property
    def heartbeat_rate(self) -> float:
        """Updates per second, as reported by the wrapped game."""
        return self.game.heartbeat_rate

    @property
    def password_protected(self) -> bool:
        """Whether the game has a non-empty password."""
        return True if self.password else False

    def add_user(self, username: str, password: str) -> Optional[str]:
        """Add a user to the game; return a reason string on failure, else None."""
        if password == self.password:
            self.connected_users.add(username)
            self.game.user_joined(username)
            return None
        return "Incorrect password."

    def remove_user(self, username):
        """Remove a connected user and notify the game; ignore unknown users."""
        if username not in self.connected_users:
            return
        self.connected_users.remove(username)
        self.game.user_left(username)

    def get_save_string(self) -> Optional[str]:
        """Called by the server when shutting down.

        Returns the game's save string if the game is persistent, else None.
        """
        return self.game.get_save_string() if self.game.persistent else None

    def get_lobby_info(self) -> str:
        """Called by the server to get game info."""
        info = self.game.get_lobby_info()
        return info

    @property
    def expired(self) -> bool:
        """Whether the game has no connected users and is not persistent."""
        return not (self.connected_users or self.game.persistent)

    def handle_packet(self, packet: Packet) -> Response:
        """Relay packet handling to the game instance and return its response."""
        return self.game.handle_packet(packet)

    def update(self):
        """Called on an interval by the server; forwards to the game's update."""
        game = self.game
        game.update()

pgnet.Game instance wrapper for management by server.

LobbyGame( game: pgnet.util.Game, name: str, password: str, connected_users: set[str] = <factory>)
heartbeat_rate: float

Updates per second.

password_protected: bool

If the game is password protected.

def add_user(self, username: str, password: str) -> Optional[str]:
def add_user(self, username: str, password: str) -> Optional[str]:
    """Add a user to the game; return a reason string on failure, else None."""
    if password == self.password:
        self.connected_users.add(username)
        self.game.user_joined(username)
        return None
    return "Incorrect password."

Return reason if user was not successfully added.

def remove_user(self, username):
def remove_user(self, username):
    """Remove a connected user and notify the game; ignore unknown users."""
    if username not in self.connected_users:
        return
    self.connected_users.remove(username)
    self.game.user_left(username)

Remove user from game.

def get_save_string(self) -> Optional[str]:
def get_save_string(self) -> Optional[str]:
    """Called by the server when shutting down.

    Returns the game's save string if the game is persistent, else None.
    """
    return self.game.get_save_string() if self.game.persistent else None

Called by the server when shutting down.

def get_lobby_info(self) -> str:
def get_lobby_info(self) -> str:
    """Called by the server to get game info from the wrapped game."""
    info = self.game.get_lobby_info()
    return info

Called by the server to get game info.

expired: bool

If the game is empty and not persistent.

def handle_packet(self, packet: pgnet.util.Packet) -> pgnet.util.Response:
def handle_packet(self, packet: Packet) -> Response:
    """Relay packet handling to the game instance and return its response."""
    return self.game.handle_packet(packet)

Relay packet handling to game instance.

def update(self):
def update(self):
    """Called on an interval by the server; forwards to the game's update."""
    game = self.game
    game.update()

Called on an interval by the server.

@dataclass
class User:
@dataclass
class User:
    """User authentication info.

    Holds the username with the salt and the salted SHA-256 hex digest of the
    password; the raw password is not kept on the instance.
    """

    name: str
    # Hex string mixed into the password before hashing.
    salt: str
    # Hex digest of the salted password.
    hashed_password: str

    @classmethod
    def from_name_password(cls, name: str, password: str) -> "User":
        """Create a new user from a raw (unsalted/unhashed) password."""
        salt = cls._generate_salt()
        hashed_password = cls._hash_password(password, salt)
        return cls(name, salt=salt, hashed_password=hashed_password)

    def compare_password(self, password: str) -> bool:
        """Compare a raw (unsalted/unhashed) password to our password.

        Returns:
            True if hashing *password* with our salt yields our stored digest.
        """
        # Local import keeps this block self-contained.
        import hmac

        candidate = self._hash_password(password, self.salt)
        # Constant-time comparison so the check does not leak how many leading
        # characters of the digest matched.
        return hmac.compare_digest(
            candidate.encode(), str(self.hashed_password).encode()
        )

    @staticmethod
    def _generate_salt() -> str:
        """Return a fresh random salt of `SALT_SIZE` bytes as a hex string."""
        return os.urandom(SALT_SIZE).hex()

    @staticmethod
    def _hash_password(password: str, salt: str) -> str:
        """Hash a string using Python's hashlib (SHA-256 of salt + password)."""
        return hashlib.sha256(f"{salt}{password}".encode()).hexdigest()

User authentication info.

User(name: str, salt: str, hashed_password: str)
@classmethod
def from_name_password(cls, name: str, password: str):
@classmethod
def from_name_password(cls, name: str, password: str):
    """Build a user from a raw (unsalted/unhashed) password.

    A fresh salt is generated and the password is hashed with it.
    """
    new_salt = cls._generate_salt()
    digest = cls._hash_password(password, new_salt)
    return cls(name, salt=new_salt, hashed_password=digest)

Create a new user from a raw (unsalted/unhashed) password.

def compare_password(self, password: str):
def compare_password(self, password: str) -> bool:
    """Compare a raw (unsalted/unhashed) password to our password.

    Returns:
        True if hashing *password* with our salt yields our stored digest.
    """
    # Local import keeps this block self-contained.
    import hmac

    candidate = self._hash_password(password, self.salt)
    # Constant-time comparison so the check does not leak how many leading
    # characters of the digest matched.
    return hmac.compare_digest(
        candidate.encode(), str(self.hashed_password).encode()
    )

Compare a raw (unsalted/unhashed) password to our password.

@dataclass
class UserConnection(pgnet.util.Connection):
@dataclass
class UserConnection(Connection):
    """`pgnet.util.Connection` subclass that exchanges `Response`/`Packet` objects.

    Outgoing responses are serialized before sending and incoming messages
    are deserialized into packets.
    """

    username: Optional[str] = None
    game: Optional[str] = None

    async def send(self, response: Response, *args, **kwargs):
        """Serialize *response* and send it through the base class."""
        serialized = response.serialize()
        await super().send(serialized, *args, **kwargs)

    async def recv(self, *args, **kwargs) -> Packet:
        """Receive a message through the base class and deserialize it."""
        return Packet.deserialize(await super().recv(*args, **kwargs))

Thin wrapper for pgnet.util.Connection.

Provides serialization and deserialization.

UserConnection( websocket: websockets.legacy.protocol.WebSocketCommonProtocol, tunnel: Optional[pgnet.util.Tunnel] = None, remote: str = '', username: Optional[str] = None, game: Optional[str] = None)
async def send(self, response: pgnet.util.Response, *args, **kwargs):
async def send(self, response: Response, *args, **kwargs):
    """Serialize *response* and send it through the base class."""
    serialized = response.serialize()
    await super().send(serialized, *args, **kwargs)

Override base method to serialize response.

async def recv(self, *args, **kwargs) -> pgnet.util.Packet:
async def recv(self, *args, **kwargs) -> Packet:
    """Receive a message through the base class and deserialize it."""
    return Packet.deserialize(await super().recv(*args, **kwargs))

Override base method to deserialize packet.

def is_username_allowed(name: str, /) -> bool:
def is_username_allowed(name: str, /) -> bool:
    """Return whether *name* is allowed as a username.

    Delegates to `_check_name` with `max_len=20`, whitespace disallowed and
    `alnum_only=True`.
    """
    rules = dict(max_len=20, allow_whitespace=False, alnum_only=True)
    return _check_name(name, **rules)

If a username is allowed.

def is_gamename_allowed(name: str, /) -> bool:
def is_gamename_allowed(name: str, /) -> bool:
    """Return whether *name* is allowed as a game name.

    Delegates to `_check_name` with `max_len=50`, whitespace allowed and
    `alnum_only=True`.
    """
    rules = dict(max_len=50, allow_whitespace=True, alnum_only=True)
    return _check_name(name, **rules)

If a game name is allowed.