s2-dungeonsandexploits/flask/game.py

251 lines
9.2 KiB
Python
Raw Permalink Normal View History

2022-11-18 09:04:33 +01:00
import base64
import json
import traceback
import zlib
import simple_websocket
from flask import Blueprint, request
from flask_login import current_user, login_required
from models import User
from server import db
from sqlalchemy.orm.attributes import flag_modified
game = Blueprint("game", __name__)
ok_msg = json.dumps({"ok": True})
unparsable_msg = "huh?"
fail_msg = json.dumps({"ok": False})
def serialize(data):
return base64.standard_b64encode(zlib.compress(data.encode("ascii"), level=-1)).decode("ascii")
def unserialize(data):
return zlib.decompress(base64.standard_b64decode(data.encode("ascii"))).decode("ascii")
def init(data, ws):
w, h = len(data["colliders"][0]), len(data["colliders"])
current_user.loaded_world = {
"colliders": data["colliders"],
"objects": [[None for x in range(w)] for y in range(h)],
"owner": current_user.username,
}
current_user.claimed_base = ""
db.session.commit()
return current_user.loaded_world
def save(data, ws):
current_user.world = current_user.loaded_world
db.session.commit()
return current_user.world
def world(data, ws):
owner = data.get("owner", None)
if owner is None or not len(owner) or owner == current_user.username:
world = current_user.world
else:
world = User.query.filter(User.username == owner).first().world
current_user.loaded_world = world
db.session.commit()
return current_user.loaded_world
def put_object(data, ws):
location = data["location"]
object = data["item"]
if current_user.loaded_world["colliders"][location[0]][location[1]] is True:
raise AssertionError("you can't put objects in a wall.")
object_owner = data.get("owner", current_user.username)
if object_owner is None or not len(object_owner):
object_owner = current_user.username
if current_user.loaded_world["owner"] != current_user.username:
owner = User.query.filter(User.username == current_user.loaded_world["owner"]).first()
if f"{location[0]}x{location[1]}" in owner.claimed_base.split(","):
raise AssertionError("you can't put objects in someone else's base.")
current_user.loaded_world["objects"][location[0]][location[1]] = {"type": str(object), "owner": object_owner}
flag_modified(current_user, "loaded_world")
db.session.commit()
return current_user.loaded_world
def take_object(data, ws):
location = data["location"]
if current_user.loaded_world["owner"] != current_user.username:
owner = User.query.filter(User.username == current_user.loaded_world["owner"]).first()
if f"{location[0]}x{location[1]}" in owner.claimed_base.split(","):
raise AssertionError("you can't take objects from someone else's base.")
current_user.loaded_world["objects"][location[0]][location[1]] = None
flag_modified(current_user, "loaded_world")
db.session.commit()
return current_user.loaded_world
def push(data, ws):
location = data["location"]
if current_user.loaded_world["objects"][location[0]][location[1]] is None:
raise AssertionError("nothing to push.")
if current_user.loaded_world["owner"] != current_user.username:
owner = User.query.filter(User.username == current_user.loaded_world["owner"]).first()
if f"{location[0]}x{location[1]}" in owner.claimed_base.split(","):
raise AssertionError("you can't push objects in someone else's base.")
if current_user.loaded_world["objects"][location[0]][location[1]]["type"] == "book":
raise AssertionError("you can't push books.")
direction = data["direction"]
if direction == "u":
i, j = 1, 0
elif direction == "d":
i, j = -1, 0
elif direction == "r":
i, j = 0, 1
elif direction == "l":
i, j = 0, -1
else:
raise AssertionError("bad direction.")
y, x = location
colliders = current_user.loaded_world["colliders"]
objects = current_user.loaded_world["objects"]
last = objects[y][x]
objects[y][x] = None
ok = False
for k in range(2):
y += i
x += j
if colliders[y][x] is True:
db.session.rollback()
raise AssertionError("you can't push objects into wall.")
current = objects[y][x]
2022-11-18 15:47:43 +01:00
if current is not None and current.get("type", None) == "book":
db.session.rollback()
raise AssertionError("you can't push objects into books.")
2022-11-18 09:04:33 +01:00
objects[y][x] = last
if current is None:
ok = True
break
last = current
if not ok:
db.session.rollback()
raise AssertionError("you can only push 2 objects in a row.")
flag_modified(current_user, "loaded_world")
db.session.commit()
return current_user.loaded_world
def claim_base(data, ws):
location = data["location"]
if current_user.loaded_world["owner"] != current_user.username:
owner = User.query.filter(User.username == current_user.loaded_world["owner"]).first()
if f"{location[0]}x{location[1]}" in owner.claimed_base.split(","):
raise AssertionError("you can't claim someone else's base")
t = f"{location[0]}x{location[1]},"
if t not in current_user.claimed_base:
current_user.claimed_base += t
db.session.commit()
if current_user.loaded_world["owner"] != current_user.username:
return {"claimedBase": current_user.claimed_base, "ownerBase": owner.claimed_base}
else:
return {"claimedBase": current_user.claimed_base}
def unclaim_base(data, ws):
location = data["location"]
current_user.claimed_base = current_user.claimed_base.replace(f"{location[0]}x{location[1]},", "")
db.session.commit()
if current_user.loaded_world["owner"] != current_user.username:
owner = User.query.filter(User.username == current_user.loaded_world["owner"]).first()
return {"claimedBase": current_user.claimed_base, "ownerBase": owner.claimed_base}
else:
return {"claimedBase": current_user.claimed_base}
def get_base(data, ws):
if current_user.loaded_world["owner"] != current_user.username:
owner = User.query.filter(User.username == current_user.loaded_world["owner"]).first()
return {"claimedBase": current_user.claimed_base, "ownerBase": owner.claimed_base}
else:
return {"claimedBase": current_user.claimed_base}
def set_inventory(data, ws):
location = data["location"]
object = current_user.loaded_world["objects"][location[0]][location[1]]
if object is None or object["type"] != "book":
raise AssertionError("inventory can only be accessed trough a book.")
if object["owner"] != current_user.username:
raise AssertionError("this book is not yours.")
current_user.inventory = data["inventory"]
db.session.commit()
return {"inventory": current_user.inventory}
def get_inventory(data, ws):
location = data["location"]
object = current_user.loaded_world["objects"][location[0]][location[1]]
if object is None or object["type"] != "book":
raise AssertionError("inventory can only be accessed trough a book.")
if object["owner"] != current_user.username:
raise AssertionError("this book is not yours.")
if current_user.loaded_world["owner"] != current_user.username:
owner = User.query.filter(User.username == current_user.loaded_world["owner"]).first()
if f"{location[0]}x{location[1]}" in owner.claimed_base:
return {"inventory": owner.inventory}
return {"inventory": current_user.inventory}
request_handlers = {
"claim_base": claim_base,
"get_base": get_base,
"get_inventory": get_inventory,
"init": init,
"push": push,
"put_object": put_object,
"save": save,
"set_inventory": set_inventory,
"take_object": take_object,
"unclaim_base": unclaim_base,
"world": world,
}
@game.route("/sock", websocket=True)
@login_required
def sock():
ws = simple_websocket.Server(request.environ)
try:
while True:
try:
data = json.loads(ws.receive())
if not isinstance(data, dict):
raise AssertionError("invalid request")
r = data.get("request", None)
if r not in request_handlers:
raise AssertionError("bad request")
ret = request_handlers[r](data, ws)
except json.JSONDecodeError:
traceback.print_exc()
ret = unparsable_msg
except simple_websocket.ws.ConnectionClosed:
ret = None
except AssertionError as e:
# traceback.print_exc()
# ret = {"ok": False}
ret = {"ok": False, "error": str(e)}
# ret = {"ok": False, "error": repr(e)}
# ret = {"ok": False, "error": traceback.format_exc()}
except Exception as e:
traceback.print_exc()
# ret = {"ok": False}
# ret = {"ok": False, "error": str(e)}
ret = {"ok": False, "error": repr(e)}
# ret = {"ok": False, "error": traceback.format_exc()}
if ret is not None:
ws.send(json.dumps(ret))
else:
ws.send(ok_msg)
except simple_websocket.ConnectionClosed:
pass
return ""