""" Piket server, handles events generated by the client. """ import datetime import os from sqlalchemy.exc import SQLAlchemyError from sqlalchemy import func from flask import Flask, jsonify, abort, request from flask_sqlalchemy import SQLAlchemy DATA_HOME = os.environ.get("XDG_DATA_HOME", "~/.local/share") CONFIG_DIR = os.path.join(DATA_HOME, "piket_server") DB_PATH = os.path.expanduser(os.path.join(CONFIG_DIR, "database.sqlite3")) DB_URL = f"sqlite:///{DB_PATH}" app = Flask("piket_server") app.config["SQLALCHEMY_DATABASE_URI"] = DB_URL app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False db = SQLAlchemy(app) # ---------- Models ---------- class Person(db.Model): """ Represents a person to be shown on the lists. """ __tablename__ = "people" person_id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String, nullable=False) active = db.Column(db.Boolean, nullable=False, default=False) consumptions = db.relationship("Consumption", backref="person", lazy=True) def __repr__(self) -> str: return f"" @property def as_dict(self) -> dict: return { "person_id": self.person_id, "active": self.active, "name": self.name, "consumptions": { ct.consumption_type_id: Consumption.query.filter_by(person=self) .filter_by(settlement=None) .filter_by(consumption_type=ct) .filter_by(reversed=False) .count() for ct in ConsumptionType.query.all() }, } class Export(db.Model): """ Represents a set of exported Settlements. """ __tablename__ = "exports" export_id = db.Column(db.Integer, primary_key=True) created_at = db.Column( db.DateTime, default=datetime.datetime.utcnow, nullable=False ) settlements = db.relationship("Settlement", backref="export", lazy=True) @property def as_dict(self) -> dict: return { "export_id": self.export_id, "created_at": self.created_at.isoformat(), "settlement_ids": [s.settlement_id for s in self.settlements], } class Settlement(db.Model): """ Represents a settlement of the list. """ __tablename__ = "settlements" settlement_id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String, nullable=False) export_id = db.Column(db.Integer, db.ForeignKey("exports.export_id"), nullable=True) consumptions = db.relationship("Consumption", backref="settlement", lazy=True) def __repr__(self) -> str: return f"" @property def as_dict(self) -> dict: return { "settlement_id": self.settlement_id, "name": self.name, "consumption_summary": self.consumption_summary, "unique_people": self.unique_people, } @property def unique_people(self) -> int: q = ( Consumption.query.filter_by(settlement=self) .filter_by(reversed=False) .group_by(Consumption.person_id) .count() ) return q @property def consumption_summary(self) -> dict: q = ( Consumption.query.filter_by(settlement=self) .filter_by(reversed=False) .group_by(Consumption.consumption_type_id) .order_by(ConsumptionType.name) .outerjoin(ConsumptionType) .with_entities( Consumption.consumption_type_id, ConsumptionType.name, func.count(Consumption.consumption_id), ) .all() ) return {r[0]: {"name": r[1], "count": r[2]} for r in q} @property def per_person(self) -> dict: # Get keys of seen consumption_types c_types = self.consumption_summary.keys() result = {} for type in c_types: c_type = ConsumptionType.query.get(type) result[type] = {"consumption_type": c_type.as_dict, "counts": {}} q = ( Consumption.query.filter_by(settlement=self) .filter_by(reversed=False) .filter_by(consumption_type=c_type) .group_by(Consumption.person_id) .order_by(Person.name) .outerjoin(Person) .with_entities( Person.person_id, Person.name, func.count(Consumption.consumption_id), ) .all() ) for row in q: result[type]["counts"][row[0]] = {"name": row[1], "count": row[2]} return result class ConsumptionType(db.Model): """ Represents a type of consumption to be counted. """ __tablename__ = "consumption_types" consumption_type_id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String, nullable=False) icon = db.Column(db.String) consumptions = db.relationship("Consumption", backref="consumption_type", lazy=True) def __repr__(self) -> str: return f"" @property def as_dict(self) -> dict: return { "consumption_type_id": self.consumption_type_id, "name": self.name, "icon": self.icon, } class Consumption(db.Model): """ Represent one consumption to be counted. """ __tablename__ = "consumptions" consumption_id = db.Column(db.Integer, primary_key=True) person_id = db.Column(db.Integer, db.ForeignKey("people.person_id"), nullable=True) consumption_type_id = db.Column( db.Integer, db.ForeignKey("consumption_types.consumption_type_id"), nullable=False, ) settlement_id = db.Column( db.Integer, db.ForeignKey("settlements.settlement_id"), nullable=True ) created_at = db.Column( db.DateTime, default=datetime.datetime.utcnow, nullable=False ) reversed = db.Column(db.Boolean, default=False, nullable=False) def __repr__(self) -> str: return f"" @property def as_dict(self) -> dict: return { "consumption_id": self.consumption_id, "person_id": self.person_id, "consumption_type_id": self.consumption_type_id, "settlement_id": self.settlement_id, "created_at": self.created_at.isoformat(), "reversed": self.reversed, } # ---------- Models ---------- @app.route("/ping") def ping() -> None: """ Return a status ping. """ return "Pong" @app.route("/status") def status() -> None: """ Return a status dict with info about the database. """ unsettled_q = Consumption.query.filter_by(settlement=None).filter_by(reversed=False) unsettled = unsettled_q.count() first = None last = None if unsettled: last = ( unsettled_q.order_by(Consumption.created_at.desc()) .first() .created_at.isoformat() ) first = ( unsettled_q.order_by(Consumption.created_at.asc()) .first() .created_at.isoformat() ) return jsonify({"unsettled": {"amount": unsettled, "first": first, "last": last}}) # Person @app.route("/people", methods=["GET"]) def get_people(): """ Return a list of currently known people. """ people = Person.query.order_by(Person.name).all() q = Person.query.order_by(Person.name) if request.args.get("active"): active_status = request.args.get("active", type=int) q = q.filter_by(active=active_status) people = q.all() result = [person.as_dict for person in people] return jsonify(people=result) @app.route("/people/", methods=["GET"]) def get_person(person_id: int): person = Person.query.get_or_404(person_id) return jsonify(person=person.as_dict) @app.route("/people", methods=["POST"]) def add_person(): """ Add a new person. Required parameters: - name (str) """ json = request.get_json() if not json: return jsonify({"error": "Could not parse JSON."}), 400 data = json.get("person") or {} person = Person(name=data.get("name"), active=data.get("active", False)) try: db.session.add(person) db.session.commit() except SQLAlchemyError: return jsonify({"error": "Invalid arguments for Person."}), 400 return jsonify(person=person.as_dict), 201 @app.route("/people//add_consumption", methods=["POST"]) def add_consumption(person_id: int): person = Person.query.get_or_404(person_id) consumption = Consumption(person=person, consumption_type_id=1) try: db.session.add(consumption) db.session.commit() except SQLAlchemyError: return ( jsonify( {"error": "Invalid Consumption parameters.", "person": person.as_dict} ), 400, ) return jsonify(person=person.as_dict, consumption=consumption.as_dict), 201 @app.route("/people/", methods=["PATCH"]) def update_person(person_id: int): person = Person.query.get_or_404(person_id) data = request.json["person"] if "active" in data: person.active = data["active"] db.session.add(person) db.session.commit() return jsonify(person=person.as_dict) @app.route("/people//add_consumption/", methods=["POST"]) def add_consumption2(person_id: int, ct_id: int): person = Person.query.get_or_404(person_id) consumption = Consumption(person=person, consumption_type_id=ct_id) try: db.session.add(consumption) db.session.commit() except SQLAlchemyError: return ( jsonify( {"error": "Invalid Consumption parameters.", "person": person.as_dict} ), 400, ) return jsonify(person=person.as_dict, consumption=consumption.as_dict), 201 @app.route("/consumptions/", methods=["DELETE"]) def reverse_consumption(consumption_id: int): """ Reverse a consumption. """ consumption = Consumption.query.get_or_404(consumption_id) if consumption.reversed: return ( jsonify( { "error": "Consumption already reversed", "consumption": consumption.as_dict, } ), 409, ) try: consumption.reversed = True db.session.add(consumption) db.session.commit() except SQLAlchemyError: return jsonify({"error": "Database error."}), 500 return jsonify(consumption=consumption.as_dict), 200 # ConsumptionType @app.route("/consumption_types", methods=["GET"]) def get_consumption_types(): """ Return a list of currently active consumption types. """ ctypes = ConsumptionType.query.all() result = [ct.as_dict for ct in ctypes] return jsonify(consumption_types=result) @app.route("/consumption_types/", methods=["GET"]) def get_consumption_type(consumption_type_id: int): ct = ConsumptionType.query.get_or_404(consumption_type_id) return jsonify(consumption_type=ct.as_dict) @app.route("/consumption_types", methods=["POST"]) def add_consumption_type(): """ Add a new ConsumptionType. """ json = request.get_json() if not json: return jsonify({"error": "Could not parse JSON."}), 400 data = json.get("consumption_type") or {} ct = ConsumptionType(name=data.get("name"), icon=data.get("icon")) try: db.session.add(ct) db.session.commit() except SQLAlchemyError: return jsonify({"error": "Invalid arguments for ConsumptionType."}), 400 return jsonify(consumption_type=ct.as_dict), 201 # Settlement @app.route("/settlements", methods=["GET"]) def get_settlements(): """ Return a list of the active Settlements. """ result = Settlement.query.all() return jsonify(settlements=[s.as_dict for s in result]) @app.route("/settlements/", methods=["GET"]) def get_settlement(settlement_id: int): """ Show full details for a single Settlement. """ s = Settlement.query.get_or_404(settlement_id) per_person = s.per_person return jsonify(settlement=s.as_dict, count_info=per_person) @app.route("/settlements", methods=["POST"]) def add_settlement(): """ Create a Settlement, and link all un-settled Consumptions to it. """ json = request.get_json() if not json: return jsonify({"error": "Could not parse JSON."}), 400 data = json.get("settlement") or {} s = Settlement(name=data["name"]) db.session.add(s) db.session.commit() Consumption.query.filter_by(settlement=None).update( {"settlement_id": s.settlement_id} ) db.session.commit() return jsonify(settlement=s.as_dict) # Export @app.route("/exports", methods=["GET"]) def get_exports(): """ Return a list of the created Exports. """ result = Export.query.all() return jsonify(exports=[e.as_dict for e in result]) @app.route("/exports/", methods=["GET"]) def get_export(export_id: int): """ Return an overview for the given Export. """ e = Export.query.get_or_404(export_id) ss = [s.as_dict for s in e.settlements] return jsonify(export=e.as_dict, settlements=ss) @app.route("/exports", methods=["POST"]) def add_export(): """ Create an Export, and link all un-exported Settlements to it. """ # Assert that there are Settlements to be exported. s_count = Settlement.query.filter_by(export=None).count() if s_count == 0: return jsonify(error="No un-exported Settlements."), 403 e = Export() db.session.add(e) db.session.commit() Settlement.query.filter_by(export=None).update({"export_id": e.export_id}) db.session.commit() ss = [s.as_dict for s in e.settlements] return jsonify(export=e.as_dict, settlements=ss), 201