""" Piket server, handles events generated by the client. """ import datetime import os from collections import defaultdict from sqlalchemy.exc import SQLAlchemyError from sqlalchemy import func from flask import Flask, jsonify, abort, request from flask_sqlalchemy import SQLAlchemy DATA_HOME = os.environ.get("XDG_DATA_HOME", "~/.local/share") CONFIG_DIR = os.path.join(DATA_HOME, "piket_server") DB_PATH = os.path.expanduser(os.path.join(CONFIG_DIR, "database.sqlite3")) DB_URL = f"sqlite:///{DB_PATH}" app = Flask("piket_server") app.config["SQLALCHEMY_DATABASE_URI"] = DB_URL app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False db = SQLAlchemy(app) # ---------- Models ---------- class Person(db.Model): """ Represents a person to be shown on the lists. """ __tablename__ = "people" person_id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String, nullable=False) active = db.Column(db.Boolean, nullable=False, default=False) consumptions = db.relationship("Consumption", backref="person", lazy=True) def __repr__(self) -> str: return f"" @property def as_dict(self) -> dict: return { "person_id": self.person_id, "active": self.active, "name": self.name, "consumptions": { ct.consumption_type_id: Consumption.query.filter_by(person=self) .filter_by(settlement=None) .filter_by(consumption_type=ct) .filter_by(reversed=False) .count() for ct in ConsumptionType.query.filter_by(active=True).all() }, } class Export(db.Model): """ Represents a set of exported Settlements. """ __tablename__ = "exports" export_id = db.Column(db.Integer, primary_key=True) created_at = db.Column( db.DateTime, default=datetime.datetime.utcnow, nullable=False ) settlements = db.relationship("Settlement", backref="export", lazy=True) @property def as_dict(self) -> dict: return { "export_id": self.export_id, "created_at": self.created_at.isoformat(), "settlement_ids": [s.settlement_id for s in self.settlements], } class Settlement(db.Model): """ Represents a settlement of the list. """ __tablename__ = "settlements" settlement_id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String, nullable=False) export_id = db.Column(db.Integer, db.ForeignKey("exports.export_id"), nullable=True) consumptions = db.relationship("Consumption", backref="settlement", lazy=True) def __repr__(self) -> str: return f"" @property def as_dict(self) -> dict: return { "settlement_id": self.settlement_id, "name": self.name, "consumption_summary": self.consumption_summary, "unique_people": self.unique_people, } @property def unique_people(self) -> int: q = ( Consumption.query.filter_by(settlement=self) .filter_by(reversed=False) .group_by(Consumption.person_id) .count() ) return q @property def consumption_summary(self) -> dict: q = ( Consumption.query.filter_by(settlement=self) .filter_by(reversed=False) .group_by(Consumption.consumption_type_id) .order_by(ConsumptionType.name) .outerjoin(ConsumptionType) .with_entities( Consumption.consumption_type_id, ConsumptionType.name, func.count(Consumption.consumption_id), ) .all() ) return {r[0]: {"name": r[1], "count": r[2]} for r in q} @property def per_person(self) -> dict: # Get keys of seen consumption_types c_types = self.consumption_summary.keys() result = {} for type in c_types: c_type = ConsumptionType.query.get(type) result[type] = {"consumption_type": c_type.as_dict, "counts": {}} q = ( Consumption.query.filter_by(settlement=self) .filter_by(reversed=False) .filter_by(consumption_type=c_type) .group_by(Consumption.person_id) .order_by(Person.name) .outerjoin(Person) .with_entities( Person.person_id, Person.name, func.count(Consumption.consumption_id), ) .all() ) for row in q: result[type]["counts"][row[0]] = {"name": row[1], "count": row[2]} return result class ConsumptionType(db.Model): """ Represents a type of consumption to be counted. """ __tablename__ = "consumption_types" consumption_type_id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String, nullable=False) icon = db.Column(db.String) active = db.Column(db.Boolean, default=True) consumptions = db.relationship("Consumption", backref="consumption_type", lazy=True) def __repr__(self) -> str: return f"" @property def as_dict(self) -> dict: return { "consumption_type_id": self.consumption_type_id, "name": self.name, "icon": self.icon, } class Consumption(db.Model): """ Represent one consumption to be counted. """ __tablename__ = "consumptions" consumption_id = db.Column(db.Integer, primary_key=True) person_id = db.Column(db.Integer, db.ForeignKey("people.person_id"), nullable=True) consumption_type_id = db.Column( db.Integer, db.ForeignKey("consumption_types.consumption_type_id"), nullable=False, ) settlement_id = db.Column( db.Integer, db.ForeignKey("settlements.settlement_id"), nullable=True ) created_at = db.Column( db.DateTime, default=datetime.datetime.utcnow, nullable=False ) reversed = db.Column(db.Boolean, default=False, nullable=False) def __repr__(self) -> str: return f"" @property def as_dict(self) -> dict: return { "consumption_id": self.consumption_id, "person_id": self.person_id, "consumption_type_id": self.consumption_type_id, "settlement_id": self.settlement_id, "created_at": self.created_at.isoformat(), "reversed": self.reversed, } # ---------- Models ---------- @app.route("/ping") def ping() -> None: """ Return a status ping. """ return "Pong" @app.route("/status") def status() -> None: """ Return a status dict with info about the database. """ unsettled_q = Consumption.query.filter_by(settlement=None).filter_by(reversed=False) unsettled = unsettled_q.count() first = None last = None if unsettled: last = ( unsettled_q.order_by(Consumption.created_at.desc()) .first() .created_at.isoformat() ) first = ( unsettled_q.order_by(Consumption.created_at.asc()) .first() .created_at.isoformat() ) return jsonify({"unsettled": {"amount": unsettled, "first": first, "last": last}}) # Person @app.route("/people", methods=["GET"]) def get_people(): """ Return a list of currently known people. """ q = Person.query.order_by(Person.name) if request.args.get("active"): active_status = request.args.get("active", type=int) q = q.filter_by(active=active_status) people = q.all() engine = db.get_engine() query = ''' SELECT consumptions.person_id, consumptions.consumption_type_id, COUNT(*) FROM consumptions JOIN consumption_types ON consumptions.consumption_type_id = consumption_types.consumption_type_id WHERE consumptions.settlement_id IS NULL AND consumptions.reversed = 0 GROUP BY consumptions.person_id, consumptions.consumption_type_id; ''' raw_counts = engine.execute(query) counts: 'Dict[int, Dict[str, int]]' = defaultdict(dict) for person_id, consumption_type_id, count in raw_counts: counts[person_id][str(consumption_type_id)] = count result = [ { "name": person.name, "active": person.active, "person_id": person.person_id, "consumptions": counts[person.person_id] } for person in people ] # result = [person.as_dict for person in people] return jsonify(people=result) @app.route("/people/", methods=["GET"]) def get_person(person_id: int): person = Person.query.get_or_404(person_id) return jsonify(person=person.as_dict) @app.route("/people", methods=["POST"]) def add_person(): """ Add a new person. Required parameters: - name (str) """ json = request.get_json() if not json: return jsonify({"error": "Could not parse JSON."}), 400 data = json.get("person") or {} person = Person(name=data.get("name"), active=data.get("active", False)) try: db.session.add(person) db.session.commit() except SQLAlchemyError: return jsonify({"error": "Invalid arguments for Person."}), 400 return jsonify(person=person.as_dict), 201 @app.route("/people//add_consumption", methods=["POST"]) def add_consumption(person_id: int): person = Person.query.get_or_404(person_id) consumption = Consumption(person=person, consumption_type_id=1) try: db.session.add(consumption) db.session.commit() except SQLAlchemyError: return ( jsonify( {"error": "Invalid Consumption parameters.", "person": person.as_dict} ), 400, ) return jsonify(person=person.as_dict, consumption=consumption.as_dict), 201 @app.route("/people/", methods=["PATCH"]) def update_person(person_id: int): person = Person.query.get_or_404(person_id) data = request.json["person"] if "active" in data: person.active = data["active"] db.session.add(person) db.session.commit() return jsonify(person=person.as_dict) @app.route("/people//add_consumption/", methods=["POST"]) def add_consumption2(person_id: int, ct_id: int): person = Person.query.get_or_404(person_id) consumption = Consumption(person=person, consumption_type_id=ct_id) try: db.session.add(consumption) db.session.commit() except SQLAlchemyError: return ( jsonify( {"error": "Invalid Consumption parameters.", "person": person.as_dict} ), 400, ) return jsonify(person=person.as_dict, consumption=consumption.as_dict), 201 @app.route("/consumptions/", methods=["DELETE"]) def reverse_consumption(consumption_id: int): """ Reverse a consumption. """ consumption = Consumption.query.get_or_404(consumption_id) if consumption.reversed: return ( jsonify( { "error": "Consumption already reversed", "consumption": consumption.as_dict, } ), 409, ) try: consumption.reversed = True db.session.add(consumption) db.session.commit() except SQLAlchemyError: return jsonify({"error": "Database error."}), 500 return jsonify(consumption=consumption.as_dict), 200 # ConsumptionType @app.route("/consumption_types", methods=["GET"]) def get_consumption_types(): """ Return a list of currently active consumption types. """ ctypes = ConsumptionType.query.filter_by(active=True).all() result = [ct.as_dict for ct in ctypes] return jsonify(consumption_types=result) @app.route("/consumption_types/", methods=["GET"]) def get_consumption_type(consumption_type_id: int): ct = ConsumptionType.query.get_or_404(consumption_type_id) return jsonify(consumption_type=ct.as_dict) @app.route("/consumption_types", methods=["POST"]) def add_consumption_type(): """ Add a new ConsumptionType. """ json = request.get_json() if not json: return jsonify({"error": "Could not parse JSON."}), 400 data = json.get("consumption_type") or {} ct = ConsumptionType(name=data.get("name"), icon=data.get("icon")) try: db.session.add(ct) db.session.commit() except SQLAlchemyError: return jsonify({"error": "Invalid arguments for ConsumptionType."}), 400 return jsonify(consumption_type=ct.as_dict), 201 # Settlement @app.route("/settlements", methods=["GET"]) def get_settlements(): """ Return a list of the active Settlements. """ result = Settlement.query.all() return jsonify(settlements=[s.as_dict for s in result]) @app.route("/settlements/", methods=["GET"]) def get_settlement(settlement_id: int): """ Show full details for a single Settlement. """ s = Settlement.query.get_or_404(settlement_id) per_person = s.per_person return jsonify(settlement=s.as_dict, count_info=per_person) @app.route("/settlements", methods=["POST"]) def add_settlement(): """ Create a Settlement, and link all un-settled Consumptions to it. """ json = request.get_json() if not json: return jsonify({"error": "Could not parse JSON."}), 400 data = json.get("settlement") or {} s = Settlement(name=data["name"]) db.session.add(s) db.session.commit() Consumption.query.filter_by(settlement=None).update( {"settlement_id": s.settlement_id} ) db.session.commit() return jsonify(settlement=s.as_dict) # Export @app.route("/exports", methods=["GET"]) def get_exports(): """ Return a list of the created Exports. """ result = Export.query.all() return jsonify(exports=[e.as_dict for e in result]) @app.route("/exports/", methods=["GET"]) def get_export(export_id: int): """ Return an overview for the given Export. """ e = Export.query.get_or_404(export_id) ss = [s.as_dict for s in e.settlements] return jsonify(export=e.as_dict, settlements=ss) @app.route("/exports", methods=["POST"]) def add_export(): """ Create an Export, and link all un-exported Settlements to it. """ # Assert that there are Settlements to be exported. s_count = Settlement.query.filter_by(export=None).count() if s_count == 0: return jsonify(error="No un-exported Settlements."), 403 e = Export() db.session.add(e) db.session.commit() Settlement.query.filter_by(export=None).update({"export_id": e.export_id}) db.session.commit() ss = [s.as_dict for s in e.settlements] return jsonify(export=e.as_dict, settlements=ss), 201