[MouseFox logo]
The MouseFox Project
Join The Community Discord Server
[Discord logo]
Edit on GitHub

pgnet.client

Home of the Client class.

  1"""Home of the `Client` class."""
  2
  3from typing import Optional, Callable, Any, Type
  4from loguru import logger
  5import asyncio
  6from dataclasses import dataclass
  7import websockets
  8from websockets.client import WebSocketClientProtocol
  9from .server import Server
 10from .util import (
 11    Packet,
 12    Response,
 13    Connection,
 14    Key,
 15    Game,
 16    DEFAULT_PORT,
 17    DisconnectedError,
 18    Status,
 19    Request,
 20)
 21
 22
 23DEFAULT_HEARTBEAT_RATE = 10
 24ResponseCallback = Callable[[Response], Any]
 25
 26
 27@dataclass
 28class ClientConnection(Connection):
 29    """Thin wrapper for `pgnet.util.Connection`.
 30
 31    Provides `ClientConnection.send_recv` for combined sending and receiving.
 32    """
 33
 34    game: Optional[str] = None
 35    _websocket_busy: bool = False
 36
 37    async def send_recv(self, packet: Packet) -> Response:
 38        """Send packet and receive response via Baseclass `send` and `recv`."""
 39        if self._websocket_busy:
 40            logger.warning(f"Packet is waiting for busy websocket... {packet}")
 41        while self._websocket_busy and self.websocket.open:
 42            await asyncio.sleep(0.1)
 43        self._websocket_busy = True
 44        try:
 45            await self.send(packet.serialize())
 46            response: str = await self.recv()
 47            return Response.deserialize(response)
 48        finally:
 49            self._websocket_busy = False
 50
 51
 52class Client:
 53    """The client that manages communication with the server.
 54
 55    ## Initializing
 56    This class should not be initialized directly, instead use `Client.remote` or
 57    `Client.local`.
 58    ```python3
 59    # This client will connect to a remote server
 60    remote_client = Client.remote(address="1.2.3.4", username="player")
 61    # This client will create a local server and connect to it
 62    local_client = Client.local(game=MyGame, username="player")
 63    ```
 64
 65    ## Connecting and starting a game
 66    Use `Client.async_connect` to connect to the server. Once connected, use
 67    `Client.get_game_dir`, `Client.create_game`, `Client.join_game`, and
 68    `Client.leave_game` to join or leave a game.
 69
 70    ## Using the packet queue
 71    When in game (`Client.game` is not *None*), use `Client.send` to send a
 72    `pgnet.Packet` to `pgnet.Game.handle_game_packet` and receive a `pgnet.Response`.
 73
 74    ## Client events
 75    It is possible to bind callbacks to client events by subclassing and overriding or
 76    by setting `Client.on_connection`, `Client.on_status`, and `Client.on_game`.
 77
 78    """
 79
 80    def __init__(
 81        self,
 82        *,
 83        address: str,
 84        username: str,
 85        password: str,
 86        invite_code: str,
 87        port: int,
 88        server: Optional[Server],
 89        verify_server_pubkey: str,
 90    ):
 91        """This class should not be initialized directly.
 92
 93        .. note:: Use `Client.local` or `Client.remote` to create a client.
 94        """
 95        self._key: Key = Key()
 96        self._status: str = "New client."
 97        self._server_connection: Optional[WebSocketClientProtocol] = None
 98        self._connected: bool = False
 99        self._do_disconnect: bool = False
100        self._packet_queue: list[tuple[Packet, ResponseCallback]] = []
101        self._game: Optional[str] = None
102        self._heartbeat_interval = 1 / DEFAULT_HEARTBEAT_RATE
103        # Connection details
104        self._username = username
105        self._password = password
106        self._invite_code = invite_code
107        self._address = address
108        self._port = port
109        self._server = server
110        self._verify_server_pubkey = verify_server_pubkey
111
112    @classmethod
113    def local(
114        cls,
115        *,
116        game: Type[Game],
117        username: str,
118        password: str = "",
119        port: int = DEFAULT_PORT,
120        server_factory: Optional[Callable[[Any], Server]] = None,
121    ) -> "Client":
122        """Create a client that uses its own local server. See also `Client.remote`.
123
124        Only the *game* and *username* arguments are required.
125
126        Args:
127            game: `pgnet.Game` class to pass to the local server.
128            username: The user's username.
129            password: The user's password.
130            port: Server port number.
131            server_factory: If provided, will be used to create the local server. Must
132                accept the same arguments as `pgnet.Server` (default). This is useful
133                for using a server subclass or to pass custom arguments to the local
134                server.
135        """
136        server_factory = server_factory or Server
137        server = server_factory(
138            game,
139            listen_globally=False,
140            registration_enabled=True,
141            require_user_password=False,
142        )
143        return cls(
144            address="localhost",
145            username=username,
146            password=password,
147            invite_code="",
148            port=port,
149            server=server,
150            verify_server_pubkey=server.pubkey,
151        )
152
153    @classmethod
154    def remote(
155        cls,
156        *,
157        address: str,
158        username: str,
159        password: str = "",
160        invite_code: str = "",
161        port: int = DEFAULT_PORT,
162        verify_server_pubkey: str = "",
163    ) -> "Client":
164        """Create a client that connects to a remote server. See also `Client.local`.
165
166        Args:
167            address: Server IP address.
168            username: The user's username.
169            password: The user's password.
170            invite_code: An invite code for creating a new user.
171            port: Server port number.
172            verify_server_pubkey: If provided, will compare against the
173                `pgnet.Server.pubkey` of the server and disconnect if they do not match.
174        """
175        return cls(
176            address=address,
177            username=username,
178            password=password,
179            invite_code=invite_code,
180            port=port,
181            server=None,
182            verify_server_pubkey=verify_server_pubkey,
183        )
184
185    async def async_connect(self):
186        """Connect to a server.
187
188        This procedure will automatically create an end-to-end encrypted connection
189        (optionally verifying the public key first), and authenticate the username and
190        password. Only after succesfully completing these steps will the client be
191        considered connected and ready to process the packet queue from `Client.send`.
192        """
193        logger.debug(f"Connecting... {self}")
194        if self._server is None:
195            return await self._async_connect_remote()
196        else:
197            return await self._async_connect_local()
198
199    def disconnect(self):
200        """Close the connection."""
201        self._do_disconnect = True
202
203    def get_game_dir(self, callback: Callable, /):
204        """Get the games directory from the server and pass the response to callback."""
205        self.send(Packet(Request.GAME_DIR), callback, do_next=True)
206
207    def create_game(
208        self,
209        name: str,
210        /,
211        *,
212        password: Optional[str] = None,
213        callback: Optional[ResponseCallback] = None,
214    ):
215        """Request from the server to create and join a game."""
216        payload = dict(name=name)
217        if password:
218            payload["password"] = password
219        self.send(Packet(Request.CREATE_GAME, payload), callback, do_next=True)
220
221    def join_game(
222        self,
223        name: str,
224        /,
225        *,
226        password: Optional[str] = None,
227        callback: Optional[ResponseCallback] = None,
228    ):
229        """Request from the server to join a game."""
230        payload = dict(name=name)
231        if password:
232            payload["password"] = password
233        self.send(Packet(Request.JOIN_GAME, payload), callback, do_next=True)
234
235    def leave_game(
236        self,
237        *,
238        callback: Optional[ResponseCallback] = None,
239    ):
240        """Request from the server to leave the game."""
241        self.send(Packet(Request.LEAVE_GAME), callback, do_next=True)
242
243    def send(
244        self,
245        packet: Packet,
246        callback: Optional[ResponseCallback] = None,
247        /,
248        do_next: bool = False,
249    ):
250        """Add a `pgnet.Packet` to the queue.
251
252        If a callback was given, the `pgnet.Response` will be passed to it. Callbacks
253        are not ensured, as the queue can be arbitrarily cleared using
254        `Client.flush_queue`.
255        """
256        if not self._connected:
257            logger.warning(f"Cannot queue packets while disconnected: {packet}")
258            return
259        packet_count = len(self._packet_queue)
260        if packet_count >= 5:
261            logger.warning(f"{packet_count} packets pending, adding: {packet}")
262        packet_callback = (packet, callback)
263        if do_next:
264            self._packet_queue.insert(0, packet_callback)
265        else:
266            self._packet_queue.append(packet_callback)
267
268    def flush_queue(self):
269        """Clear packets and their respective callbacks from the queue.
270
271        See also: `Client.send`.
272        """
273        if self._packet_queue:
274            logger.debug(f"Discarding messages:  {self._packet_queue}")
275        self._packet_queue = []
276
277    def on_connection(self, connected: bool):
278        """Called when connected or disconnected. See also: `Client.connected`."""
279        pass
280
281    @property
282    def connected(self) -> bool:
283        """If we are connected to the server. See also: `Client.on_connection`."""
284        return self._connected
285
286    def on_status(self, status: str):
287        """Called with feedback on client status. See also: `Client.status`."""
288        pass
289
290    @property
291    def status(self) -> str:
292        """Last status feedback message. See also: `Client.on_status`."""
293        return self._status
294
295    def on_game(self, game_name: Optional[str]):
296        """Called when joining or leaving a game. See also: `Client.game`."""
297        pass
298
299    @property
300    def game(self) -> Optional[str]:
301        """Currently joined game name. See also: `Client.on_game`."""
302        return self._game
303
304    def on_heartbeat(self, heartbeat: Response):
305        """Override this method to implement heartbeat updates.
306
307        The *heartbeat* Response is given by `pgnet.util.Game.handle_heartbeat`. The
308        heartbeat rate is set by `pgnet.util.Game.heartbeat_rate`.
309
310        See also: `Client.heartbeat_payload`.
311        """
312        pass
313
314    def heartbeat_payload(self) -> dict:
315        """Override this method to add data to the heartbeat request payload.
316
317        This payload is passed to `pgnet.util.Game.handle_heartbeat`. The heartbeat rate
318        is set by `pgnet.util.Game.heartbeat_rate`.
319
320        See also: `Client.on_heartbeat`.
321        """
322        return dict()
323
324    async def _async_connect_remote(self):
325        """Connect to the server.
326
327        Gets a websocket to create a `ClientConnection` object. This is passed to the
328        handshake handler to be populated, and then to the user connection for handling
329        the packet queue. The heartbeat task is managed here.
330        """
331        if self._server_connection is not None:
332            raise RuntimeError("Cannot open more than one connection per client.")
333        logger.debug("Connecting to server...")
334        full_address = f"{self._address}:{self._port}"
335        self._server_connection = websockets.connect(
336            f"ws://{full_address}",
337            close_timeout=1,
338        )
339        self._set_status(f"Connecting to {full_address}...", logger.info)
340        connection: Optional[ClientConnection] = None
341        heartbeat: Optional[asyncio.Task] = None
342        try:
343            try:
344                websocket = await self._server_connection
345            except OSError as e:
346                logger.debug(f"{e=}")
347                raise DisconnectedError("Failed to call server.")
348            connection = ClientConnection(websocket)
349            self._set_status("Logging in...", logger.info)
350            await self._handle_handshake(connection)
351            self._set_connection(True)
352            self._set_status(
353                f"Logged in as {self._username} @ {connection.remote}",
354                logger.info,
355            )
356            heartbeat = asyncio.create_task(self._async_heartbeat())
357            await self._handle_user_connection(connection)
358        except DisconnectedError as e:
359            logger.debug(f"{e=}")
360            if connection:
361                await connection.close()
362            if heartbeat:
363                heartbeat.cancel()
364            self._set_connection(False)
365            self._server_connection = None
366            self._set_status(f"Disconnected: {e.args[0]}")
367            logger.info(f"Connection terminated. {self}")
368            return
369        logger.warning(f"Connection terminated without DisconnectedError {connection}")
370
371    async def _async_connect_local(self):
372        """Wraps `Client._async_connect_remote to cleanup/teardown the local server."""
373        logger.debug("Setting up local server...")
374        assert isinstance(self._server, Server)
375        started = asyncio.Future()
376        server_coro = self._server.async_run(on_start=lambda *a: started.set_result(1))
377        server_task = asyncio.create_task(server_coro)
378        server_task.add_done_callback(lambda *a: self.disconnect())
379        await asyncio.wait((server_task, started), return_when=asyncio.FIRST_COMPLETED)
380        await self._async_connect_remote()
381        self.disconnect()
382        self._server.shutdown()
383
384    async def _handle_handshake(self, connection: ClientConnection):
385        """Handle a new connection's handshake sequence. Modifies the connection object.
386
387        First trade public keys and assign the connection's `tunnel`. Then log in
388        with our username.
389        """
390        # Trade public keys
391        packet = Packet("key_trade", dict(pubkey=self._key.pubkey))
392        response = await connection.send_recv(packet)
393        pubkey = response.payload.get("pubkey")
394        if not pubkey or not isinstance(pubkey, str):
395            raise DisconnectedError("Missing public key string from server.")
396        if self._verify_server_pubkey:
397            if pubkey != self._verify_server_pubkey:
398                raise DisconnectedError("Unverified server public key.")
399            logger.debug(f"Server pubkey verified: {pubkey=}")
400        connection.tunnel = self._key.get_tunnel(pubkey)
401        logger.debug(f"Assigned tunnel: {connection}")
402        # Authenticate
403        handshake_payload = dict(
404            username=self._username,
405            password=self._password,
406            invite_code=self._invite_code,
407        )
408        packet = Packet("handshake", handshake_payload)
409        response = await connection.send_recv(packet)
410        if response.status != Status.OK:
411            m = response.message
412            logger.info(m)
413            raise DisconnectedError(m)
414
415    async def _async_heartbeat(self):
416        """Periodically update while connected and in game.
417
418        Will create a heartbeat request using `Client.heartbeat_payload` and pass the
419        response to `Client.on_heartbeat`.
420        """
421        while True:
422            await asyncio.sleep(self._heartbeat_interval)
423            if self.connected and self.game:
424                packet = Packet(Request.HEARTBEAT_UPDATE, self.heartbeat_payload())
425                self.send(packet, self.on_heartbeat)
426
427    async def _handle_user_connection(self, connection: ClientConnection):
428        """Handle the connection after handshake - process the packet queue."""
429        self._do_disconnect = False
430        while not self._do_disconnect:
431            # Wait for queued packets
432            if not self._packet_queue:
433                await asyncio.sleep(0.01)
434                continue
435            # Send and callback with response
436            packet, callback = self._packet_queue.pop(0)
437            response = await connection.send_recv(packet)
438            if response.status != Status.OK:
439                logger.info(f"Status code: {response.debug_repr}")
440            self._handle_game_change(response)
441            if callback:
442                callback(response)
443            # Disconnect if alerted
444            if response.disconnecting:
445                logger.info(f"Disconnected.\n{packet}\n{response.debug_repr}")
446                raise DisconnectedError(response.message)
447        raise DisconnectedError("Client closed connection.")
448
449    def _set_status(self, status: str, logger: Optional[Callable] = None):
450        """Set the client status message with associated callback."""
451        self._status = status
452        if logger:
453            logger(status)
454        if self.on_status:
455            self.on_status(status)
456
457    def _set_connection(self, set_as: bool, /):
458        """Set the client connection status with associated callback."""
459        if self._connected == set_as:
460            return
461        logger.debug(f"Setting connection as: {set_as}")
462        self._connected = set_as
463        if self.on_connection:
464            self.on_connection(set_as)
465
466    def _handle_game_change(self, response: Response):
467        """Handle game change.
468
469        Set the client game name and heartbeat rate, then call event callback.
470        """
471        game_name = response.game
472        if game_name != self._game:
473            if game_name is None:
474                self._set_status(f"Left game: {self._game}")
475            else:
476                self._set_status(f"Joined game: {game_name}")
477            hb_rate = response.payload.get("heartbeat_rate", DEFAULT_HEARTBEAT_RATE)
478            self._heartbeat_interval = 1 / hb_rate
479            logger.debug(f"{self._heartbeat_interval=}")
480            self._game = game_name
481            if self.on_game:
482                self.on_game(game_name)
483
484    def __repr__(self):
485        """Object repr."""
486        local = "(local) " if self._server else ""
487        conn = "connected" if self.connected else "disconnected"
488        return (
489            f"<{self.__class__.__qualname__} {local}{self._username!r}"
490            f" {conn}, {self._address}:{self._port}"
491            f" @ {id(self):x}>"
492        )
493
494
495__all__ = (
496    "Client",
497    "ClientConnection",
498)
class Client:
 53class Client:
 54    """The client that manages communication with the server.
 55
 56    ## Initializing
 57    This class should not be initialized directly, instead use `Client.remote` or
 58    `Client.local`.
 59    ```python3
 60    # This client will connect to a remote server
 61    remote_client = Client.remote(address="1.2.3.4", username="player")
 62    # This client will create a local server and connect to it
 63    local_client = Client.local(game=MyGame, username="player")
 64    ```
 65
 66    ## Connecting and starting a game
 67    Use `Client.async_connect` to connect to the server. Once connected, use
 68    `Client.get_game_dir`, `Client.create_game`, `Client.join_game`, and
 69    `Client.leave_game` to join or leave a game.
 70
 71    ## Using the packet queue
 72    When in game (`Client.game` is not *None*), use `Client.send` to send a
 73    `pgnet.Packet` to `pgnet.Game.handle_game_packet` and receive a `pgnet.Response`.
 74
 75    ## Client events
 76    It is possible to bind callbacks to client events by subclassing and overriding or
 77    by setting `Client.on_connection`, `Client.on_status`, and `Client.on_game`.
 78
 79    """
 80
 81    def __init__(
 82        self,
 83        *,
 84        address: str,
 85        username: str,
 86        password: str,
 87        invite_code: str,
 88        port: int,
 89        server: Optional[Server],
 90        verify_server_pubkey: str,
 91    ):
 92        """This class should not be initialized directly.
 93
 94        .. note:: Use `Client.local` or `Client.remote` to create a client.
 95        """
 96        self._key: Key = Key()
 97        self._status: str = "New client."
 98        self._server_connection: Optional[WebSocketClientProtocol] = None
 99        self._connected: bool = False
100        self._do_disconnect: bool = False
101        self._packet_queue: list[tuple[Packet, ResponseCallback]] = []
102        self._game: Optional[str] = None
103        self._heartbeat_interval = 1 / DEFAULT_HEARTBEAT_RATE
104        # Connection details
105        self._username = username
106        self._password = password
107        self._invite_code = invite_code
108        self._address = address
109        self._port = port
110        self._server = server
111        self._verify_server_pubkey = verify_server_pubkey
112
113    @classmethod
114    def local(
115        cls,
116        *,
117        game: Type[Game],
118        username: str,
119        password: str = "",
120        port: int = DEFAULT_PORT,
121        server_factory: Optional[Callable[[Any], Server]] = None,
122    ) -> "Client":
123        """Create a client that uses its own local server. See also `Client.remote`.
124
125        Only the *game* and *username* arguments are required.
126
127        Args:
128            game: `pgnet.Game` class to pass to the local server.
129            username: The user's username.
130            password: The user's password.
131            port: Server port number.
132            server_factory: If provided, will be used to create the local server. Must
133                accept the same arguments as `pgnet.Server` (default). This is useful
134                for using a server subclass or to pass custom arguments to the local
135                server.
136        """
137        server_factory = server_factory or Server
138        server = server_factory(
139            game,
140            listen_globally=False,
141            registration_enabled=True,
142            require_user_password=False,
143        )
144        return cls(
145            address="localhost",
146            username=username,
147            password=password,
148            invite_code="",
149            port=port,
150            server=server,
151            verify_server_pubkey=server.pubkey,
152        )
153
154    @classmethod
155    def remote(
156        cls,
157        *,
158        address: str,
159        username: str,
160        password: str = "",
161        invite_code: str = "",
162        port: int = DEFAULT_PORT,
163        verify_server_pubkey: str = "",
164    ) -> "Client":
165        """Create a client that connects to a remote server. See also `Client.local`.
166
167        Args:
168            address: Server IP address.
169            username: The user's username.
170            password: The user's password.
171            invite_code: An invite code for creating a new user.
172            port: Server port number.
173            verify_server_pubkey: If provided, will compare against the
174                `pgnet.Server.pubkey` of the server and disconnect if they do not match.
175        """
176        return cls(
177            address=address,
178            username=username,
179            password=password,
180            invite_code=invite_code,
181            port=port,
182            server=None,
183            verify_server_pubkey=verify_server_pubkey,
184        )
185
186    async def async_connect(self):
187        """Connect to a server.
188
189        This procedure will automatically create an end-to-end encrypted connection
190        (optionally verifying the public key first), and authenticate the username and
191        password. Only after succesfully completing these steps will the client be
192        considered connected and ready to process the packet queue from `Client.send`.
193        """
194        logger.debug(f"Connecting... {self}")
195        if self._server is None:
196            return await self._async_connect_remote()
197        else:
198            return await self._async_connect_local()
199
200    def disconnect(self):
201        """Close the connection."""
202        self._do_disconnect = True
203
204    def get_game_dir(self, callback: Callable, /):
205        """Get the games directory from the server and pass the response to callback."""
206        self.send(Packet(Request.GAME_DIR), callback, do_next=True)
207
208    def create_game(
209        self,
210        name: str,
211        /,
212        *,
213        password: Optional[str] = None,
214        callback: Optional[ResponseCallback] = None,
215    ):
216        """Request from the server to create and join a game."""
217        payload = dict(name=name)
218        if password:
219            payload["password"] = password
220        self.send(Packet(Request.CREATE_GAME, payload), callback, do_next=True)
221
222    def join_game(
223        self,
224        name: str,
225        /,
226        *,
227        password: Optional[str] = None,
228        callback: Optional[ResponseCallback] = None,
229    ):
230        """Request from the server to join a game."""
231        payload = dict(name=name)
232        if password:
233            payload["password"] = password
234        self.send(Packet(Request.JOIN_GAME, payload), callback, do_next=True)
235
236    def leave_game(
237        self,
238        *,
239        callback: Optional[ResponseCallback] = None,
240    ):
241        """Request from the server to leave the game."""
242        self.send(Packet(Request.LEAVE_GAME), callback, do_next=True)
243
244    def send(
245        self,
246        packet: Packet,
247        callback: Optional[ResponseCallback] = None,
248        /,
249        do_next: bool = False,
250    ):
251        """Add a `pgnet.Packet` to the queue.
252
253        If a callback was given, the `pgnet.Response` will be passed to it. Callbacks
254        are not ensured, as the queue can be arbitrarily cleared using
255        `Client.flush_queue`.
256        """
257        if not self._connected:
258            logger.warning(f"Cannot queue packets while disconnected: {packet}")
259            return
260        packet_count = len(self._packet_queue)
261        if packet_count >= 5:
262            logger.warning(f"{packet_count} packets pending, adding: {packet}")
263        packet_callback = (packet, callback)
264        if do_next:
265            self._packet_queue.insert(0, packet_callback)
266        else:
267            self._packet_queue.append(packet_callback)
268
269    def flush_queue(self):
270        """Clear packets and their respective callbacks from the queue.
271
272        See also: `Client.send`.
273        """
274        if self._packet_queue:
275            logger.debug(f"Discarding messages:  {self._packet_queue}")
276        self._packet_queue = []
277
278    def on_connection(self, connected: bool):
279        """Called when connected or disconnected. See also: `Client.connected`."""
280        pass
281
282    @property
283    def connected(self) -> bool:
284        """If we are connected to the server. See also: `Client.on_connection`."""
285        return self._connected
286
287    def on_status(self, status: str):
288        """Called with feedback on client status. See also: `Client.status`."""
289        pass
290
291    @property
292    def status(self) -> str:
293        """Last status feedback message. See also: `Client.on_status`."""
294        return self._status
295
296    def on_game(self, game_name: Optional[str]):
297        """Called when joining or leaving a game. See also: `Client.game`."""
298        pass
299
300    @property
301    def game(self) -> Optional[str]:
302        """Currently joined game name. See also: `Client.on_game`."""
303        return self._game
304
305    def on_heartbeat(self, heartbeat: Response):
306        """Override this method to implement heartbeat updates.
307
308        The *heartbeat* Response is given by `pgnet.util.Game.handle_heartbeat`. The
309        heartbeat rate is set by `pgnet.util.Game.heartbeat_rate`.
310
311        See also: `Client.heartbeat_payload`.
312        """
313        pass
314
315    def heartbeat_payload(self) -> dict:
316        """Override this method to add data to the heartbeat request payload.
317
318        This payload is passed to `pgnet.util.Game.handle_heartbeat`. The heartbeat rate
319        is set by `pgnet.util.Game.heartbeat_rate`.
320
321        See also: `Client.on_heartbeat`.
322        """
323        return dict()
324
325    async def _async_connect_remote(self):
326        """Connect to the server.
327
328        Gets a websocket to create a `ClientConnection` object. This is passed to the
329        handshake handler to be populated, and then to the user connection for handling
330        the packet queue. The heartbeat task is managed here.
331        """
332        if self._server_connection is not None:
333            raise RuntimeError("Cannot open more than one connection per client.")
334        logger.debug("Connecting to server...")
335        full_address = f"{self._address}:{self._port}"
336        self._server_connection = websockets.connect(
337            f"ws://{full_address}",
338            close_timeout=1,
339        )
340        self._set_status(f"Connecting to {full_address}...", logger.info)
341        connection: Optional[ClientConnection] = None
342        heartbeat: Optional[asyncio.Task] = None
343        try:
344            try:
345                websocket = await self._server_connection
346            except OSError as e:
347                logger.debug(f"{e=}")
348                raise DisconnectedError("Failed to call server.")
349            connection = ClientConnection(websocket)
350            self._set_status("Logging in...", logger.info)
351            await self._handle_handshake(connection)
352            self._set_connection(True)
353            self._set_status(
354                f"Logged in as {self._username} @ {connection.remote}",
355                logger.info,
356            )
357            heartbeat = asyncio.create_task(self._async_heartbeat())
358            await self._handle_user_connection(connection)
359        except DisconnectedError as e:
360            logger.debug(f"{e=}")
361            if connection:
362                await connection.close()
363            if heartbeat:
364                heartbeat.cancel()
365            self._set_connection(False)
366            self._server_connection = None
367            self._set_status(f"Disconnected: {e.args[0]}")
368            logger.info(f"Connection terminated. {self}")
369            return
370        logger.warning(f"Connection terminated without DisconnectedError {connection}")
371
372    async def _async_connect_local(self):
373        """Wraps `Client._async_connect_remote to cleanup/teardown the local server."""
374        logger.debug("Setting up local server...")
375        assert isinstance(self._server, Server)
376        started = asyncio.Future()
377        server_coro = self._server.async_run(on_start=lambda *a: started.set_result(1))
378        server_task = asyncio.create_task(server_coro)
379        server_task.add_done_callback(lambda *a: self.disconnect())
380        await asyncio.wait((server_task, started), return_when=asyncio.FIRST_COMPLETED)
381        await self._async_connect_remote()
382        self.disconnect()
383        self._server.shutdown()
384
385    async def _handle_handshake(self, connection: ClientConnection):
386        """Handle a new connection's handshake sequence. Modifies the connection object.
387
388        First trade public keys and assign the connection's `tunnel`. Then log in
389        with our username.
390        """
391        # Trade public keys
392        packet = Packet("key_trade", dict(pubkey=self._key.pubkey))
393        response = await connection.send_recv(packet)
394        pubkey = response.payload.get("pubkey")
395        if not pubkey or not isinstance(pubkey, str):
396            raise DisconnectedError("Missing public key string from server.")
397        if self._verify_server_pubkey:
398            if pubkey != self._verify_server_pubkey:
399                raise DisconnectedError("Unverified server public key.")
400            logger.debug(f"Server pubkey verified: {pubkey=}")
401        connection.tunnel = self._key.get_tunnel(pubkey)
402        logger.debug(f"Assigned tunnel: {connection}")
403        # Authenticate
404        handshake_payload = dict(
405            username=self._username,
406            password=self._password,
407            invite_code=self._invite_code,
408        )
409        packet = Packet("handshake", handshake_payload)
410        response = await connection.send_recv(packet)
411        if response.status != Status.OK:
412            m = response.message
413            logger.info(m)
414            raise DisconnectedError(m)
415
416    async def _async_heartbeat(self):
417        """Periodically update while connected and in game.
418
419        Will create a heartbeat request using `Client.heartbeat_payload` and pass the
420        response to `Client.on_heartbeat`.
421        """
422        while True:
423            await asyncio.sleep(self._heartbeat_interval)
424            if self.connected and self.game:
425                packet = Packet(Request.HEARTBEAT_UPDATE, self.heartbeat_payload())
426                self.send(packet, self.on_heartbeat)
427
428    async def _handle_user_connection(self, connection: ClientConnection):
429        """Handle the connection after handshake - process the packet queue."""
430        self._do_disconnect = False
431        while not self._do_disconnect:
432            # Wait for queued packets
433            if not self._packet_queue:
434                await asyncio.sleep(0.01)
435                continue
436            # Send and callback with response
437            packet, callback = self._packet_queue.pop(0)
438            response = await connection.send_recv(packet)
439            if response.status != Status.OK:
440                logger.info(f"Status code: {response.debug_repr}")
441            self._handle_game_change(response)
442            if callback:
443                callback(response)
444            # Disconnect if alerted
445            if response.disconnecting:
446                logger.info(f"Disconnected.\n{packet}\n{response.debug_repr}")
447                raise DisconnectedError(response.message)
448        raise DisconnectedError("Client closed connection.")
449
450    def _set_status(self, status: str, logger: Optional[Callable] = None):
451        """Set the client status message with associated callback."""
452        self._status = status
453        if logger:
454            logger(status)
455        if self.on_status:
456            self.on_status(status)
457
458    def _set_connection(self, set_as: bool, /):
459        """Set the client connection status with associated callback."""
460        if self._connected == set_as:
461            return
462        logger.debug(f"Setting connection as: {set_as}")
463        self._connected = set_as
464        if self.on_connection:
465            self.on_connection(set_as)
466
467    def _handle_game_change(self, response: Response):
468        """Handle game change.
469
470        Set the client game name and heartbeat rate, then call event callback.
471        """
472        game_name = response.game
473        if game_name != self._game:
474            if game_name is None:
475                self._set_status(f"Left game: {self._game}")
476            else:
477                self._set_status(f"Joined game: {game_name}")
478            hb_rate = response.payload.get("heartbeat_rate", DEFAULT_HEARTBEAT_RATE)
479            self._heartbeat_interval = 1 / hb_rate
480            logger.debug(f"{self._heartbeat_interval=}")
481            self._game = game_name
482            if self.on_game:
483                self.on_game(game_name)
484
485    def __repr__(self):
486        """Object repr."""
487        local = "(local) " if self._server else ""
488        conn = "connected" if self.connected else "disconnected"
489        return (
490            f"<{self.__class__.__qualname__} {local}{self._username!r}"
491            f" {conn}, {self._address}:{self._port}"
492            f" @ {id(self):x}>"
493        )

The client that manages communication with the server.

Initializing

This class should not be initialized directly, instead use Client.remote or Client.local.

# This client will connect to a remote server
remote_client = Client.remote(address="1.2.3.4", username="player")
# This client will create a local server and connect to it
local_client = Client.local(game=MyGame, username="player")

Connecting and starting a game

Use Client.async_connect to connect to the server. Once connected, use Client.get_game_dir, Client.create_game, Client.join_game, and Client.leave_game to join or leave a game.

Using the packet queue

When in game (Client.game is not None), use Client.send to send a pgnet.Packet to pgnet.Game.handle_game_packet and receive a pgnet.Response.

Client events

It is possible to bind callbacks to client events by subclassing and overriding or by setting Client.on_connection, Client.on_status, and Client.on_game.

Client( *, address: str, username: str, password: str, invite_code: str, port: int, server: Optional[pgnet.server.Server], verify_server_pubkey: str)
 81    def __init__(
 82        self,
 83        *,
 84        address: str,
 85        username: str,
 86        password: str,
 87        invite_code: str,
 88        port: int,
 89        server: Optional[Server],
 90        verify_server_pubkey: str,
 91    ):
 92        """This class should not be initialized directly.
 93
 94        .. note:: Use `Client.local` or `Client.remote` to create a client.
 95        """
 96        self._key: Key = Key()
 97        self._status: str = "New client."
 98        self._server_connection: Optional[WebSocketClientProtocol] = None
 99        self._connected: bool = False
100        self._do_disconnect: bool = False
101        self._packet_queue: list[tuple[Packet, ResponseCallback]] = []
102        self._game: Optional[str] = None
103        self._heartbeat_interval = 1 / DEFAULT_HEARTBEAT_RATE
104        # Connection details
105        self._username = username
106        self._password = password
107        self._invite_code = invite_code
108        self._address = address
109        self._port = port
110        self._server = server
111        self._verify_server_pubkey = verify_server_pubkey

This class should not be initialized directly.

Use Client.local or Client.remote to create a client.
@classmethod
def local( cls, *, game: Type[pgnet.util.Game], username: str, password: str = '', port: int = 38929, server_factory: Optional[Callable[[Any], pgnet.server.Server]] = None) -> pgnet.client.Client:
113    @classmethod
114    def local(
115        cls,
116        *,
117        game: Type[Game],
118        username: str,
119        password: str = "",
120        port: int = DEFAULT_PORT,
121        server_factory: Optional[Callable[[Any], Server]] = None,
122    ) -> "Client":
123        """Create a client that uses its own local server. See also `Client.remote`.
124
125        Only the *game* and *username* arguments are required.
126
127        Args:
128            game: `pgnet.Game` class to pass to the local server.
129            username: The user's username.
130            password: The user's password.
131            port: Server port number.
132            server_factory: If provided, will be used to create the local server. Must
133                accept the same arguments as `pgnet.Server` (default). This is useful
134                for using a server subclass or to pass custom arguments to the local
135                server.
136        """
137        server_factory = server_factory or Server
138        server = server_factory(
139            game,
140            listen_globally=False,
141            registration_enabled=True,
142            require_user_password=False,
143        )
144        return cls(
145            address="localhost",
146            username=username,
147            password=password,
148            invite_code="",
149            port=port,
150            server=server,
151            verify_server_pubkey=server.pubkey,
152        )

Create a client that uses its own local server. See also Client.remote.

Only the game and username arguments are required.

Arguments:
  • game: pgnet.Game class to pass to the local server.
  • username: The user's username.
  • password: The user's password.
  • port: Server port number.
  • server_factory: If provided, will be used to create the local server. Must accept the same arguments as pgnet.Server (default). This is useful for using a server subclass or to pass custom arguments to the local server.
@classmethod
def remote( cls, *, address: str, username: str, password: str = '', invite_code: str = '', port: int = 38929, verify_server_pubkey: str = '') -> pgnet.client.Client:
154    @classmethod
155    def remote(
156        cls,
157        *,
158        address: str,
159        username: str,
160        password: str = "",
161        invite_code: str = "",
162        port: int = DEFAULT_PORT,
163        verify_server_pubkey: str = "",
164    ) -> "Client":
165        """Create a client that connects to a remote server. See also `Client.local`.
166
167        Args:
168            address: Server IP address.
169            username: The user's username.
170            password: The user's password.
171            invite_code: An invite code for creating a new user.
172            port: Server port number.
173            verify_server_pubkey: If provided, will compare against the
174                `pgnet.Server.pubkey` of the server and disconnect if they do not match.
175        """
176        return cls(
177            address=address,
178            username=username,
179            password=password,
180            invite_code=invite_code,
181            port=port,
182            server=None,
183            verify_server_pubkey=verify_server_pubkey,
184        )

Create a client that connects to a remote server. See also Client.local.

Arguments:
  • address: Server IP address.
  • username: The user's username.
  • password: The user's password.
  • invite_code: An invite code for creating a new user.
  • port: Server port number.
  • verify_server_pubkey: If provided, will compare against the pgnet.Server.pubkey of the server and disconnect if they do not match.
async def async_connect(self):
186    async def async_connect(self):
187        """Connect to a server.
188
189        This procedure will automatically create an end-to-end encrypted connection
190        (optionally verifying the public key first), and authenticate the username and
191        password. Only after succesfully completing these steps will the client be
192        considered connected and ready to process the packet queue from `Client.send`.
193        """
194        logger.debug(f"Connecting... {self}")
195        if self._server is None:
196            return await self._async_connect_remote()
197        else:
198            return await self._async_connect_local()

Connect to a server.

This procedure will automatically create an end-to-end encrypted connection (optionally verifying the public key first), and authenticate the username and password. Only after succesfully completing these steps will the client be considered connected and ready to process the packet queue from Client.send.

def disconnect(self):
200    def disconnect(self):
201        """Close the connection."""
202        self._do_disconnect = True

Close the connection.

def get_game_dir(self, callback: Callable, /):
204    def get_game_dir(self, callback: Callable, /):
205        """Get the games directory from the server and pass the response to callback."""
206        self.send(Packet(Request.GAME_DIR), callback, do_next=True)

Get the games directory from the server and pass the response to callback.

def create_game( self, name: str, /, *, password: Optional[str] = None, callback: Optional[Callable[[pgnet.util.Response], Any]] = None):
208    def create_game(
209        self,
210        name: str,
211        /,
212        *,
213        password: Optional[str] = None,
214        callback: Optional[ResponseCallback] = None,
215    ):
216        """Request from the server to create and join a game."""
217        payload = dict(name=name)
218        if password:
219            payload["password"] = password
220        self.send(Packet(Request.CREATE_GAME, payload), callback, do_next=True)

Request from the server to create and join a game.

def join_game( self, name: str, /, *, password: Optional[str] = None, callback: Optional[Callable[[pgnet.util.Response], Any]] = None):
222    def join_game(
223        self,
224        name: str,
225        /,
226        *,
227        password: Optional[str] = None,
228        callback: Optional[ResponseCallback] = None,
229    ):
230        """Request from the server to join a game."""
231        payload = dict(name=name)
232        if password:
233            payload["password"] = password
234        self.send(Packet(Request.JOIN_GAME, payload), callback, do_next=True)

Request from the server to join a game.

def leave_game( self, *, callback: Optional[Callable[[pgnet.util.Response], Any]] = None):
236    def leave_game(
237        self,
238        *,
239        callback: Optional[ResponseCallback] = None,
240    ):
241        """Request from the server to leave the game."""
242        self.send(Packet(Request.LEAVE_GAME), callback, do_next=True)

Request from the server to leave the game.

def send( self, packet: pgnet.util.Packet, callback: Optional[Callable[[pgnet.util.Response], Any]] = None, /, do_next: bool = False):
244    def send(
245        self,
246        packet: Packet,
247        callback: Optional[ResponseCallback] = None,
248        /,
249        do_next: bool = False,
250    ):
251        """Add a `pgnet.Packet` to the queue.
252
253        If a callback was given, the `pgnet.Response` will be passed to it. Callbacks
254        are not ensured, as the queue can be arbitrarily cleared using
255        `Client.flush_queue`.
256        """
257        if not self._connected:
258            logger.warning(f"Cannot queue packets while disconnected: {packet}")
259            return
260        packet_count = len(self._packet_queue)
261        if packet_count >= 5:
262            logger.warning(f"{packet_count} packets pending, adding: {packet}")
263        packet_callback = (packet, callback)
264        if do_next:
265            self._packet_queue.insert(0, packet_callback)
266        else:
267            self._packet_queue.append(packet_callback)

Add a pgnet.Packet to the queue.

If a callback was given, the pgnet.Response will be passed to it. Callbacks are not ensured, as the queue can be arbitrarily cleared using Client.flush_queue.

def flush_queue(self):
269    def flush_queue(self):
270        """Clear packets and their respective callbacks from the queue.
271
272        See also: `Client.send`.
273        """
274        if self._packet_queue:
275            logger.debug(f"Discarding messages:  {self._packet_queue}")
276        self._packet_queue = []

Clear packets and their respective callbacks from the queue.

See also: Client.send.

def on_connection(self, connected: bool):
278    def on_connection(self, connected: bool):
279        """Called when connected or disconnected. See also: `Client.connected`."""
280        pass

Called when connected or disconnected. See also: Client.connected.

connected: bool

If we are connected to the server. See also: Client.on_connection.

def on_status(self, status: str):
287    def on_status(self, status: str):
288        """Called with feedback on client status. See also: `Client.status`."""
289        pass

Called with feedback on client status. See also: Client.status.

status: str

Last status feedback message. See also: Client.on_status.

def on_game(self, game_name: Optional[str]):
296    def on_game(self, game_name: Optional[str]):
297        """Called when joining or leaving a game. See also: `Client.game`."""
298        pass

Called when joining or leaving a game. See also: Client.game.

game: Optional[str]

Currently joined game name. See also: Client.on_game.

def on_heartbeat(self, heartbeat: pgnet.util.Response):
305    def on_heartbeat(self, heartbeat: Response):
306        """Override this method to implement heartbeat updates.
307
308        The *heartbeat* Response is given by `pgnet.util.Game.handle_heartbeat`. The
309        heartbeat rate is set by `pgnet.util.Game.heartbeat_rate`.
310
311        See also: `Client.heartbeat_payload`.
312        """
313        pass

Override this method to implement heartbeat updates.

The heartbeat Response is given by pgnet.util.Game.handle_heartbeat. The heartbeat rate is set by pgnet.util.Game.heartbeat_rate.

See also: Client.heartbeat_payload.

def heartbeat_payload(self) -> dict:
315    def heartbeat_payload(self) -> dict:
316        """Override this method to add data to the heartbeat request payload.
317
318        This payload is passed to `pgnet.util.Game.handle_heartbeat`. The heartbeat rate
319        is set by `pgnet.util.Game.heartbeat_rate`.
320
321        See also: `Client.on_heartbeat`.
322        """
323        return dict()

Override this method to add data to the heartbeat request payload.

This payload is passed to pgnet.util.Game.handle_heartbeat. The heartbeat rate is set by pgnet.util.Game.heartbeat_rate.

See also: Client.on_heartbeat.

@dataclass
class ClientConnection(pgnet.util.Connection):
28@dataclass
29class ClientConnection(Connection):
30    """Thin wrapper for `pgnet.util.Connection`.
31
32    Provides `ClientConnection.send_recv` for combined sending and receiving.
33    """
34
35    game: Optional[str] = None
36    _websocket_busy: bool = False
37
38    async def send_recv(self, packet: Packet) -> Response:
39        """Send packet and receive response via Baseclass `send` and `recv`."""
40        if self._websocket_busy:
41            logger.warning(f"Packet is waiting for busy websocket... {packet}")
42        while self._websocket_busy and self.websocket.open:
43            await asyncio.sleep(0.1)
44        self._websocket_busy = True
45        try:
46            await self.send(packet.serialize())
47            response: str = await self.recv()
48            return Response.deserialize(response)
49        finally:
50            self._websocket_busy = False

Thin wrapper for pgnet.util.Connection.

Provides ClientConnection.send_recv for combined sending and receiving.

ClientConnection( websocket: websockets.legacy.protocol.WebSocketCommonProtocol, tunnel: Optional[pgnet.util.Tunnel] = None, remote: str = '', game: Optional[str] = None, _websocket_busy: bool = False)
async def send_recv(self, packet: pgnet.util.Packet) -> pgnet.util.Response:
38    async def send_recv(self, packet: Packet) -> Response:
39        """Send packet and receive response via Baseclass `send` and `recv`."""
40        if self._websocket_busy:
41            logger.warning(f"Packet is waiting for busy websocket... {packet}")
42        while self._websocket_busy and self.websocket.open:
43            await asyncio.sleep(0.1)
44        self._websocket_busy = True
45        try:
46            await self.send(packet.serialize())
47            response: str = await self.recv()
48            return Response.deserialize(response)
49        finally:
50            self._websocket_busy = False

Send packet and receive response via Baseclass send and recv.