2022-11-18 09:04:33 +01:00
|
|
|
import base64
|
|
|
|
import json
|
|
|
|
import traceback
|
|
|
|
import zlib
|
|
|
|
|
|
|
|
import simple_websocket
|
|
|
|
from flask import Blueprint, request
|
|
|
|
from flask_login import current_user, login_required
|
|
|
|
from models import User
|
|
|
|
from server import db
|
|
|
|
from sqlalchemy.orm.attributes import flag_modified
|
|
|
|
|
|
|
|
# Blueprint that carries the websocket route defined in this module.
game = Blueprint("game", __name__)

# Canned JSON acknowledgements, serialised once at import time.
ok_msg = json.dumps({"ok": True})
fail_msg = json.dumps({"ok": False})

# Reply payload used when an incoming frame is not valid JSON.
unparsable_msg = "huh?"
|
|
|
|
|
|
|
|
|
|
|
|
def serialize(data):
    """Compress an ASCII string with zlib and return it as base64 text.

    ``data`` is encoded as ASCII (non-ASCII characters raise
    ``UnicodeEncodeError``), compressed, base64-encoded with the standard
    alphabet and returned as a ``str``.
    """
    raw = data.encode("ascii")
    packed = zlib.compress(raw, level=-1)  # -1 selects zlib's default level
    encoded = base64.standard_b64encode(packed)
    return encoded.decode("ascii")
|
|
|
|
|
|
|
|
|
|
|
|
def unserialize(data):
    """Decode base64 text, zlib-decompress it and return the ASCII string.

    ``data`` is a ``str`` holding standard-alphabet base64 of a zlib stream
    whose decompressed bytes are ASCII.
    """
    packed = base64.standard_b64decode(data.encode("ascii"))
    raw = zlib.decompress(packed)
    return raw.decode("ascii")
|
|
|
|
|
|
|
|
|
|
|
|
def init(data, ws):
    """Create a fresh loaded world for the current user from a collider grid.

    Request fields read from ``data``:
        colliders: list of rows (each a list); stored unchanged in the world.

    The new world gets an ``objects`` grid of ``None`` values with one entry
    per collider cell, is owned by the current user, and the user's claimed
    base is reset to the empty string.  ``ws`` is unused.

    Returns the new ``loaded_world``.

    Raises:
        AssertionError: ``colliders`` is not a list of lists.
    """
    colliders = data["colliders"]
    # The grid comes straight from the client; check its shape before
    # storing it so later handlers can index it row by row.
    if not isinstance(colliders, list) or not all(isinstance(row, list) for row in colliders):
        raise AssertionError("bad colliders.")

    current_user.loaded_world = {
        "colliders": colliders,
        # Sized per row so an empty or ragged grid still gets a matching
        # object grid instead of failing on colliders[0].
        "objects": [[None] * len(row) for row in colliders],
        "owner": current_user.username,
    }
    current_user.claimed_base = ""
    db.session.commit()
    return current_user.loaded_world
|
|
|
|
|
|
|
|
|
|
|
|
def save(data, ws):
    """Copy the user's loaded world into their saved world and commit.

    Neither ``data`` nor ``ws`` is read.  Returns the saved world.
    """
    user = current_user
    user.world = user.loaded_world
    db.session.commit()
    return user.world
|
|
|
|
|
|
|
|
|
|
|
|
def world(data, ws):
    """Load a saved world into the current user's ``loaded_world``.

    Request fields read from ``data``:
        owner: optional username whose saved world should be loaded.  When
            missing, ``None``, empty, or equal to the current user's name,
            the current user's own saved world is loaded.

    ``ws`` is unused.  Returns the newly loaded world.

    Raises:
        AssertionError: ``owner`` names a user that does not exist.
    """
    owner = data.get("owner", None)
    if owner is None or not len(owner) or owner == current_user.username:
        selected = current_user.world
    else:
        target = User.query.filter(User.username == owner).first()
        # .first() yields None for an unknown name; report that as a normal
        # request error instead of failing on attribute access.
        if target is None:
            raise AssertionError("no such user.")
        selected = target.world

    current_user.loaded_world = selected
    db.session.commit()
    return current_user.loaded_world
|
|
|
|
|
|
|
|
|
|
|
|
def put_object(data, ws):
    """Place an item on a cell of the currently loaded world.

    Request fields read from ``data``:
        location: sequence whose first two entries are the row and column
            indices into the world's grids.
        item: value stored, stringified, as the object's ``type``.
        owner: optional owner name for the object; missing, ``None`` or
            empty falls back to the current user's username.

    ``ws`` is unused.  Returns the updated ``loaded_world``.

    Raises:
        AssertionError: the indices are not non-negative integers, the cell
            is a wall, or the world belongs to another user who has claimed
            that cell.
    """
    location = data["location"]
    item = data["item"]
    row, col = location[0], location[1]

    # Negative indices (and True/False, which are ints) address a real cell
    # through Python's list indexing but format to a different "RxC" key,
    # which would let a request slip past the claimed-base check below.
    for index in (row, col):
        if isinstance(index, bool) or not isinstance(index, int) or index < 0:
            raise AssertionError("bad location.")

    loaded = current_user.loaded_world
    if loaded["colliders"][row][col] is True:
        raise AssertionError("you can't put objects in a wall.")

    object_owner = data.get("owner", current_user.username)
    if object_owner is None or not len(object_owner):
        object_owner = current_user.username

    if loaded["owner"] != current_user.username:
        owner = User.query.filter(User.username == loaded["owner"]).first()
        if f"{row}x{col}" in owner.claimed_base.split(","):
            raise AssertionError("you can't put objects in someone else's base.")

    loaded["objects"][row][col] = {"type": str(item), "owner": object_owner}
    # The grid is mutated in place, so the attribute must be flagged for the
    # change to be written on commit.
    flag_modified(current_user, "loaded_world")
    db.session.commit()
    return current_user.loaded_world
|
|
|
|
|
|
|
|
|
|
|
|
def take_object(data, ws):
    """Remove whatever object sits on a cell of the currently loaded world.

    Request fields read from ``data``:
        location: sequence whose first two entries are the row and column
            indices into the world's ``objects`` grid.

    ``ws`` is unused.  Returns the updated ``loaded_world``.

    Raises:
        AssertionError: the indices are not non-negative integers, or the
            world belongs to another user who has claimed that cell.
    """
    location = data["location"]
    row, col = location[0], location[1]

    # Negative indices (and True/False, which are ints) address a real cell
    # through Python's list indexing but format to a different "RxC" key,
    # which would let a request slip past the claimed-base check below.
    for index in (row, col):
        if isinstance(index, bool) or not isinstance(index, int) or index < 0:
            raise AssertionError("bad location.")

    loaded = current_user.loaded_world
    if loaded["owner"] != current_user.username:
        owner = User.query.filter(User.username == loaded["owner"]).first()
        if f"{row}x{col}" in owner.claimed_base.split(","):
            raise AssertionError("you can't take objects from someone else's base.")

    loaded["objects"][row][col] = None
    # The grid is mutated in place, so the attribute must be flagged for the
    # change to be written on commit.
    flag_modified(current_user, "loaded_world")
    db.session.commit()
    return current_user.loaded_world
|
|
|
|
|
|
|
|
|
|
|
|
def push(data, ws):
    """Push the object on a cell one step, shoving at most one more object.

    Request fields read from ``data``:
        location: sequence whose first two entries are the row and column of
            the object to push.
        direction: ``"u"`` (row + 1), ``"d"`` (row - 1), ``"r"`` (col + 1)
            or ``"l"`` (col - 1).

    The pushed object moves one cell.  If that cell already holds an object,
    that object is moved one cell further; a third object in line makes the
    push fail.  Every check runs before the grid is touched, so a rejected
    push leaves the world exactly as it was and needs no rollback.

    ``ws`` is unused.  Returns the updated ``loaded_world``.

    Raises:
        AssertionError: bad location or direction, nothing to push, the
            source cell is in another user's claimed base, the object is a
            book, or the path is blocked by a wall, the grid edge, a book,
            or more than two objects in a row.
    """
    location = data["location"]
    src_row, src_col = location[0], location[1]

    # Negative indices (and True/False, which are ints) address a real cell
    # through Python's list indexing but format to a different "RxC" key,
    # which would let a request slip past the claimed-base check below.
    for index in (src_row, src_col):
        if isinstance(index, bool) or not isinstance(index, int) or index < 0:
            raise AssertionError("bad location.")

    loaded = current_user.loaded_world
    colliders = loaded["colliders"]
    objects = loaded["objects"]

    if objects[src_row][src_col] is None:
        raise AssertionError("nothing to push.")

    if loaded["owner"] != current_user.username:
        owner = User.query.filter(User.username == loaded["owner"]).first()
        if f"{src_row}x{src_col}" in owner.claimed_base.split(","):
            raise AssertionError("you can't push objects in someone else's base.")

    if objects[src_row][src_col]["type"] == "book":
        raise AssertionError("you can't push books.")

    steps = {"u": (1, 0), "d": (-1, 0), "r": (0, 1), "l": (0, -1)}
    direction = data["direction"]
    # The isinstance guard keeps unhashable JSON values (lists, dicts) on
    # the "bad direction" path instead of failing the dict lookup.
    step = steps.get(direction) if isinstance(direction, str) else None
    if step is None:
        raise AssertionError("bad direction.")
    d_row, d_col = step

    # Plan the writes first; nothing is applied until the whole chain is valid.
    planned = []
    carried = objects[src_row][src_col]
    row, col = src_row, src_col
    for _ in range(2):
        row += d_row
        col += d_col
        # Stepping off the grid counts as hitting a wall.  Without this a
        # negative index would wrap around to the opposite edge of the map.
        if row < 0 or col < 0 or row >= len(colliders) or col >= len(colliders[row]):
            raise AssertionError("you can't push objects into wall.")
        if colliders[row][col] is True:
            raise AssertionError("you can't push objects into wall.")
        occupant = objects[row][col]
        if occupant is not None and occupant.get("type", None) == "book":
            raise AssertionError("you can't push objects into books.")
        planned.append((row, col, carried))
        if occupant is None:
            break
        carried = occupant
    else:
        # Both cells ahead were occupied: a third object would have to move.
        raise AssertionError("you can only push 2 objects in a row.")

    objects[src_row][src_col] = None
    for dst_row, dst_col, moved in planned:
        objects[dst_row][dst_col] = moved

    # The grid is mutated in place, so the attribute must be flagged for the
    # change to be written on commit.
    flag_modified(current_user, "loaded_world")
    db.session.commit()
    return current_user.loaded_world
|
|
|
|
|
|
|
|
|
|
|
|
def claim_base(data, ws):
    """Add a cell to the current user's claimed base.

    Request fields read from ``data``:
        location: sequence whose first two entries identify the cell; they
            are formatted as ``"{0}x{1}"``.

    ``claimed_base`` is a string of such cell keys, each followed by a
    comma.  ``ws`` is unused.

    Returns ``{"claimedBase": ...}`` and, when the loaded world belongs to
    another user, also ``"ownerBase"`` with that user's claimed base.

    Raises:
        AssertionError: the loaded world belongs to another user who has
            already claimed the cell.
    """
    location = data["location"]
    cell = f"{location[0]}x{location[1]}"
    visiting = current_user.loaded_world["owner"] != current_user.username

    owner = None
    if visiting:
        owner = User.query.filter(User.username == current_user.loaded_world["owner"]).first()
        if cell in owner.claimed_base.split(","):
            raise AssertionError("you can't claim someone else's base")

    # Compare against whole cell keys: a substring test would treat "1x2,"
    # as already present inside "11x2," and silently skip the claim.
    if cell not in current_user.claimed_base.split(","):
        current_user.claimed_base += f"{cell},"
    db.session.commit()

    if visiting:
        return {"claimedBase": current_user.claimed_base, "ownerBase": owner.claimed_base}
    return {"claimedBase": current_user.claimed_base}
|
|
|
|
|
|
|
|
|
|
|
|
def unclaim_base(data, ws):
    """Remove a cell from the current user's claimed base.

    Request fields read from ``data``:
        location: sequence whose first two entries identify the cell; they
            are formatted as ``"{0}x{1}"``.

    ``claimed_base`` is a string of such cell keys, each followed by a
    comma.  ``ws`` is unused.

    Returns ``{"claimedBase": ...}`` and, when the loaded world belongs to
    another user, also ``"ownerBase"`` with that user's claimed base.
    """
    location = data["location"]
    cell = f"{location[0]}x{location[1]}"

    claimed = current_user.claimed_base.split(",")
    if cell in claimed:
        # Rebuild from whole cell keys.  A plain str.replace of "1x2," would
        # also cut the tail off "11x2," and corrupt that other claim.
        current_user.claimed_base = "".join(
            f"{entry}," for entry in claimed if entry and entry != cell
        )
    db.session.commit()

    if current_user.loaded_world["owner"] != current_user.username:
        owner = User.query.filter(User.username == current_user.loaded_world["owner"]).first()
        return {"claimedBase": current_user.claimed_base, "ownerBase": owner.claimed_base}
    return {"claimedBase": current_user.claimed_base}
|
|
|
|
|
|
|
|
|
|
|
|
def get_base(data, ws):
    """Report the claimed base of the current user and of the world's owner.

    Neither ``data`` nor ``ws`` is read.  The result always carries
    ``"claimedBase"``; ``"ownerBase"`` is added only when the loaded world is
    owned by a different user.
    """
    world_owner = current_user.loaded_world["owner"]

    # Own world: there is no separate owner to report.
    if world_owner == current_user.username:
        return {"claimedBase": current_user.claimed_base}

    owner = User.query.filter(User.username == world_owner).first()
    return {"claimedBase": current_user.claimed_base, "ownerBase": owner.claimed_base}
|
|
|
|
|
|
|
|
|
|
|
|
def set_inventory(data, ws):
    """Replace the current user's inventory, accessed through a book.

    ``data["location"]`` must point at an object of type ``"book"`` in the
    loaded world whose ``owner`` is the current user; ``data["inventory"]``
    is then stored as the user's inventory.  ``ws`` is unused.

    Returns ``{"inventory": ...}`` with the stored inventory.

    Raises:
        AssertionError: the cell holds no book, or the book belongs to
            someone else.
    """
    location = data["location"]
    grid = current_user.loaded_world["objects"]
    book = grid[location[0]][location[1]]

    if book is None or book["type"] != "book":
        raise AssertionError("inventory can only be accessed trough a book.")

    if book["owner"] != current_user.username:
        raise AssertionError("this book is not yours.")

    current_user.inventory = data["inventory"]
    db.session.commit()
    return {"inventory": current_user.inventory}
|
|
|
|
|
|
|
|
|
|
|
|
def get_inventory(data, ws):
    """Return an inventory, accessed through a book the current user owns.

    ``data["location"]`` must point at an object of type ``"book"`` in the
    loaded world whose ``owner`` is the current user.  ``ws`` is unused.

    Returns ``{"inventory": ...}``.  The inventory is the world owner's when
    the loaded world belongs to another user and the book's cell is listed
    in that user's claimed base; otherwise it is the current user's.

    Raises:
        AssertionError: the cell holds no book, or the book belongs to
            someone else.
    """
    location = data["location"]
    book = current_user.loaded_world["objects"][location[0]][location[1]]

    if book is None or book["type"] != "book":
        raise AssertionError("inventory can only be accessed trough a book.")

    if book["owner"] != current_user.username:
        raise AssertionError("this book is not yours.")

    if current_user.loaded_world["owner"] != current_user.username:
        owner = User.query.filter(User.username == current_user.loaded_world["owner"]).first()
        # Match whole cell keys.  Testing against the raw string would let
        # "1x2" match inside "11x20," and hand out the owner's inventory for
        # a cell they never claimed.
        if f"{location[0]}x{location[1]}" in owner.claimed_base.split(","):
            return {"inventory": owner.inventory}

    return {"inventory": current_user.inventory}
|
|
|
|
|
|
|
|
|
|
|
|
# Dispatch table: the "request" field of an incoming message selects the
# handler.  Every wire name equals the handler function's own name, so the
# table is keyed by ``__name__``.
request_handlers = {
    handler.__name__: handler
    for handler in (
        claim_base,
        get_base,
        get_inventory,
        init,
        push,
        put_object,
        save,
        set_inventory,
        take_object,
        unclaim_base,
        world,
    )
}
|
|
|
|
|
|
|
|
|
|
|
|
@game.route("/sock", websocket=True)
@login_required
def sock():
    """Serve the game's websocket: one JSON request in, one JSON reply out.

    Each received frame is parsed as JSON; it must be an object whose
    ``"request"`` field names an entry of ``request_handlers``.  The handler
    is called with the parsed message and the websocket, and its return
    value is sent back JSON-encoded.

    Replies on failure:
        * frame is not valid JSON: the JSON-encoded ``unparsable_msg``;
        * ``AssertionError`` (request rejected): ``{"ok": False, "error":
          <message>}``;
        * any other exception: traceback printed, database session rolled
          back, ``{"ok": False, "error": <repr of the exception>}``.

    The loop ends when the connection is closed; the view then returns an
    empty string.
    """
    ws = simple_websocket.Server(request.environ)
    try:
        while True:
            try:
                data = json.loads(ws.receive())
                if not isinstance(data, dict):
                    raise AssertionError("invalid request")
                name = data.get("request", None)
                # The isinstance guard keeps unhashable JSON values (lists,
                # dicts) on the "bad request" path instead of failing the
                # dict lookup.
                if not isinstance(name, str) or name not in request_handlers:
                    raise AssertionError("bad request")
                ret = request_handlers[name](data, ws)
            except json.JSONDecodeError:
                traceback.print_exc()
                ret = unparsable_msg
            except simple_websocket.ConnectionClosed:
                # Peer is gone: stop serving instead of attempting a send
                # that can only fail.
                break
            except AssertionError as e:
                ret = {"ok": False, "error": str(e)}
            except Exception as e:
                traceback.print_exc()
                # A handler may have failed mid-transaction or after editing
                # the world in memory; roll back so the next request on this
                # connection starts from a clean, usable session.
                db.session.rollback()
                ret = {"ok": False, "error": repr(e)}

            # A handler returning None is acknowledged with the canned OK.
            ws.send(ok_msg if ret is None else json.dumps(ret))
    except simple_websocket.ConnectionClosed:
        pass
    return ""
|