import asyncio
from jsonrpcserver import method, Success, Result, async_dispatch
import websockets
@method
async def ping() -> Result:
    """JSON-RPC method ``ping``: reply with a successful ``"pong"`` result."""
    reply = Success("pong")
    return reply
async def main(websocket, path=None):
    """Serve one JSON-RPC request received over a WebSocket connection.

    Reads a single message from the connection, hands it to
    ``async_dispatch`` and, when the dispatcher returns a truthy response,
    sends that response back to the peer.

    Args:
        websocket: The connection object supplied by ``websockets.serve``.
        path: Request path. Unused here; it defaults to ``None`` so the
            handler works both with ``websockets`` versions that call
            ``handler(websocket, path)`` and with versions that call
            ``handler(websocket)`` only.
    """
    # A falsy response from the dispatcher means there is nothing to send.
    if response := await async_dispatch(await websocket.recv()):
        await websocket.send(response)
async def _serve_forever():
    """Serve ``main`` on ws://localhost:5000 until the process is stopped."""
    async with websockets.serve(main, "localhost", 5000):
        # A future that is never resolved keeps the server running.
        await asyncio.Future()


# asyncio.run() creates and tears down the event loop itself, replacing the
# deprecated asyncio.get_event_loop() call made without a running loop.
asyncio.run(_serve_forever())
Start the server:
$ python server.py
Client
Use jsonrpcclient to send requests:
pip install websockets jsonrpcclient
Create a `client.py`:
import asyncio
import logging
from jsonrpcclient import Ok, parse_json, request_json
import websockets
async def main():
    """Send a ``ping`` request to the local server and report the reply.

    Connects to ws://localhost:5000, sends one JSON-RPC ``ping`` request and
    parses the answer. An ``Ok`` reply has its result printed; any other
    reply has its message logged as an error.
    """
    async with websockets.connect("ws://localhost:5000") as connection:
        ping_request = request_json("ping")
        await connection.send(ping_request)

        raw_reply = await connection.recv()
        reply = parse_json(raw_reply)

        # Guard clause: anything that is not Ok is reported as an error.
        if not isinstance(reply, Ok):
            logging.error(reply.message)
            return
        print(reply.result)
# asyncio.run() creates the event loop, runs main() to completion and closes
# the loop; it replaces the deprecated asyncio.get_event_loop() call made
# without a running loop.
asyncio.run(main())
Run the client: