pgnet.client
Home of the Client
class.
1"""Home of the `Client` class.""" 2 3from typing import Optional, Callable, Any, Type 4from loguru import logger 5import asyncio 6from dataclasses import dataclass 7import websockets 8from websockets.client import WebSocketClientProtocol 9from .server import Server 10from .util import ( 11 Packet, 12 Response, 13 Connection, 14 Key, 15 Game, 16 DEFAULT_PORT, 17 DisconnectedError, 18 Status, 19 Request, 20) 21 22 23DEFAULT_HEARTBEAT_RATE = 10 24ResponseCallback = Callable[[Response], Any] 25 26 27@dataclass 28class ClientConnection(Connection): 29 """Thin wrapper for `pgnet.util.Connection`. 30 31 Provides `ClientConnection.send_recv` for combined sending and receiving. 32 """ 33 34 game: Optional[str] = None 35 _websocket_busy: bool = False 36 37 async def send_recv(self, packet: Packet) -> Response: 38 """Send packet and receive response via Baseclass `send` and `recv`.""" 39 if self._websocket_busy: 40 logger.warning(f"Packet is waiting for busy websocket... {packet}") 41 while self._websocket_busy and self.websocket.open: 42 await asyncio.sleep(0.1) 43 self._websocket_busy = True 44 try: 45 await self.send(packet.serialize()) 46 response: str = await self.recv() 47 return Response.deserialize(response) 48 finally: 49 self._websocket_busy = False 50 51 52class Client: 53 """The client that manages communication with the server. 54 55 ## Initializing 56 This class should not be initialized directly, instead use `Client.remote` or 57 `Client.local`. 58 ```python3 59 # This client will connect to a remote server 60 remote_client = Client.remote(address="1.2.3.4", username="player") 61 # This client will create a local server and connect to it 62 local_client = Client.local(game=MyGame, username="player") 63 ``` 64 65 ## Connecting and starting a game 66 Use `Client.async_connect` to connect to the server. Once connected, use 67 `Client.get_game_dir`, `Client.create_game`, `Client.join_game`, and 68 `Client.leave_game` to join or leave a game. 69 70 ## Using the packet queue 71 When in game (`Client.game` is not *None*), use `Client.send` to send a 72 `pgnet.Packet` to `pgnet.Game.handle_game_packet` and receive a `pgnet.Response`. 73 74 ## Client events 75 It is possible to bind callbacks to client events by subclassing and overriding or 76 by setting `Client.on_connection`, `Client.on_status`, and `Client.on_game`. 77 78 """ 79 80 def __init__( 81 self, 82 *, 83 address: str, 84 username: str, 85 password: str, 86 invite_code: str, 87 port: int, 88 server: Optional[Server], 89 verify_server_pubkey: str, 90 ): 91 """This class should not be initialized directly. 92 93 .. note:: Use `Client.local` or `Client.remote` to create a client. 94 """ 95 self._key: Key = Key() 96 self._status: str = "New client." 97 self._server_connection: Optional[WebSocketClientProtocol] = None 98 self._connected: bool = False 99 self._do_disconnect: bool = False 100 self._packet_queue: list[tuple[Packet, ResponseCallback]] = [] 101 self._game: Optional[str] = None 102 self._heartbeat_interval = 1 / DEFAULT_HEARTBEAT_RATE 103 # Connection details 104 self._username = username 105 self._password = password 106 self._invite_code = invite_code 107 self._address = address 108 self._port = port 109 self._server = server 110 self._verify_server_pubkey = verify_server_pubkey 111 112 @classmethod 113 def local( 114 cls, 115 *, 116 game: Type[Game], 117 username: str, 118 password: str = "", 119 port: int = DEFAULT_PORT, 120 server_factory: Optional[Callable[[Any], Server]] = None, 121 ) -> "Client": 122 """Create a client that uses its own local server. See also `Client.remote`. 123 124 Only the *game* and *username* arguments are required. 125 126 Args: 127 game: `pgnet.Game` class to pass to the local server. 128 username: The user's username. 129 password: The user's password. 130 port: Server port number. 131 server_factory: If provided, will be used to create the local server. Must 132 accept the same arguments as `pgnet.Server` (default). This is useful 133 for using a server subclass or to pass custom arguments to the local 134 server. 135 """ 136 server_factory = server_factory or Server 137 server = server_factory( 138 game, 139 listen_globally=False, 140 registration_enabled=True, 141 require_user_password=False, 142 ) 143 return cls( 144 address="localhost", 145 username=username, 146 password=password, 147 invite_code="", 148 port=port, 149 server=server, 150 verify_server_pubkey=server.pubkey, 151 ) 152 153 @classmethod 154 def remote( 155 cls, 156 *, 157 address: str, 158 username: str, 159 password: str = "", 160 invite_code: str = "", 161 port: int = DEFAULT_PORT, 162 verify_server_pubkey: str = "", 163 ) -> "Client": 164 """Create a client that connects to a remote server. See also `Client.local`. 165 166 Args: 167 address: Server IP address. 168 username: The user's username. 169 password: The user's password. 170 invite_code: An invite code for creating a new user. 171 port: Server port number. 172 verify_server_pubkey: If provided, will compare against the 173 `pgnet.Server.pubkey` of the server and disconnect if they do not match. 174 """ 175 return cls( 176 address=address, 177 username=username, 178 password=password, 179 invite_code=invite_code, 180 port=port, 181 server=None, 182 verify_server_pubkey=verify_server_pubkey, 183 ) 184 185 async def async_connect(self): 186 """Connect to a server. 187 188 This procedure will automatically create an end-to-end encrypted connection 189 (optionally verifying the public key first), and authenticate the username and 190 password. Only after succesfully completing these steps will the client be 191 considered connected and ready to process the packet queue from `Client.send`. 192 """ 193 logger.debug(f"Connecting... {self}") 194 if self._server is None: 195 return await self._async_connect_remote() 196 else: 197 return await self._async_connect_local() 198 199 def disconnect(self): 200 """Close the connection.""" 201 self._do_disconnect = True 202 203 def get_game_dir(self, callback: Callable, /): 204 """Get the games directory from the server and pass the response to callback.""" 205 self.send(Packet(Request.GAME_DIR), callback, do_next=True) 206 207 def create_game( 208 self, 209 name: str, 210 /, 211 *, 212 password: Optional[str] = None, 213 callback: Optional[ResponseCallback] = None, 214 ): 215 """Request from the server to create and join a game.""" 216 payload = dict(name=name) 217 if password: 218 payload["password"] = password 219 self.send(Packet(Request.CREATE_GAME, payload), callback, do_next=True) 220 221 def join_game( 222 self, 223 name: str, 224 /, 225 *, 226 password: Optional[str] = None, 227 callback: Optional[ResponseCallback] = None, 228 ): 229 """Request from the server to join a game.""" 230 payload = dict(name=name) 231 if password: 232 payload["password"] = password 233 self.send(Packet(Request.JOIN_GAME, payload), callback, do_next=True) 234 235 def leave_game( 236 self, 237 *, 238 callback: Optional[ResponseCallback] = None, 239 ): 240 """Request from the server to leave the game.""" 241 self.send(Packet(Request.LEAVE_GAME), callback, do_next=True) 242 243 def send( 244 self, 245 packet: Packet, 246 callback: Optional[ResponseCallback] = None, 247 /, 248 do_next: bool = False, 249 ): 250 """Add a `pgnet.Packet` to the queue. 251 252 If a callback was given, the `pgnet.Response` will be passed to it. Callbacks 253 are not ensured, as the queue can be arbitrarily cleared using 254 `Client.flush_queue`. 255 """ 256 if not self._connected: 257 logger.warning(f"Cannot queue packets while disconnected: {packet}") 258 return 259 packet_count = len(self._packet_queue) 260 if packet_count >= 5: 261 logger.warning(f"{packet_count} packets pending, adding: {packet}") 262 packet_callback = (packet, callback) 263 if do_next: 264 self._packet_queue.insert(0, packet_callback) 265 else: 266 self._packet_queue.append(packet_callback) 267 268 def flush_queue(self): 269 """Clear packets and their respective callbacks from the queue. 270 271 See also: `Client.send`. 272 """ 273 if self._packet_queue: 274 logger.debug(f"Discarding messages: {self._packet_queue}") 275 self._packet_queue = [] 276 277 def on_connection(self, connected: bool): 278 """Called when connected or disconnected. See also: `Client.connected`.""" 279 pass 280 281 @property 282 def connected(self) -> bool: 283 """If we are connected to the server. See also: `Client.on_connection`.""" 284 return self._connected 285 286 def on_status(self, status: str): 287 """Called with feedback on client status. See also: `Client.status`.""" 288 pass 289 290 @property 291 def status(self) -> str: 292 """Last status feedback message. See also: `Client.on_status`.""" 293 return self._status 294 295 def on_game(self, game_name: Optional[str]): 296 """Called when joining or leaving a game. See also: `Client.game`.""" 297 pass 298 299 @property 300 def game(self) -> Optional[str]: 301 """Currently joined game name. See also: `Client.on_game`.""" 302 return self._game 303 304 def on_heartbeat(self, heartbeat: Response): 305 """Override this method to implement heartbeat updates. 306 307 The *heartbeat* Response is given by `pgnet.util.Game.handle_heartbeat`. The 308 heartbeat rate is set by `pgnet.util.Game.heartbeat_rate`. 309 310 See also: `Client.heartbeat_payload`. 311 """ 312 pass 313 314 def heartbeat_payload(self) -> dict: 315 """Override this method to add data to the heartbeat request payload. 316 317 This payload is passed to `pgnet.util.Game.handle_heartbeat`. The heartbeat rate 318 is set by `pgnet.util.Game.heartbeat_rate`. 319 320 See also: `Client.on_heartbeat`. 321 """ 322 return dict() 323 324 async def _async_connect_remote(self): 325 """Connect to the server. 326 327 Gets a websocket to create a `ClientConnection` object. This is passed to the 328 handshake handler to be populated, and then to the user connection for handling 329 the packet queue. The heartbeat task is managed here. 330 """ 331 if self._server_connection is not None: 332 raise RuntimeError("Cannot open more than one connection per client.") 333 logger.debug("Connecting to server...") 334 full_address = f"{self._address}:{self._port}" 335 self._server_connection = websockets.connect( 336 f"ws://{full_address}", 337 close_timeout=1, 338 ) 339 self._set_status(f"Connecting to {full_address}...", logger.info) 340 connection: Optional[ClientConnection] = None 341 heartbeat: Optional[asyncio.Task] = None 342 try: 343 try: 344 websocket = await self._server_connection 345 except OSError as e: 346 logger.debug(f"{e=}") 347 raise DisconnectedError("Failed to call server.") 348 connection = ClientConnection(websocket) 349 self._set_status("Logging in...", logger.info) 350 await self._handle_handshake(connection) 351 self._set_connection(True) 352 self._set_status( 353 f"Logged in as {self._username} @ {connection.remote}", 354 logger.info, 355 ) 356 heartbeat = asyncio.create_task(self._async_heartbeat()) 357 await self._handle_user_connection(connection) 358 except DisconnectedError as e: 359 logger.debug(f"{e=}") 360 if connection: 361 await connection.close() 362 if heartbeat: 363 heartbeat.cancel() 364 self._set_connection(False) 365 self._server_connection = None 366 self._set_status(f"Disconnected: {e.args[0]}") 367 logger.info(f"Connection terminated. {self}") 368 return 369 logger.warning(f"Connection terminated without DisconnectedError {connection}") 370 371 async def _async_connect_local(self): 372 """Wraps `Client._async_connect_remote to cleanup/teardown the local server.""" 373 logger.debug("Setting up local server...") 374 assert isinstance(self._server, Server) 375 started = asyncio.Future() 376 server_coro = self._server.async_run(on_start=lambda *a: started.set_result(1)) 377 server_task = asyncio.create_task(server_coro) 378 server_task.add_done_callback(lambda *a: self.disconnect()) 379 await asyncio.wait((server_task, started), return_when=asyncio.FIRST_COMPLETED) 380 await self._async_connect_remote() 381 self.disconnect() 382 self._server.shutdown() 383 384 async def _handle_handshake(self, connection: ClientConnection): 385 """Handle a new connection's handshake sequence. Modifies the connection object. 386 387 First trade public keys and assign the connection's `tunnel`. Then log in 388 with our username. 389 """ 390 # Trade public keys 391 packet = Packet("key_trade", dict(pubkey=self._key.pubkey)) 392 response = await connection.send_recv(packet) 393 pubkey = response.payload.get("pubkey") 394 if not pubkey or not isinstance(pubkey, str): 395 raise DisconnectedError("Missing public key string from server.") 396 if self._verify_server_pubkey: 397 if pubkey != self._verify_server_pubkey: 398 raise DisconnectedError("Unverified server public key.") 399 logger.debug(f"Server pubkey verified: {pubkey=}") 400 connection.tunnel = self._key.get_tunnel(pubkey) 401 logger.debug(f"Assigned tunnel: {connection}") 402 # Authenticate 403 handshake_payload = dict( 404 username=self._username, 405 password=self._password, 406 invite_code=self._invite_code, 407 ) 408 packet = Packet("handshake", handshake_payload) 409 response = await connection.send_recv(packet) 410 if response.status != Status.OK: 411 m = response.message 412 logger.info(m) 413 raise DisconnectedError(m) 414 415 async def _async_heartbeat(self): 416 """Periodically update while connected and in game. 417 418 Will create a heartbeat request using `Client.heartbeat_payload` and pass the 419 response to `Client.on_heartbeat`. 420 """ 421 while True: 422 await asyncio.sleep(self._heartbeat_interval) 423 if self.connected and self.game: 424 packet = Packet(Request.HEARTBEAT_UPDATE, self.heartbeat_payload()) 425 self.send(packet, self.on_heartbeat) 426 427 async def _handle_user_connection(self, connection: ClientConnection): 428 """Handle the connection after handshake - process the packet queue.""" 429 self._do_disconnect = False 430 while not self._do_disconnect: 431 # Wait for queued packets 432 if not self._packet_queue: 433 await asyncio.sleep(0.01) 434 continue 435 # Send and callback with response 436 packet, callback = self._packet_queue.pop(0) 437 response = await connection.send_recv(packet) 438 if response.status != Status.OK: 439 logger.info(f"Status code: {response.debug_repr}") 440 self._handle_game_change(response) 441 if callback: 442 callback(response) 443 # Disconnect if alerted 444 if response.disconnecting: 445 logger.info(f"Disconnected.\n{packet}\n{response.debug_repr}") 446 raise DisconnectedError(response.message) 447 raise DisconnectedError("Client closed connection.") 448 449 def _set_status(self, status: str, logger: Optional[Callable] = None): 450 """Set the client status message with associated callback.""" 451 self._status = status 452 if logger: 453 logger(status) 454 if self.on_status: 455 self.on_status(status) 456 457 def _set_connection(self, set_as: bool, /): 458 """Set the client connection status with associated callback.""" 459 if self._connected == set_as: 460 return 461 logger.debug(f"Setting connection as: {set_as}") 462 self._connected = set_as 463 if self.on_connection: 464 self.on_connection(set_as) 465 466 def _handle_game_change(self, response: Response): 467 """Handle game change. 468 469 Set the client game name and heartbeat rate, then call event callback. 470 """ 471 game_name = response.game 472 if game_name != self._game: 473 if game_name is None: 474 self._set_status(f"Left game: {self._game}") 475 else: 476 self._set_status(f"Joined game: {game_name}") 477 hb_rate = response.payload.get("heartbeat_rate", DEFAULT_HEARTBEAT_RATE) 478 self._heartbeat_interval = 1 / hb_rate 479 logger.debug(f"{self._heartbeat_interval=}") 480 self._game = game_name 481 if self.on_game: 482 self.on_game(game_name) 483 484 def __repr__(self): 485 """Object repr.""" 486 local = "(local) " if self._server else "" 487 conn = "connected" if self.connected else "disconnected" 488 return ( 489 f"<{self.__class__.__qualname__} {local}{self._username!r}" 490 f" {conn}, {self._address}:{self._port}" 491 f" @ {id(self):x}>" 492 ) 493 494 495__all__ = ( 496 "Client", 497 "ClientConnection", 498)
53class Client: 54 """The client that manages communication with the server. 55 56 ## Initializing 57 This class should not be initialized directly, instead use `Client.remote` or 58 `Client.local`. 59 ```python3 60 # This client will connect to a remote server 61 remote_client = Client.remote(address="1.2.3.4", username="player") 62 # This client will create a local server and connect to it 63 local_client = Client.local(game=MyGame, username="player") 64 ``` 65 66 ## Connecting and starting a game 67 Use `Client.async_connect` to connect to the server. Once connected, use 68 `Client.get_game_dir`, `Client.create_game`, `Client.join_game`, and 69 `Client.leave_game` to join or leave a game. 70 71 ## Using the packet queue 72 When in game (`Client.game` is not *None*), use `Client.send` to send a 73 `pgnet.Packet` to `pgnet.Game.handle_game_packet` and receive a `pgnet.Response`. 74 75 ## Client events 76 It is possible to bind callbacks to client events by subclassing and overriding or 77 by setting `Client.on_connection`, `Client.on_status`, and `Client.on_game`. 78 79 """ 80 81 def __init__( 82 self, 83 *, 84 address: str, 85 username: str, 86 password: str, 87 invite_code: str, 88 port: int, 89 server: Optional[Server], 90 verify_server_pubkey: str, 91 ): 92 """This class should not be initialized directly. 93 94 .. note:: Use `Client.local` or `Client.remote` to create a client. 95 """ 96 self._key: Key = Key() 97 self._status: str = "New client." 98 self._server_connection: Optional[WebSocketClientProtocol] = None 99 self._connected: bool = False 100 self._do_disconnect: bool = False 101 self._packet_queue: list[tuple[Packet, ResponseCallback]] = [] 102 self._game: Optional[str] = None 103 self._heartbeat_interval = 1 / DEFAULT_HEARTBEAT_RATE 104 # Connection details 105 self._username = username 106 self._password = password 107 self._invite_code = invite_code 108 self._address = address 109 self._port = port 110 self._server = server 111 self._verify_server_pubkey = verify_server_pubkey 112 113 @classmethod 114 def local( 115 cls, 116 *, 117 game: Type[Game], 118 username: str, 119 password: str = "", 120 port: int = DEFAULT_PORT, 121 server_factory: Optional[Callable[[Any], Server]] = None, 122 ) -> "Client": 123 """Create a client that uses its own local server. See also `Client.remote`. 124 125 Only the *game* and *username* arguments are required. 126 127 Args: 128 game: `pgnet.Game` class to pass to the local server. 129 username: The user's username. 130 password: The user's password. 131 port: Server port number. 132 server_factory: If provided, will be used to create the local server. Must 133 accept the same arguments as `pgnet.Server` (default). This is useful 134 for using a server subclass or to pass custom arguments to the local 135 server. 136 """ 137 server_factory = server_factory or Server 138 server = server_factory( 139 game, 140 listen_globally=False, 141 registration_enabled=True, 142 require_user_password=False, 143 ) 144 return cls( 145 address="localhost", 146 username=username, 147 password=password, 148 invite_code="", 149 port=port, 150 server=server, 151 verify_server_pubkey=server.pubkey, 152 ) 153 154 @classmethod 155 def remote( 156 cls, 157 *, 158 address: str, 159 username: str, 160 password: str = "", 161 invite_code: str = "", 162 port: int = DEFAULT_PORT, 163 verify_server_pubkey: str = "", 164 ) -> "Client": 165 """Create a client that connects to a remote server. See also `Client.local`. 166 167 Args: 168 address: Server IP address. 169 username: The user's username. 170 password: The user's password. 171 invite_code: An invite code for creating a new user. 172 port: Server port number. 173 verify_server_pubkey: If provided, will compare against the 174 `pgnet.Server.pubkey` of the server and disconnect if they do not match. 175 """ 176 return cls( 177 address=address, 178 username=username, 179 password=password, 180 invite_code=invite_code, 181 port=port, 182 server=None, 183 verify_server_pubkey=verify_server_pubkey, 184 ) 185 186 async def async_connect(self): 187 """Connect to a server. 188 189 This procedure will automatically create an end-to-end encrypted connection 190 (optionally verifying the public key first), and authenticate the username and 191 password. Only after succesfully completing these steps will the client be 192 considered connected and ready to process the packet queue from `Client.send`. 193 """ 194 logger.debug(f"Connecting... {self}") 195 if self._server is None: 196 return await self._async_connect_remote() 197 else: 198 return await self._async_connect_local() 199 200 def disconnect(self): 201 """Close the connection.""" 202 self._do_disconnect = True 203 204 def get_game_dir(self, callback: Callable, /): 205 """Get the games directory from the server and pass the response to callback.""" 206 self.send(Packet(Request.GAME_DIR), callback, do_next=True) 207 208 def create_game( 209 self, 210 name: str, 211 /, 212 *, 213 password: Optional[str] = None, 214 callback: Optional[ResponseCallback] = None, 215 ): 216 """Request from the server to create and join a game.""" 217 payload = dict(name=name) 218 if password: 219 payload["password"] = password 220 self.send(Packet(Request.CREATE_GAME, payload), callback, do_next=True) 221 222 def join_game( 223 self, 224 name: str, 225 /, 226 *, 227 password: Optional[str] = None, 228 callback: Optional[ResponseCallback] = None, 229 ): 230 """Request from the server to join a game.""" 231 payload = dict(name=name) 232 if password: 233 payload["password"] = password 234 self.send(Packet(Request.JOIN_GAME, payload), callback, do_next=True) 235 236 def leave_game( 237 self, 238 *, 239 callback: Optional[ResponseCallback] = None, 240 ): 241 """Request from the server to leave the game.""" 242 self.send(Packet(Request.LEAVE_GAME), callback, do_next=True) 243 244 def send( 245 self, 246 packet: Packet, 247 callback: Optional[ResponseCallback] = None, 248 /, 249 do_next: bool = False, 250 ): 251 """Add a `pgnet.Packet` to the queue. 252 253 If a callback was given, the `pgnet.Response` will be passed to it. Callbacks 254 are not ensured, as the queue can be arbitrarily cleared using 255 `Client.flush_queue`. 256 """ 257 if not self._connected: 258 logger.warning(f"Cannot queue packets while disconnected: {packet}") 259 return 260 packet_count = len(self._packet_queue) 261 if packet_count >= 5: 262 logger.warning(f"{packet_count} packets pending, adding: {packet}") 263 packet_callback = (packet, callback) 264 if do_next: 265 self._packet_queue.insert(0, packet_callback) 266 else: 267 self._packet_queue.append(packet_callback) 268 269 def flush_queue(self): 270 """Clear packets and their respective callbacks from the queue. 271 272 See also: `Client.send`. 273 """ 274 if self._packet_queue: 275 logger.debug(f"Discarding messages: {self._packet_queue}") 276 self._packet_queue = [] 277 278 def on_connection(self, connected: bool): 279 """Called when connected or disconnected. See also: `Client.connected`.""" 280 pass 281 282 @property 283 def connected(self) -> bool: 284 """If we are connected to the server. See also: `Client.on_connection`.""" 285 return self._connected 286 287 def on_status(self, status: str): 288 """Called with feedback on client status. See also: `Client.status`.""" 289 pass 290 291 @property 292 def status(self) -> str: 293 """Last status feedback message. See also: `Client.on_status`.""" 294 return self._status 295 296 def on_game(self, game_name: Optional[str]): 297 """Called when joining or leaving a game. See also: `Client.game`.""" 298 pass 299 300 @property 301 def game(self) -> Optional[str]: 302 """Currently joined game name. See also: `Client.on_game`.""" 303 return self._game 304 305 def on_heartbeat(self, heartbeat: Response): 306 """Override this method to implement heartbeat updates. 307 308 The *heartbeat* Response is given by `pgnet.util.Game.handle_heartbeat`. The 309 heartbeat rate is set by `pgnet.util.Game.heartbeat_rate`. 310 311 See also: `Client.heartbeat_payload`. 312 """ 313 pass 314 315 def heartbeat_payload(self) -> dict: 316 """Override this method to add data to the heartbeat request payload. 317 318 This payload is passed to `pgnet.util.Game.handle_heartbeat`. The heartbeat rate 319 is set by `pgnet.util.Game.heartbeat_rate`. 320 321 See also: `Client.on_heartbeat`. 322 """ 323 return dict() 324 325 async def _async_connect_remote(self): 326 """Connect to the server. 327 328 Gets a websocket to create a `ClientConnection` object. This is passed to the 329 handshake handler to be populated, and then to the user connection for handling 330 the packet queue. The heartbeat task is managed here. 331 """ 332 if self._server_connection is not None: 333 raise RuntimeError("Cannot open more than one connection per client.") 334 logger.debug("Connecting to server...") 335 full_address = f"{self._address}:{self._port}" 336 self._server_connection = websockets.connect( 337 f"ws://{full_address}", 338 close_timeout=1, 339 ) 340 self._set_status(f"Connecting to {full_address}...", logger.info) 341 connection: Optional[ClientConnection] = None 342 heartbeat: Optional[asyncio.Task] = None 343 try: 344 try: 345 websocket = await self._server_connection 346 except OSError as e: 347 logger.debug(f"{e=}") 348 raise DisconnectedError("Failed to call server.") 349 connection = ClientConnection(websocket) 350 self._set_status("Logging in...", logger.info) 351 await self._handle_handshake(connection) 352 self._set_connection(True) 353 self._set_status( 354 f"Logged in as {self._username} @ {connection.remote}", 355 logger.info, 356 ) 357 heartbeat = asyncio.create_task(self._async_heartbeat()) 358 await self._handle_user_connection(connection) 359 except DisconnectedError as e: 360 logger.debug(f"{e=}") 361 if connection: 362 await connection.close() 363 if heartbeat: 364 heartbeat.cancel() 365 self._set_connection(False) 366 self._server_connection = None 367 self._set_status(f"Disconnected: {e.args[0]}") 368 logger.info(f"Connection terminated. {self}") 369 return 370 logger.warning(f"Connection terminated without DisconnectedError {connection}") 371 372 async def _async_connect_local(self): 373 """Wraps `Client._async_connect_remote to cleanup/teardown the local server.""" 374 logger.debug("Setting up local server...") 375 assert isinstance(self._server, Server) 376 started = asyncio.Future() 377 server_coro = self._server.async_run(on_start=lambda *a: started.set_result(1)) 378 server_task = asyncio.create_task(server_coro) 379 server_task.add_done_callback(lambda *a: self.disconnect()) 380 await asyncio.wait((server_task, started), return_when=asyncio.FIRST_COMPLETED) 381 await self._async_connect_remote() 382 self.disconnect() 383 self._server.shutdown() 384 385 async def _handle_handshake(self, connection: ClientConnection): 386 """Handle a new connection's handshake sequence. Modifies the connection object. 387 388 First trade public keys and assign the connection's `tunnel`. Then log in 389 with our username. 390 """ 391 # Trade public keys 392 packet = Packet("key_trade", dict(pubkey=self._key.pubkey)) 393 response = await connection.send_recv(packet) 394 pubkey = response.payload.get("pubkey") 395 if not pubkey or not isinstance(pubkey, str): 396 raise DisconnectedError("Missing public key string from server.") 397 if self._verify_server_pubkey: 398 if pubkey != self._verify_server_pubkey: 399 raise DisconnectedError("Unverified server public key.") 400 logger.debug(f"Server pubkey verified: {pubkey=}") 401 connection.tunnel = self._key.get_tunnel(pubkey) 402 logger.debug(f"Assigned tunnel: {connection}") 403 # Authenticate 404 handshake_payload = dict( 405 username=self._username, 406 password=self._password, 407 invite_code=self._invite_code, 408 ) 409 packet = Packet("handshake", handshake_payload) 410 response = await connection.send_recv(packet) 411 if response.status != Status.OK: 412 m = response.message 413 logger.info(m) 414 raise DisconnectedError(m) 415 416 async def _async_heartbeat(self): 417 """Periodically update while connected and in game. 418 419 Will create a heartbeat request using `Client.heartbeat_payload` and pass the 420 response to `Client.on_heartbeat`. 421 """ 422 while True: 423 await asyncio.sleep(self._heartbeat_interval) 424 if self.connected and self.game: 425 packet = Packet(Request.HEARTBEAT_UPDATE, self.heartbeat_payload()) 426 self.send(packet, self.on_heartbeat) 427 428 async def _handle_user_connection(self, connection: ClientConnection): 429 """Handle the connection after handshake - process the packet queue.""" 430 self._do_disconnect = False 431 while not self._do_disconnect: 432 # Wait for queued packets 433 if not self._packet_queue: 434 await asyncio.sleep(0.01) 435 continue 436 # Send and callback with response 437 packet, callback = self._packet_queue.pop(0) 438 response = await connection.send_recv(packet) 439 if response.status != Status.OK: 440 logger.info(f"Status code: {response.debug_repr}") 441 self._handle_game_change(response) 442 if callback: 443 callback(response) 444 # Disconnect if alerted 445 if response.disconnecting: 446 logger.info(f"Disconnected.\n{packet}\n{response.debug_repr}") 447 raise DisconnectedError(response.message) 448 raise DisconnectedError("Client closed connection.") 449 450 def _set_status(self, status: str, logger: Optional[Callable] = None): 451 """Set the client status message with associated callback.""" 452 self._status = status 453 if logger: 454 logger(status) 455 if self.on_status: 456 self.on_status(status) 457 458 def _set_connection(self, set_as: bool, /): 459 """Set the client connection status with associated callback.""" 460 if self._connected == set_as: 461 return 462 logger.debug(f"Setting connection as: {set_as}") 463 self._connected = set_as 464 if self.on_connection: 465 self.on_connection(set_as) 466 467 def _handle_game_change(self, response: Response): 468 """Handle game change. 469 470 Set the client game name and heartbeat rate, then call event callback. 471 """ 472 game_name = response.game 473 if game_name != self._game: 474 if game_name is None: 475 self._set_status(f"Left game: {self._game}") 476 else: 477 self._set_status(f"Joined game: {game_name}") 478 hb_rate = response.payload.get("heartbeat_rate", DEFAULT_HEARTBEAT_RATE) 479 self._heartbeat_interval = 1 / hb_rate 480 logger.debug(f"{self._heartbeat_interval=}") 481 self._game = game_name 482 if self.on_game: 483 self.on_game(game_name) 484 485 def __repr__(self): 486 """Object repr.""" 487 local = "(local) " if self._server else "" 488 conn = "connected" if self.connected else "disconnected" 489 return ( 490 f"<{self.__class__.__qualname__} {local}{self._username!r}" 491 f" {conn}, {self._address}:{self._port}" 492 f" @ {id(self):x}>" 493 )
The client that manages communication with the server.
Initializing
This class should not be initialized directly, instead use Client.remote
or
Client.local
.
# This client will connect to a remote server
remote_client = Client.remote(address="1.2.3.4", username="player")
# This client will create a local server and connect to it
local_client = Client.local(game=MyGame, username="player")
Connecting and starting a game
Use Client.async_connect
to connect to the server. Once connected, use
Client.get_game_dir
, Client.create_game
, Client.join_game
, and
Client.leave_game
to join or leave a game.
Using the packet queue
When in game (Client.game
is not None), use Client.send
to send a
pgnet.Packet
to pgnet.Game.handle_game_packet
and receive a pgnet.Response
.
Client events
It is possible to bind callbacks to client events by subclassing and overriding or
by setting Client.on_connection
, Client.on_status
, and Client.on_game
.
81 def __init__( 82 self, 83 *, 84 address: str, 85 username: str, 86 password: str, 87 invite_code: str, 88 port: int, 89 server: Optional[Server], 90 verify_server_pubkey: str, 91 ): 92 """This class should not be initialized directly. 93 94 .. note:: Use `Client.local` or `Client.remote` to create a client. 95 """ 96 self._key: Key = Key() 97 self._status: str = "New client." 98 self._server_connection: Optional[WebSocketClientProtocol] = None 99 self._connected: bool = False 100 self._do_disconnect: bool = False 101 self._packet_queue: list[tuple[Packet, ResponseCallback]] = [] 102 self._game: Optional[str] = None 103 self._heartbeat_interval = 1 / DEFAULT_HEARTBEAT_RATE 104 # Connection details 105 self._username = username 106 self._password = password 107 self._invite_code = invite_code 108 self._address = address 109 self._port = port 110 self._server = server 111 self._verify_server_pubkey = verify_server_pubkey
This class should not be initialized directly.
Use Client.local
or Client.remote
to create a client.
113 @classmethod 114 def local( 115 cls, 116 *, 117 game: Type[Game], 118 username: str, 119 password: str = "", 120 port: int = DEFAULT_PORT, 121 server_factory: Optional[Callable[[Any], Server]] = None, 122 ) -> "Client": 123 """Create a client that uses its own local server. See also `Client.remote`. 124 125 Only the *game* and *username* arguments are required. 126 127 Args: 128 game: `pgnet.Game` class to pass to the local server. 129 username: The user's username. 130 password: The user's password. 131 port: Server port number. 132 server_factory: If provided, will be used to create the local server. Must 133 accept the same arguments as `pgnet.Server` (default). This is useful 134 for using a server subclass or to pass custom arguments to the local 135 server. 136 """ 137 server_factory = server_factory or Server 138 server = server_factory( 139 game, 140 listen_globally=False, 141 registration_enabled=True, 142 require_user_password=False, 143 ) 144 return cls( 145 address="localhost", 146 username=username, 147 password=password, 148 invite_code="", 149 port=port, 150 server=server, 151 verify_server_pubkey=server.pubkey, 152 )
Create a client that uses its own local server. See also Client.remote
.
Only the game and username arguments are required.
Arguments:
- game:
pgnet.Game
class to pass to the local server. - username: The user's username.
- password: The user's password.
- port: Server port number.
- server_factory: If provided, will be used to create the local server. Must
accept the same arguments as
pgnet.Server
(default). This is useful for using a server subclass or to pass custom arguments to the local server.
154 @classmethod 155 def remote( 156 cls, 157 *, 158 address: str, 159 username: str, 160 password: str = "", 161 invite_code: str = "", 162 port: int = DEFAULT_PORT, 163 verify_server_pubkey: str = "", 164 ) -> "Client": 165 """Create a client that connects to a remote server. See also `Client.local`. 166 167 Args: 168 address: Server IP address. 169 username: The user's username. 170 password: The user's password. 171 invite_code: An invite code for creating a new user. 172 port: Server port number. 173 verify_server_pubkey: If provided, will compare against the 174 `pgnet.Server.pubkey` of the server and disconnect if they do not match. 175 """ 176 return cls( 177 address=address, 178 username=username, 179 password=password, 180 invite_code=invite_code, 181 port=port, 182 server=None, 183 verify_server_pubkey=verify_server_pubkey, 184 )
Create a client that connects to a remote server. See also Client.local
.
Arguments:
- address: Server IP address.
- username: The user's username.
- password: The user's password.
- invite_code: An invite code for creating a new user.
- port: Server port number.
- verify_server_pubkey: If provided, will compare against the
pgnet.Server.pubkey
of the server and disconnect if they do not match.
186 async def async_connect(self): 187 """Connect to a server. 188 189 This procedure will automatically create an end-to-end encrypted connection 190 (optionally verifying the public key first), and authenticate the username and 191 password. Only after succesfully completing these steps will the client be 192 considered connected and ready to process the packet queue from `Client.send`. 193 """ 194 logger.debug(f"Connecting... {self}") 195 if self._server is None: 196 return await self._async_connect_remote() 197 else: 198 return await self._async_connect_local()
Connect to a server.
This procedure will automatically create an end-to-end encrypted connection
(optionally verifying the public key first), and authenticate the username and
password. Only after succesfully completing these steps will the client be
considered connected and ready to process the packet queue from Client.send
.
204 def get_game_dir(self, callback: Callable, /): 205 """Get the games directory from the server and pass the response to callback.""" 206 self.send(Packet(Request.GAME_DIR), callback, do_next=True)
Get the games directory from the server and pass the response to callback.
208 def create_game( 209 self, 210 name: str, 211 /, 212 *, 213 password: Optional[str] = None, 214 callback: Optional[ResponseCallback] = None, 215 ): 216 """Request from the server to create and join a game.""" 217 payload = dict(name=name) 218 if password: 219 payload["password"] = password 220 self.send(Packet(Request.CREATE_GAME, payload), callback, do_next=True)
Request from the server to create and join a game.
222 def join_game( 223 self, 224 name: str, 225 /, 226 *, 227 password: Optional[str] = None, 228 callback: Optional[ResponseCallback] = None, 229 ): 230 """Request from the server to join a game.""" 231 payload = dict(name=name) 232 if password: 233 payload["password"] = password 234 self.send(Packet(Request.JOIN_GAME, payload), callback, do_next=True)
Request from the server to join a game.
236 def leave_game( 237 self, 238 *, 239 callback: Optional[ResponseCallback] = None, 240 ): 241 """Request from the server to leave the game.""" 242 self.send(Packet(Request.LEAVE_GAME), callback, do_next=True)
Request from the server to leave the game.
244 def send( 245 self, 246 packet: Packet, 247 callback: Optional[ResponseCallback] = None, 248 /, 249 do_next: bool = False, 250 ): 251 """Add a `pgnet.Packet` to the queue. 252 253 If a callback was given, the `pgnet.Response` will be passed to it. Callbacks 254 are not ensured, as the queue can be arbitrarily cleared using 255 `Client.flush_queue`. 256 """ 257 if not self._connected: 258 logger.warning(f"Cannot queue packets while disconnected: {packet}") 259 return 260 packet_count = len(self._packet_queue) 261 if packet_count >= 5: 262 logger.warning(f"{packet_count} packets pending, adding: {packet}") 263 packet_callback = (packet, callback) 264 if do_next: 265 self._packet_queue.insert(0, packet_callback) 266 else: 267 self._packet_queue.append(packet_callback)
Add a pgnet.Packet
to the queue.
If a callback was given, the pgnet.Response
will be passed to it. Callbacks
are not ensured, as the queue can be arbitrarily cleared using
Client.flush_queue
.
269 def flush_queue(self): 270 """Clear packets and their respective callbacks from the queue. 271 272 See also: `Client.send`. 273 """ 274 if self._packet_queue: 275 logger.debug(f"Discarding messages: {self._packet_queue}") 276 self._packet_queue = []
Clear packets and their respective callbacks from the queue.
See also: Client.send
.
278 def on_connection(self, connected: bool): 279 """Called when connected or disconnected. See also: `Client.connected`.""" 280 pass
Called when connected or disconnected. See also: Client.connected
.
287 def on_status(self, status: str): 288 """Called with feedback on client status. See also: `Client.status`.""" 289 pass
Called with feedback on client status. See also: Client.status
.
296 def on_game(self, game_name: Optional[str]): 297 """Called when joining or leaving a game. See also: `Client.game`.""" 298 pass
Called when joining or leaving a game. See also: Client.game
.
305 def on_heartbeat(self, heartbeat: Response): 306 """Override this method to implement heartbeat updates. 307 308 The *heartbeat* Response is given by `pgnet.util.Game.handle_heartbeat`. The 309 heartbeat rate is set by `pgnet.util.Game.heartbeat_rate`. 310 311 See also: `Client.heartbeat_payload`. 312 """ 313 pass
Override this method to implement heartbeat updates.
The heartbeat Response is given by pgnet.util.Game.handle_heartbeat
. The
heartbeat rate is set by pgnet.util.Game.heartbeat_rate
.
See also: Client.heartbeat_payload
.
315 def heartbeat_payload(self) -> dict: 316 """Override this method to add data to the heartbeat request payload. 317 318 This payload is passed to `pgnet.util.Game.handle_heartbeat`. The heartbeat rate 319 is set by `pgnet.util.Game.heartbeat_rate`. 320 321 See also: `Client.on_heartbeat`. 322 """ 323 return dict()
Override this method to add data to the heartbeat request payload.
This payload is passed to pgnet.util.Game.handle_heartbeat
. The heartbeat rate
is set by pgnet.util.Game.heartbeat_rate
.
See also: Client.on_heartbeat
.
28@dataclass 29class ClientConnection(Connection): 30 """Thin wrapper for `pgnet.util.Connection`. 31 32 Provides `ClientConnection.send_recv` for combined sending and receiving. 33 """ 34 35 game: Optional[str] = None 36 _websocket_busy: bool = False 37 38 async def send_recv(self, packet: Packet) -> Response: 39 """Send packet and receive response via Baseclass `send` and `recv`.""" 40 if self._websocket_busy: 41 logger.warning(f"Packet is waiting for busy websocket... {packet}") 42 while self._websocket_busy and self.websocket.open: 43 await asyncio.sleep(0.1) 44 self._websocket_busy = True 45 try: 46 await self.send(packet.serialize()) 47 response: str = await self.recv() 48 return Response.deserialize(response) 49 finally: 50 self._websocket_busy = False
Thin wrapper for pgnet.util.Connection
.
Provides ClientConnection.send_recv
for combined sending and receiving.
38 async def send_recv(self, packet: Packet) -> Response: 39 """Send packet and receive response via Baseclass `send` and `recv`.""" 40 if self._websocket_busy: 41 logger.warning(f"Packet is waiting for busy websocket... {packet}") 42 while self._websocket_busy and self.websocket.open: 43 await asyncio.sleep(0.1) 44 self._websocket_busy = True 45 try: 46 await self.send(packet.serialize()) 47 response: str = await self.recv() 48 return Response.deserialize(response) 49 finally: 50 self._websocket_busy = False