123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523 |
- """
- Piket server, handles events generated by the client.
- """
- import datetime
- import os
- from collections import defaultdict
- from sqlalchemy.exc import SQLAlchemyError
- from sqlalchemy import func
- from flask import Flask, jsonify, abort, request
- from flask_sqlalchemy import SQLAlchemy
- DATA_HOME = os.environ.get("XDG_DATA_HOME", "~/.local/share")
- CONFIG_DIR = os.path.join(DATA_HOME, "piket_server")
- DB_PATH = os.path.expanduser(os.path.join(CONFIG_DIR, "database.sqlite3"))
- DB_URL = f"sqlite:///{DB_PATH}"
- app = Flask("piket_server")
- app.config["SQLALCHEMY_DATABASE_URI"] = DB_URL
- app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
- db = SQLAlchemy(app)
- # ---------- Models ----------
- class Person(db.Model):
- """Represents a person to be shown on the lists."""
- __tablename__ = "people"
- person_id = db.Column(db.Integer, primary_key=True)
- name = db.Column(db.String, nullable=False)
- active = db.Column(db.Boolean, nullable=False, default=False)
- consumptions = db.relationship("Consumption", backref="person", lazy=True)
- def __repr__(self) -> str:
- return f"<Person {self.person_id}: {self.name}>"
- @property
- def as_dict(self) -> dict:
- return {
- "person_id": self.person_id,
- "active": self.active,
- "name": self.name,
- "consumptions": {
- ct.consumption_type_id: Consumption.query.filter_by(person=self)
- .filter_by(settlement=None)
- .filter_by(consumption_type=ct)
- .filter_by(reversed=False)
- .count()
- for ct in ConsumptionType.query.filter_by(active=True).all()
- },
- }
- class Export(db.Model):
- """Represents a set of exported Settlements."""
- __tablename__ = "exports"
- export_id = db.Column(db.Integer, primary_key=True)
- created_at = db.Column(
- db.DateTime, default=datetime.datetime.utcnow, nullable=False
- )
- settlements = db.relationship("Settlement", backref="export", lazy=True)
- @property
- def as_dict(self) -> dict:
- return {
- "export_id": self.export_id,
- "created_at": self.created_at.isoformat(),
- "settlement_ids": [s.settlement_id for s in self.settlements],
- }
- class Settlement(db.Model):
- """Represents a settlement of the list."""
- __tablename__ = "settlements"
- settlement_id = db.Column(db.Integer, primary_key=True)
- name = db.Column(db.String, nullable=False)
- export_id = db.Column(db.Integer, db.ForeignKey("exports.export_id"), nullable=True)
- consumptions = db.relationship("Consumption", backref="settlement", lazy=True)
- def __repr__(self) -> str:
- return f"<Settlement {self.settlement_id}: {self.name}>"
- @property
- def as_dict(self) -> dict:
- return {
- "settlement_id": self.settlement_id,
- "name": self.name,
- "consumption_summary": self.consumption_summary,
- "unique_people": self.unique_people,
- }
- @property
- def unique_people(self) -> int:
- q = (
- Consumption.query.filter_by(settlement=self)
- .filter_by(reversed=False)
- .group_by(Consumption.person_id)
- .count()
- )
- return q
- @property
- def consumption_summary(self) -> dict:
- q = (
- Consumption.query.filter_by(settlement=self)
- .filter_by(reversed=False)
- .group_by(Consumption.consumption_type_id)
- .order_by(ConsumptionType.name)
- .outerjoin(ConsumptionType)
- .with_entities(
- Consumption.consumption_type_id,
- ConsumptionType.name,
- func.count(Consumption.consumption_id),
- )
- .all()
- )
- return {r[0]: {"name": r[1], "count": r[2]} for r in q}
- @property
- def per_person(self) -> dict:
- # Get keys of seen consumption_types
- c_types = self.consumption_summary.keys()
- result = {}
- for type in c_types:
- c_type = ConsumptionType.query.get(type)
- result[type] = {"consumption_type": c_type.as_dict, "counts": {}}
- q = (
- Consumption.query.filter_by(settlement=self)
- .filter_by(reversed=False)
- .filter_by(consumption_type=c_type)
- .group_by(Consumption.person_id)
- .order_by(Person.name)
- .outerjoin(Person)
- .with_entities(
- Person.person_id,
- Person.name,
- func.count(Consumption.consumption_id),
- )
- .all()
- )
- for row in q:
- result[type]["counts"][row[0]] = {"name": row[1], "count": row[2]}
- return result
- class ConsumptionType(db.Model):
- """Represents a type of consumption to be counted."""
- __tablename__ = "consumption_types"
- consumption_type_id = db.Column(db.Integer, primary_key=True)
- name = db.Column(db.String, nullable=False)
- icon = db.Column(db.String)
- active = db.Column(db.Boolean, default=True)
- consumptions = db.relationship("Consumption", backref="consumption_type", lazy=True)
- def __repr__(self) -> str:
- return f"<ConsumptionType: {self.name}>"
- @property
- def as_dict(self) -> dict:
- return {
- "consumption_type_id": self.consumption_type_id,
- "name": self.name,
- "icon": self.icon,
- }
- class Consumption(db.Model):
- """Represent one consumption to be counted."""
- __tablename__ = "consumptions"
- consumption_id = db.Column(db.Integer, primary_key=True)
- person_id = db.Column(db.Integer, db.ForeignKey("people.person_id"), nullable=True)
- consumption_type_id = db.Column(
- db.Integer,
- db.ForeignKey("consumption_types.consumption_type_id"),
- nullable=False,
- )
- settlement_id = db.Column(
- db.Integer, db.ForeignKey("settlements.settlement_id"), nullable=True
- )
- created_at = db.Column(
- db.DateTime, default=datetime.datetime.utcnow, nullable=False
- )
- reversed = db.Column(db.Boolean, default=False, nullable=False)
- def __repr__(self) -> str:
- return f"<Consumption: {self.consumption_type.name} for {self.person.name}>"
- @property
- def as_dict(self) -> dict:
- return {
- "consumption_id": self.consumption_id,
- "person_id": self.person_id,
- "consumption_type_id": self.consumption_type_id,
- "settlement_id": self.settlement_id,
- "created_at": self.created_at.isoformat(),
- "reversed": self.reversed,
- }
- # ---------- Models ----------
- @app.route("/ping")
- def ping() -> None:
- """Return a status ping."""
- return "Pong"
- @app.route("/status")
- def status() -> None:
- """Return a status dict with info about the database."""
- unsettled_q = Consumption.query.filter_by(settlement=None).filter_by(reversed=False)
- unsettled = unsettled_q.count()
- first = None
- last = None
- if unsettled:
- last = (
- unsettled_q.order_by(Consumption.created_at.desc())
- .first()
- .created_at.isoformat()
- )
- first = (
- unsettled_q.order_by(Consumption.created_at.asc())
- .first()
- .created_at.isoformat()
- )
- return jsonify({"unsettled": {"amount": unsettled, "first": first, "last": last}})
- # Person
- @app.route("/people", methods=["GET"])
- def get_people():
- """Return a list of currently known people."""
- q = Person.query.order_by(Person.name)
- if request.args.get("active"):
- active_status = request.args.get("active", type=int)
- q = q.filter_by(active=active_status)
- people = q.all()
- engine = db.get_engine()
- query = """
- SELECT
- consumptions.person_id,
- consumptions.consumption_type_id,
- COUNT(*)
- FROM consumptions
- JOIN consumption_types
- ON consumptions.consumption_type_id = consumption_types.consumption_type_id
- WHERE
- consumptions.settlement_id IS NULL
- AND consumptions.reversed = 0
- GROUP BY consumptions.person_id, consumptions.consumption_type_id;
- """
- raw_counts = engine.execute(query)
- counts: "Dict[int, Dict[str, int]]" = defaultdict(dict)
- for person_id, consumption_type_id, count in raw_counts:
- counts[person_id][str(consumption_type_id)] = count
- result = [
- {
- "name": person.name,
- "active": person.active,
- "person_id": person.person_id,
- "consumptions": counts[person.person_id],
- }
- for person in people
- ]
- # result = [person.as_dict for person in people]
- return jsonify(people=result)
- @app.route("/people/<int:person_id>", methods=["GET"])
- def get_person(person_id: int):
- person = Person.query.get_or_404(person_id)
- return jsonify(person=person.as_dict)
- @app.route("/people", methods=["POST"])
- def add_person():
- """
- Add a new person.
- Required parameters:
- - name (str)
- """
- json = request.get_json()
- if not json:
- return jsonify({"error": "Could not parse JSON."}), 400
- data = json.get("person") or {}
- person = Person(name=data.get("name"), active=data.get("active", False))
- try:
- db.session.add(person)
- db.session.commit()
- except SQLAlchemyError:
- return jsonify({"error": "Invalid arguments for Person."}), 400
- return jsonify(person=person.as_dict), 201
- @app.route("/people/<int:person_id>/add_consumption", methods=["POST"])
- def add_consumption(person_id: int):
- person = Person.query.get_or_404(person_id)
- consumption = Consumption(person=person, consumption_type_id=1)
- try:
- db.session.add(consumption)
- db.session.commit()
- except SQLAlchemyError:
- return (
- jsonify(
- {"error": "Invalid Consumption parameters.", "person": person.as_dict}
- ),
- 400,
- )
- return jsonify(person=person.as_dict, consumption=consumption.as_dict), 201
- @app.route("/people/<int:person_id>", methods=["PATCH"])
- def update_person(person_id: int):
- person = Person.query.get_or_404(person_id)
- data = request.json["person"]
- if "active" in data:
- person.active = data["active"]
- db.session.add(person)
- db.session.commit()
- return jsonify(person=person.as_dict)
- @app.route("/people/<int:person_id>/add_consumption/<int:ct_id>", methods=["POST"])
- def add_consumption2(person_id: int, ct_id: int):
- person = Person.query.get_or_404(person_id)
- consumption = Consumption(person=person, consumption_type_id=ct_id)
- try:
- db.session.add(consumption)
- db.session.commit()
- except SQLAlchemyError:
- return (
- jsonify(
- {"error": "Invalid Consumption parameters.", "person": person.as_dict}
- ),
- 400,
- )
- return jsonify(person=person.as_dict, consumption=consumption.as_dict), 201
- @app.route("/consumptions/<int:consumption_id>", methods=["DELETE"])
- def reverse_consumption(consumption_id: int):
- """Reverse a consumption."""
- consumption = Consumption.query.get_or_404(consumption_id)
- if consumption.reversed:
- return (
- jsonify(
- {
- "error": "Consumption already reversed",
- "consumption": consumption.as_dict,
- }
- ),
- 409,
- )
- try:
- consumption.reversed = True
- db.session.add(consumption)
- db.session.commit()
- except SQLAlchemyError:
- return jsonify({"error": "Database error."}), 500
- return jsonify(consumption=consumption.as_dict), 200
- # ConsumptionType
- @app.route("/consumption_types", methods=["GET"])
- def get_consumption_types():
- """Return a list of currently active consumption types."""
- ctypes = ConsumptionType.query.filter_by(active=True).all()
- result = [ct.as_dict for ct in ctypes]
- return jsonify(consumption_types=result)
- @app.route("/consumption_types/<int:consumption_type_id>", methods=["GET"])
- def get_consumption_type(consumption_type_id: int):
- ct = ConsumptionType.query.get_or_404(consumption_type_id)
- return jsonify(consumption_type=ct.as_dict)
- @app.route("/consumption_types", methods=["POST"])
- def add_consumption_type():
- """Add a new ConsumptionType."""
- json = request.get_json()
- if not json:
- return jsonify({"error": "Could not parse JSON."}), 400
- data = json.get("consumption_type") or {}
- ct = ConsumptionType(name=data.get("name"), icon=data.get("icon"))
- try:
- db.session.add(ct)
- db.session.commit()
- except SQLAlchemyError:
- return jsonify({"error": "Invalid arguments for ConsumptionType."}), 400
- return jsonify(consumption_type=ct.as_dict), 201
- # Settlement
- @app.route("/settlements", methods=["GET"])
- def get_settlements():
- """Return a list of the active Settlements."""
- result = Settlement.query.all()
- return jsonify(settlements=[s.as_dict for s in result])
- @app.route("/settlements/<int:settlement_id>", methods=["GET"])
- def get_settlement(settlement_id: int):
- """Show full details for a single Settlement."""
- s = Settlement.query.get_or_404(settlement_id)
- per_person = s.per_person
- return jsonify(settlement=s.as_dict, count_info=per_person)
- @app.route("/settlements", methods=["POST"])
- def add_settlement():
- """Create a Settlement, and link all un-settled Consumptions to it."""
- json = request.get_json()
- if not json:
- return jsonify({"error": "Could not parse JSON."}), 400
- data = json.get("settlement") or {}
- s = Settlement(name=data["name"])
- db.session.add(s)
- db.session.commit()
- Consumption.query.filter_by(settlement=None).update(
- {"settlement_id": s.settlement_id}
- )
- db.session.commit()
- return jsonify(settlement=s.as_dict)
- # Export
- @app.route("/exports", methods=["GET"])
- def get_exports():
- """Return a list of the created Exports."""
- result = Export.query.all()
- return jsonify(exports=[e.as_dict for e in result])
- @app.route("/exports/<int:export_id>", methods=["GET"])
- def get_export(export_id: int):
- """Return an overview for the given Export."""
- e = Export.query.get_or_404(export_id)
- ss = [s.as_dict for s in e.settlements]
- return jsonify(export=e.as_dict, settlements=ss)
- @app.route("/exports", methods=["POST"])
- def add_export():
- """Create an Export, and link all un-exported Settlements to it."""
- # Assert that there are Settlements to be exported.
- s_count = Settlement.query.filter_by(export=None).count()
- if s_count == 0:
- return jsonify(error="No un-exported Settlements."), 403
- e = Export()
- db.session.add(e)
- db.session.commit()
- Settlement.query.filter_by(export=None).update({"export_id": e.export_id})
- db.session.commit()
- ss = [s.as_dict for s in e.settlements]
- return jsonify(export=e.as_dict, settlements=ss), 201
|