from __future__ import annotations import datetime import json import logging import sys from dataclasses import asdict, dataclass from enum import Enum from typing import Any, Dict, List, NewType, Optional, Tuple, Union import requests from piket_server.flask import db from piket_server.models import Person from piket_server.util import fmt_datetime # AARDBEI_ENDPOINT = "https://aardbei.app" AARDBEI_ENDPOINT = "http://localhost:3000" log = logging.getLogger(__name__) ActivityId = NewType("ActivityId", int) PersonId = NewType("PersonId", int) MemberId = NewType("MemberId", int) ParticipantId = NewType("ParticipantId", int) @dataclass(frozen=True) class AardbeiPerson: """ Contains the data on a Person as exposed by Aardbei. A Person represents a person in the real world, and maps to a Person in the local database. """ aardbei_id: PersonId full_name: str @classmethod def from_aardbei_dict(cls, data: Dict[str, Any]) -> AardbeiPerson: """ Load from a dictionary provided by Aardbei. >>> AardbeiPerson.from_aardbei_dict( {"person": {"aardbei_id": 1, "full_name": "Henkie Kraggelwenk"}} ) AardbeiPerson(aardbei_id=AardbeiId(1), full_name="Henkie Kraggelwenk") """ d = data["person"] return cls(full_name=d["full_name"], aardbei_id=PersonId(d["id"])) @property def as_json_dict(self) -> Dict[str, Any]: """ Serialize to a dictionary as provided by Aardbei. >>> AardbeiPerson(aardbei_id=AardbeiId(1), full_name="Henkie Kraggelwenk").as_json_dict {"person": {"id": 1, "full_name": "Henkie Kraggelwenk"}} """ return {"person": {"id": self.aardbei_id, "full_name": self.full_name}} @dataclass(frozen=True) class AardbeiMember: """ Contains the data on a Member exposed by Aardbei. A Member represents the membership of a Person in a Group in Aardbei. """ person: AardbeiPerson aardbei_id: MemberId is_leader: bool display_name: str @classmethod def from_aardbei_dict(cls, data: Dict[str, Any]) -> AardbeiMember: """ Load from a dictionary provided by Aardbei. >>> from_aardbei_dict({ "member": { "person": { "full_name": "Roer Kuggelvork", "id": 2, }, "id": 23, "is_leader": False, "display_name": "Roer", }, }) AardbeiMember( person=AardbeiPerson(aardbei_id=PersonId(2), full_name="Roer Kuggelvork"), aardbei_id=MemberId(23), is_leader=False, display_name="Roer", ) """ d = data["member"] person = AardbeiPerson.from_aardbei_dict(d) return cls( person=person, aardbei_id=MemberId(d["id"]), is_leader=d["is_leader"], display_name=d["display_name"], ) @property def as_json_dict(self) -> Dict[str, Any]: """ Serialize to a dict as provided by Aardbei. >>> AardbeiMember( person=AardbeiPerson(aardbei_id=PersonId(2), full_name="Roer Kuggelvork"), aardbei_id=MemberId(23), is_leader=False, display_name="Roer", ) { "member": { "person": { "full_name": "Roer Kuggelvork", "id": 2, }, "id": 23, "is_leader": False, "display_name": "Roer", } } """ res = { "id": self.aardbei_id, "is_leader": self.is_leader, "display_name": self.display_name, } res.update(self.person.as_json_dict) return res @dataclass(frozen=True) class AardbeiParticipant: """ Represents a Participant as exposed by Aardbei. A Participant represents the participation of a Person (optionally as a Member in a Group) in an Activity. """ person: AardbeiPerson member: Optional[AardbeiMember] aardbei_id: ParticipantId attending: bool is_organizer: bool notes: Optional[str] @property def name(self) -> str: """ Return the name to show for this Participant. This is the display_name if a Member is present, else the Participant's Person's full name. """ if self.member is not None: return self.member.display_name return self.person.full_name @classmethod def from_aardbei_dict(cls, data: Dict[str, Any]) -> AardbeiParticipant: """ Load from a dictionary as provided by Aardbei. """ d = data["participant"] person = AardbeiPerson.from_aardbei_dict(d) member: Optional[AardbeiMember] = None if d["member"] is not None: member = AardbeiMember.from_aardbei_dict(d) aardbei_id = ParticipantId(d["id"]) return cls( person=person, member=member, aardbei_id=aardbei_id, attending=d["attending"], is_organizer=d["is_organizer"], notes=d["notes"], ) @property def as_json_dict(self) -> Dict[str, Any]: """ Serialize to a dict as provided by Aardbei. """ res = { "participant": { "id": self.aardbei_id, "attending": self.attending, "is_organizer": self.is_organizer, "notes": self.notes, } } res.update(self.person.as_json_dict) if self.member is not None: res.update(self.member.as_json_dict) return res class NoResponseAction(Enum): """Represents the "no response action" attribute of Activities in Aardbei.""" Present = "present" Absent = "absent" @dataclass(frozen=True) class ResponseCounts: """Represents the "response counts" attribute of Activities in Aardbei.""" present: int absent: int unknown: int @classmethod def from_aardbei_dict(cls, data: Dict[str, int]) -> ResponseCounts: """Load from a dict as provided by Aardbei.""" return cls( present=data["present"], absent=data["absent"], unknown=data["unknown"] ) @property def as_json_dict(self) -> Dict[str, int]: """Serialize to a dict as provided by Aardbei.""" return {"present": self.present, "absent": self.absent, "unknown": self.unknown} @dataclass(frozen=True) class SparseAardbeiActivity: aardbei_id: ActivityId name: str description: str location: str start: datetime.datetime end: Optional[datetime.datetime] deadline: Optional[datetime.datetime] reminder_at: Optional[datetime.datetime] no_response_action: NoResponseAction response_counts: ResponseCounts def distance(self, reference: datetime.datetime) -> datetime.timedelta: """Calculate how long ago this Activity ended / how much time until it starts.""" if self.end is not None: if reference > self.start and reference < self.end: return datetime.timedelta(seconds=0) elif reference < self.start: return self.start - reference elif reference > self.end: return reference - self.end if reference > self.start: return reference - self.start return self.start - reference @classmethod def from_aardbei_dict(cls, data: Dict[str, Any]) -> SparseAardbeiActivity: """Load from a dict as provided by Aardbei.""" start: datetime.datetime = datetime.datetime.fromisoformat( data["activity"]["start"] ) end: Optional[datetime.datetime] = None if data["activity"]["end"] is not None: end = datetime.datetime.fromisoformat(data["activity"]["end"]) deadline: Optional[datetime.datetime] = None if data["activity"]["deadline"] is not None: deadline = datetime.datetime.fromisoformat(data["activity"]["deadline"]) reminder_at: Optional[datetime.datetime] = None if data["activity"]["reminder_at"] is not None: reminder_at = datetime.datetime.fromisoformat( data["activity"]["reminder_at"] ) no_response_action = NoResponseAction(data["activity"]["no_response_action"]) response_counts = ResponseCounts.from_aardbei_dict( data["activity"]["response_counts"] ) return cls( aardbei_id=ActivityId(data["activity"]["id"]), name=data["activity"]["name"], description=data["activity"]["description"], location=data["activity"]["location"], start=start, end=end, deadline=deadline, reminder_at=reminder_at, no_response_action=no_response_action, response_counts=response_counts, ) @property def as_json_dict(self) -> Dict[str, Any]: """Serialize to a dict as provided by Aardbei.""" return { "activity": { "id": self.aardbei_id, "name": self.name, "description": self.description, "location": self.location, "start": fmt_datetime(self.start), "end": fmt_datetime(self.end), "deadline": fmt_datetime(self.deadline), "reminder_at": fmt_datetime(self.reminder_at), "no_response_action": self.no_response_action.value, "response_counts": self.response_counts.as_json_dict, } } @dataclass(frozen=True) class AardbeiActivity(SparseAardbeiActivity): """Contains the data of an Activity as exposed by Aardbei.""" participants: List[AardbeiParticipant] @classmethod def from_aardbei_dict(cls, data: Dict[str, Any]) -> AardbeiActivity: """Load from a dict as provided by Aardbei.""" # Ugly: This is a copy of the Sparse variant with added participants. # This is not ideal, but I don't care enough to fix this right now. participants: List[AardbeiParticipant] = [ AardbeiParticipant.from_aardbei_dict(x) for x in data["activity"]["participants"] ] start: datetime.datetime = datetime.datetime.fromisoformat( data["activity"]["start"] ) end: Optional[datetime.datetime] = None if data["activity"]["end"] is not None: end = datetime.datetime.fromisoformat(data["activity"]["end"]) deadline: Optional[datetime.datetime] = None if data["activity"]["deadline"] is not None: deadline = datetime.datetime.fromisoformat(data["activity"]["deadline"]) reminder_at: Optional[datetime.datetime] = None if data["activity"]["reminder_at"] is not None: reminder_at = datetime.datetime.fromisoformat( data["activity"]["reminder_at"] ) no_response_action = NoResponseAction(data["activity"]["no_response_action"]) response_counts = ResponseCounts.from_aardbei_dict( data["activity"]["response_counts"] ) return cls( aardbei_id=ActivityId(data["activity"]["id"]), name=data["activity"]["name"], description=data["activity"]["description"], location=data["activity"]["location"], start=start, end=end, deadline=deadline, reminder_at=reminder_at, no_response_action=no_response_action, response_counts=response_counts, participants=participants, ) @property def as_json_dict(self) -> Dict[str, Any]: """Serialize to a dict as provided by Aardbei.""" res = super().as_json_dict res["participants"] = [p.as_json_dict for p in self.participants] return res @dataclass(frozen=True) class AardbeiMatch: """Represents a match between a local Person and a Person present in Aardbei's data.""" local: Person remote: AardbeiMember @dataclass(frozen=True) class AardbeiLink: """Represents a set of differences between the local state and Aardbei's set of people.""" matches: List[AardbeiMatch] """People that exist on both sides, but aren't linked in the people table.""" altered_name: List[AardbeiMatch] """People that are already linked but changed one of their names.""" remote_only: List[AardbeiMember] """People that only exist on the remote.""" @property def num_changes(self) -> int: """Return the amount of mismatching people between Aardbei and the local state.""" return len(self.matches) + len(self.altered_name) + len(self.remote_only) class AardbeiSyncError(Enum): """Represents errors that might occur when retrieving data from Aardbei.""" CantConnect = "connect_fail" HTTPError = "http_fail" def get_aardbei_people( token: str, endpoint: str = AARDBEI_ENDPOINT ) -> Union[List[AardbeiMember], AardbeiSyncError]: """Retrieve the set of People in a Group from Aardbei, and parse this to AardbeiPerson objects. Return a AardbeiSyncError if something fails.""" try: resp: requests.Response = requests.get( f"{endpoint}/api/groups/0/", headers={"Authorization": f"Group {token}"}, ) resp.raise_for_status() except requests.ConnectionError as e: log.exception("Can't connect to endpoint %s", endpoint) return AardbeiSyncError.CantConnect except requests.HTTPError: return AardbeiSyncError.HTTPError members = resp.json()["group"]["members"] return [AardbeiMember.from_aardbei_dict(x) for x in members] def match_local_aardbei(aardbei_members: List[AardbeiMember]) -> AardbeiLink: """Inspect the local state and compare it with the set of given AardbeiMembers (containing AardbeiPersons). Return a AardbeiLink that indicates which local people don't match the remote state.""" matches: List[AardbeiMatch] = [] altered_name: List[AardbeiMatch] = [] remote_only: List[AardbeiMember] = [] for member in aardbei_members: p: Optional[Person] = Person.query.filter_by( aardbei_id=member.person.aardbei_id ).one_or_none() if p is not None: if ( p.full_name != member.person.full_name or p.display_name != member.display_name ): altered_name.append(AardbeiMatch(p, member)) else: logging.info( "OK: %s / %s (L%s/R%s)", p.full_name, p.display_name, p.person_id, p.aardbei_id, ) continue p = Person.query.filter_by(full_name=member.person.full_name).one_or_none() if p is not None: matches.append(AardbeiMatch(p, member)) else: remote_only.append(member) return AardbeiLink(matches, altered_name, remote_only) def link_matches(matches: List[AardbeiMatch]) -> None: """ Update local people to add the remote ID to the local state. This only enqueues the changes in the local SQLAlchemy session, committing needs to be done separately. """ for match in matches: match.local.aardbei_id = match.remote.person.aardbei_id match.local.display_name = match.remote.display_name logging.info( "Linking local %s (%s) to remote %s (%s)", match.local.full_name, match.local.person_id, match.remote.display_name, match.remote.person.aardbei_id, ) db.session.add(match.local) def create_missing(missing: List[AardbeiMember]) -> None: """ Create local people for all remote people that don't exist locally. This only enqueues the changes in the local SQLAlchemy session, committing needs to be done separately. """ for member in missing: pnew = Person( full_name=member.person.full_name, display_name=member.display_name, aardbei_id=member.person.aardbei_id, active=False, ) logging.info( "Creating new person for %s / %s (%s)", member.person.full_name, member.display_name, member.person.aardbei_id, ) db.session.add(pnew) def update_names(matches: List[AardbeiMatch]) -> None: """ Update the local full and display names of people that were already linked to a remote person, and who changed names on the remote. This only enqueues the changes in the local SQLAlchemy session, committing needs to be done separately. """ for match in matches: p = match.local member = match.remote aardbei_person = member.person changed = False if p.full_name != aardbei_person.full_name: logging.info( "Updating %s (L%s/R%s) full name %s to %s", aardbei_person.full_name, p.person_id, aardbei_person.aardbei_id, p.full_name, aardbei_person.full_name, ) p.full_name = aardbei_person.full_name changed = True if p.display_name != member.display_name: logging.info( "Updating %s (L%s/R%s) display name %s to %s", p.full_name, p.person_id, aardbei_person.aardbei_id, p.display_name, member.display_name, ) p.display_name = member.display_name changed = True assert changed, "got match but didn't update anything" db.session.add(p) def get_activities( token: str, endpoint: str = AARDBEI_ENDPOINT ) -> Union[List[SparseAardbeiActivity], AardbeiSyncError]: """ Get the list of activities present on the remote and return these activities, ordered by the temporal distance to the current time. """ result: List[SparseAardbeiActivity] = [] for category in ("upcoming", "current", "previous"): try: resp = requests.get( f"{endpoint}/api/groups/0/{category}_activities", headers={"Authorization": f"Group {token}"}, ) resp.raise_for_status() except requests.HTTPError as e: log.exception(e) return AardbeiSyncError.HTTPError except requests.ConnectionError as e: log.exception(e) return AardbeiSyncError.CantConnect for item in resp.json(): result.append(SparseAardbeiActivity.from_aardbei_dict(item)) now = datetime.datetime.now(datetime.timezone.utc) result.sort(key=lambda x: SparseAardbeiActivity.distance(x, now)) return result def get_activity( activity_id: ActivityId, token: str, endpoint: str ) -> Union[AardbeiActivity, AardbeiSyncError]: """ Get all data (including participants) from the remote about one activity with a given ID. """ try: resp = requests.get( f"{endpoint}/api/activities/{activity_id}", headers={"Authorization": f"Group {token}"}, ) resp.raise_for_status() except requests.HTTPError as e: log.exception(e) return AardbeiSyncError.HTTPError except requests.ConnectionError as e: return AardbeiSyncError.CantConnect return AardbeiActivity.from_aardbei_dict(resp.json()) def match_activity(activity: AardbeiActivity) -> None: """ Update the local state to have mark all people present at the given activity as active, and all other people as inactive. """ ps = activity.participants pids: List[PersonId] = [p.person.aardbei_id for p in ps if p.attending] Person.query.update(values={"active": False}) Person.query.filter(Person.aardbei_id.in_(pids)).update( values={"active": True}, synchronize_session="fetch" ) if __name__ == "__main__": logging.basicConfig(level=logging.DEBUG) token = input("Token: ") aardbei_people = get_aardbei_people(token) if isinstance(aardbei_people, AardbeiSyncError): logging.error("Could not get people: %s", aardbei_people.value) sys.exit(1) activities = get_activities(token) if isinstance(activities, AardbeiSyncError): logging.error("Could not get activities: %s", activities.value) sys.exit(1) link = match_local_aardbei(aardbei_people) link_matches(link.matches) create_missing(link.remote_only) update_names(link.altered_name) confirm = input("Commit? Y/N") if confirm.lower() == "y": print("Committing.") db.session.commit() else: print("Not committing.")