"""WebSocket endpoint and request handlers for the grid-world game.

Each message received on ``/sock`` is a JSON object whose ``"request"`` key
names one of the handlers in ``request_handlers``.  Handlers take
``(data, ws)`` and return a JSON-serialisable reply (or ``None`` for a bare
"ok").  Rule violations are reported by raising ``AssertionError``; its
message is sent back to the client by ``sock``.
"""
import base64
import json
import traceback
import zlib

import simple_websocket
from flask import Blueprint, request
from flask_login import current_user, login_required
from models import User
from server import db
from sqlalchemy.orm.attributes import flag_modified

game = Blueprint("game", __name__)

ok_msg = json.dumps({"ok": True})
unparsable_msg = "huh?"
fail_msg = json.dumps({"ok": False})

# Row/column delta applied to a location for each push direction.
_PUSH_STEPS = {"u": (1, 0), "d": (-1, 0), "r": (0, 1), "l": (0, -1)}

# A push moves the targeted object plus at most one more object behind it.
_MAX_PUSH_CHAIN = 2


def serialize(data):
    """Return *data* (an ASCII ``str``) zlib-compressed and base64-encoded."""
    compressed = zlib.compress(data.encode("ascii"), level=-1)
    return base64.standard_b64encode(compressed).decode("ascii")


def unserialize(data):
    """Inverse of :func:`serialize`: base64-decode, inflate, return ``str``."""
    compressed = base64.standard_b64decode(data.encode("ascii"))
    return zlib.decompress(compressed).decode("ascii")


def _loaded_world():
    """Return the current user's loaded world or fail with a clear error."""
    loaded = current_user.loaded_world
    if loaded is None:
        raise AssertionError("no world loaded.")
    return loaded


def _parse_location(data):
    """Return ``data["location"]`` as a ``(y, x)`` pair of plain ints.

    Booleans and non-integers are rejected: they would index the grid like an
    int but render differently in the ``"YxX"`` claim key, which would let a
    request slip past the claimed-base checks.
    """
    location = data.get("location")
    if not isinstance(location, list) or len(location) != 2:
        raise AssertionError("bad location.")
    for part in location:
        if isinstance(part, bool) or not isinstance(part, int):
            raise AssertionError("bad location.")
    return location[0], location[1]


def _in_bounds(objects, y, x):
    """Tell whether ``(y, x)`` is a real cell of the *objects* grid."""
    return 0 <= y < len(objects) and 0 <= x < len(objects[y])


def _cell(data, loaded):
    """Return the request's location, validated against the world's grid.

    Negative indices are refused so that Python's wrap-around indexing cannot
    alias a cell under a different claim key.
    """
    y, x = _parse_location(data)
    if not _in_bounds(loaded["objects"], y, x):
        raise AssertionError("bad location.")
    return y, x


def _foreign_owner(loaded):
    """Return the ``User`` owning *loaded*, or ``None`` if it is our own."""
    if loaded["owner"] == current_user.username:
        return None
    owner = User.query.filter(User.username == loaded["owner"]).first()
    if owner is None:
        raise AssertionError("world owner not found.")
    return owner


def _is_claimed_by(user, y, x):
    """Tell whether cell ``(y, x)`` is one of *user*'s claimed cells.

    ``claimed_base`` is a string of ``"YxX,"`` entries; entries are compared
    as whole tokens so that e.g. ``1x1`` never matches inside ``11x1``.
    """
    return f"{y}x{x}" in (user.claimed_base or "").split(",")


def _base_reply(owner):
    """Build the claim reply, adding the world owner's claims when foreign."""
    reply = {"claimedBase": current_user.claimed_base}
    if owner is not None:
        reply["ownerBase"] = owner.claimed_base
    return reply


def _require_own_book(loaded, y, x):
    """Raise unless cell ``(y, x)`` holds a book owned by the current user."""
    item = loaded["objects"][y][x]
    if item is None or item["type"] != "book":
        raise AssertionError("inventory can only be accessed trough a book.")
    if item["owner"] != current_user.username:
        raise AssertionError("this book is not yours.")


def init(data, ws):
    """Create a fresh loaded world from ``data["colliders"]``.

    The object grid is sized from the collider grid, the current user becomes
    the owner, and the user's claimed base is reset.

    Raises:
        AssertionError: if the colliders are not a non-empty rectangular
            list of lists.
    """
    colliders = data["colliders"]
    if (
        not isinstance(colliders, list)
        or not colliders
        or not all(
            isinstance(row, list) and len(row) == len(colliders[0])
            for row in colliders
        )
    ):
        raise AssertionError("bad colliders.")
    height, width = len(colliders), len(colliders[0])
    current_user.loaded_world = {
        "colliders": colliders,
        "objects": [[None] * width for _ in range(height)],
        "owner": current_user.username,
    }
    current_user.claimed_base = ""
    db.session.commit()
    return current_user.loaded_world


def save(data, ws):
    """Persist the loaded world as the current user's saved world."""
    current_user.world = current_user.loaded_world
    db.session.commit()
    return current_user.world


def world(data, ws):
    """Load a saved world: our own, or that of ``data["owner"]`` if given.

    Raises:
        AssertionError: if the requested owner does not exist.
    """
    owner = data.get("owner", None)
    if owner is None or not len(owner) or owner == current_user.username:
        target = current_user.world
    else:
        user = User.query.filter(User.username == owner).first()
        if user is None:
            raise AssertionError("no such user.")
        target = user.world
    current_user.loaded_world = target
    db.session.commit()
    return current_user.loaded_world


def put_object(data, ws):
    """Place ``data["item"]`` at ``data["location"]`` in the loaded world.

    The object's owner defaults to the current user when ``data["owner"]`` is
    missing or empty.

    Raises:
        AssertionError: for a bad location, a wall cell, or a cell inside the
            world owner's claimed base.
    """
    loaded = _loaded_world()
    y, x = _cell(data, loaded)
    item = data["item"]
    if loaded["colliders"][y][x] is True:
        raise AssertionError("you can't put objects in a wall.")
    object_owner = data.get("owner", current_user.username)
    if object_owner is None or not len(object_owner):
        object_owner = current_user.username
    owner = _foreign_owner(loaded)
    if owner is not None and _is_claimed_by(owner, y, x):
        raise AssertionError("you can't put objects in someone else's base.")
    loaded["objects"][y][x] = {"type": str(item), "owner": object_owner}
    # The grid is mutated in place, so the change must be flagged explicitly.
    flag_modified(current_user, "loaded_world")
    db.session.commit()
    return current_user.loaded_world


def take_object(data, ws):
    """Remove whatever is at ``data["location"]`` in the loaded world.

    Raises:
        AssertionError: for a bad location or a cell inside the world
            owner's claimed base.
    """
    loaded = _loaded_world()
    y, x = _cell(data, loaded)
    owner = _foreign_owner(loaded)
    if owner is not None and _is_claimed_by(owner, y, x):
        raise AssertionError("you can't take objects from someone else's base.")
    loaded["objects"][y][x] = None
    flag_modified(current_user, "loaded_world")
    db.session.commit()
    return current_user.loaded_world


def push(data, ws):
    """Push the object at ``data["location"]`` one cell in ``data["direction"]``.

    If the next cell is occupied, that object is pushed one cell further; a
    longer chain is refused.  The whole path is validated before any cell is
    modified, so a failure can never leave a half-moved grid in memory.

    Raises:
        AssertionError: if there is nothing to push, the source cell is in
            the world owner's claimed base, the object is a book, the
            direction is unknown, or the path hits a wall, the edge of the
            world, or too many objects.
    """
    loaded = _loaded_world()
    y, x = _cell(data, loaded)
    colliders = loaded["colliders"]
    objects = loaded["objects"]
    if objects[y][x] is None:
        raise AssertionError("nothing to push.")
    owner = _foreign_owner(loaded)
    if owner is not None and _is_claimed_by(owner, y, x):
        raise AssertionError("you can't push objects in someone else's base.")
    if objects[y][x]["type"] == "book":
        raise AssertionError("you can't push books.")
    direction = data["direction"]
    if not isinstance(direction, str) or direction not in _PUSH_STEPS:
        raise AssertionError("bad direction.")
    dy, dx = _PUSH_STEPS[direction]

    # Walk forward until an empty cell is found, recording every cell visited.
    path = [(y, x)]
    for _ in range(_MAX_PUSH_CHAIN):
        y += dy
        x += dx
        if not _in_bounds(objects, y, x):
            raise AssertionError("you can't push objects out of the world.")
        if colliders[y][x] is True:
            raise AssertionError("you can't push objects into wall.")
        path.append((y, x))
        if objects[y][x] is None:
            break
    else:
        raise AssertionError("you can only push 2 objects in a row.")

    # Shift objects starting from the far end so nothing is overwritten.
    for dst, src in zip(reversed(path[1:]), reversed(path[:-1])):
        objects[dst[0]][dst[1]] = objects[src[0]][src[1]]
    start_y, start_x = path[0]
    objects[start_y][start_x] = None

    flag_modified(current_user, "loaded_world")
    db.session.commit()
    return current_user.loaded_world


def claim_base(data, ws):
    """Add ``data["location"]`` to the current user's claimed base.

    Raises:
        AssertionError: for a bad location or a cell already claimed by the
            owner of the loaded world.
    """
    loaded = _loaded_world()
    y, x = _cell(data, loaded)
    owner = _foreign_owner(loaded)
    if owner is not None and _is_claimed_by(owner, y, x):
        raise AssertionError("you can't claim someone else's base")
    if not _is_claimed_by(current_user, y, x):
        current_user.claimed_base = (current_user.claimed_base or "") + f"{y}x{x},"
    db.session.commit()
    return _base_reply(owner)


def unclaim_base(data, ws):
    """Remove ``data["location"]`` from the current user's claimed base."""
    # Only the shape of the location is checked (not the bounds), so a claim
    # made in a differently sized world can still be dropped.
    y, x = _parse_location(data)
    key = f"{y}x{x}"
    kept = [
        cell
        for cell in (current_user.claimed_base or "").split(",")
        if cell and cell != key
    ]
    # Re-emit in the stored "YxX," form, one trailing comma per entry.
    current_user.claimed_base = "".join(f"{cell}," for cell in kept)
    db.session.commit()
    loaded = _loaded_world()
    return _base_reply(_foreign_owner(loaded))


def get_base(data, ws):
    """Return the current user's claims (and the world owner's, if foreign)."""
    loaded = _loaded_world()
    return _base_reply(_foreign_owner(loaded))


def set_inventory(data, ws):
    """Replace the current user's inventory via a book they own.

    Raises:
        AssertionError: for a bad location, or if the cell does not hold a
            book owned by the current user.
    """
    loaded = _loaded_world()
    y, x = _cell(data, loaded)
    _require_own_book(loaded, y, x)
    current_user.inventory = data["inventory"]
    db.session.commit()
    return {"inventory": current_user.inventory}


def get_inventory(data, ws):
    """Return an inventory via a book owned by the current user.

    When the loaded world belongs to someone else and the book sits on one of
    that owner's claimed cells, the owner's inventory is returned; otherwise
    the current user's inventory is returned.

    Raises:
        AssertionError: for a bad location, or if the cell does not hold a
            book owned by the current user.
    """
    loaded = _loaded_world()
    y, x = _cell(data, loaded)
    _require_own_book(loaded, y, x)
    owner = _foreign_owner(loaded)
    if owner is not None and _is_claimed_by(owner, y, x):
        return {"inventory": owner.inventory}
    return {"inventory": current_user.inventory}


request_handlers = {
    "claim_base": claim_base,
    "get_base": get_base,
    "get_inventory": get_inventory,
    "init": init,
    "push": push,
    "put_object": put_object,
    "save": save,
    "set_inventory": set_inventory,
    "take_object": take_object,
    "unclaim_base": unclaim_base,
    "world": world,
}


@game.route("/sock", websocket=True)
@login_required
def sock():
    """Serve the game protocol over a WebSocket until the peer disconnects.

    Every incoming message gets exactly one JSON reply: the handler's return
    value, ``ok_msg`` when the handler returns ``None``, ``unparsable_msg``
    for invalid JSON, or ``{"ok": False, "error": ...}`` on failure.
    """
    ws = simple_websocket.Server(request.environ)
    try:
        while True:
            try:
                data = json.loads(ws.receive())
                if not isinstance(data, dict):
                    raise AssertionError("invalid request")
                r = data.get("request", None)
                if r not in request_handlers:
                    raise AssertionError("bad request")
                ret = request_handlers[r](data, ws)
            except json.JSONDecodeError:
                traceback.print_exc()
                ret = unparsable_msg
            except simple_websocket.ConnectionClosed:
                # Nothing can be sent any more; let the outer handler finish.
                raise
            except AssertionError as e:
                ret = {"ok": False, "error": str(e)}
            except Exception as e:
                traceback.print_exc()
                # Leave the session usable for the next message even if the
                # failure happened in the middle of a flush/commit.
                db.session.rollback()
                ret = {"ok": False, "error": repr(e)}
            if ret is not None:
                ws.send(json.dumps(ret))
            else:
                ws.send(ok_msg)
    except simple_websocket.ConnectionClosed:
        pass
    return ""