|
- from __future__ import annotations
- import datetime
- import json
- import logging
- import sys
- from dataclasses import asdict, dataclass
- from enum import Enum
- from typing import Any, Dict, List, NewType, Optional, Tuple, Union
- import requests
- from piket_server.flask import db
- from piket_server.models import Person
- from piket_server.util import fmt_datetime
# Logger for this module, named after the module itself.
log = logging.getLogger(__name__)

# Base URL of the Aardbei instance that the functions below talk to by default.
# NOTE(review): the active value targets a local server; the commented-out
# line is presumably the hosted instance -- confirm before deploying.
# AARDBEI_ENDPOINT = "https://aardbei.app"
AARDBEI_ENDPOINT = "http://localhost:3000"

# Distinct integer ID types, one per kind of Aardbei object, so that a type
# checker can tell the different kinds of IDs apart.
PersonId = NewType("PersonId", int)
MemberId = NewType("MemberId", int)
ParticipantId = NewType("ParticipantId", int)
ActivityId = NewType("ActivityId", int)
@dataclass(frozen=True)
class AardbeiPerson:
    """
    A Person as exposed by Aardbei.

    A Person stands for an individual in the real world and maps to a Person
    in the local database.
    """

    aardbei_id: PersonId
    full_name: str

    @classmethod
    def from_aardbei_dict(cls, data: Dict[str, Any]) -> AardbeiPerson:
        """
        Build an AardbeiPerson from a dictionary provided by Aardbei.

        The fields are read from the nested "person" entry; a missing key
        raises KeyError.

        >>> AardbeiPerson.from_aardbei_dict(
        ...     {"person": {"id": 1, "full_name": "Henkie Kraggelwenk"}}
        ... )
        AardbeiPerson(aardbei_id=1, full_name='Henkie Kraggelwenk')
        """
        payload = data["person"]
        name = payload["full_name"]
        remote_id = PersonId(payload["id"])
        return cls(aardbei_id=remote_id, full_name=name)

    @property
    def as_json_dict(self) -> Dict[str, Any]:
        """
        Serialize to a dictionary shaped like the one Aardbei provides.

        >>> AardbeiPerson(
        ...     aardbei_id=PersonId(1), full_name="Henkie Kraggelwenk"
        ... ).as_json_dict
        {'person': {'id': 1, 'full_name': 'Henkie Kraggelwenk'}}
        """
        fields: Dict[str, Any] = {
            "id": self.aardbei_id,
            "full_name": self.full_name,
        }
        return {"person": fields}
@dataclass(frozen=True)
class AardbeiMember:
    """
    Contains the data on a Member exposed by Aardbei.

    A Member represents the membership of a Person in a Group in Aardbei.
    """

    person: AardbeiPerson
    aardbei_id: MemberId
    is_leader: bool
    display_name: str

    @classmethod
    def from_aardbei_dict(cls, data: Dict[str, Any]) -> AardbeiMember:
        """
        Load from a dictionary provided by Aardbei.

        The member's fields, including the nested "person" entry, are read
        from the "member" entry of ``data``; a missing key raises KeyError.

        Example input::

            {
                "member": {
                    "person": {"full_name": "Roer Kuggelvork", "id": 2},
                    "id": 23,
                    "is_leader": False,
                    "display_name": "Roer",
                },
            }

        Resulting object::

            AardbeiMember(
                person=AardbeiPerson(
                    aardbei_id=PersonId(2), full_name="Roer Kuggelvork"
                ),
                aardbei_id=MemberId(23),
                is_leader=False,
                display_name="Roer",
            )
        """
        d = data["member"]
        # The member dict itself carries the "person" entry the parser needs.
        person = AardbeiPerson.from_aardbei_dict(d)
        return cls(
            person=person,
            aardbei_id=MemberId(d["id"]),
            is_leader=d["is_leader"],
            display_name=d["display_name"],
        )

    @property
    def as_json_dict(self) -> Dict[str, Any]:
        """
        Serialize to a dict as provided by Aardbei.

        The result uses the same layout that ``from_aardbei_dict`` accepts,
        so it can be parsed back into an equal object::

            {
                "member": {
                    "person": {"full_name": "Roer Kuggelvork", "id": 2},
                    "id": 23,
                    "is_leader": False,
                    "display_name": "Roer",
                }
            }
        """
        member: Dict[str, Any] = {
            "id": self.aardbei_id,
            "is_leader": self.is_leader,
            "display_name": self.display_name,
        }
        # Merges in the {"person": {...}} entry produced by the person.
        member.update(self.person.as_json_dict)
        # Wrap in the "member" envelope that from_aardbei_dict reads from.
        return {"member": member}
@dataclass(frozen=True)
class AardbeiParticipant:
    """
    Represents a Participant as exposed by Aardbei.

    A Participant represents the participation of a Person (optionally as a
    Member in a Group) in an Activity.
    """

    person: AardbeiPerson
    member: Optional[AardbeiMember]
    aardbei_id: ParticipantId
    attending: bool
    is_organizer: bool
    notes: Optional[str]

    @property
    def name(self) -> str:
        """
        Return the name to show for this Participant.

        This is the display_name if a Member is present, else the
        Participant's Person's full name.
        """
        if self.member is not None:
            return self.member.display_name
        return self.person.full_name

    @classmethod
    def from_aardbei_dict(cls, data: Dict[str, Any]) -> AardbeiParticipant:
        """
        Load from a dictionary as provided by Aardbei.

        All fields are read from the "participant" entry of ``data``. That
        entry must contain "person" and "member" entries, where "member" may
        be None for a Participant without a membership. A missing key raises
        KeyError.
        """
        d = data["participant"]
        person = AardbeiPerson.from_aardbei_dict(d)

        member: Optional[AardbeiMember] = None
        if d["member"] is not None:
            member = AardbeiMember.from_aardbei_dict(d)

        aardbei_id = ParticipantId(d["id"])
        return cls(
            person=person,
            member=member,
            aardbei_id=aardbei_id,
            attending=d["attending"],
            is_organizer=d["is_organizer"],
            notes=d["notes"],
        )

    @property
    def as_json_dict(self) -> Dict[str, Any]:
        """
        Serialize to a dict as provided by Aardbei.

        The person and member are nested inside the "participant" entry,
        which is where ``from_aardbei_dict`` looks for them. "member" is
        always present and is None when there is no membership, because the
        parser indexes that key unconditionally.
        """
        participant: Dict[str, Any] = {
            "id": self.aardbei_id,
            "attending": self.attending,
            "is_organizer": self.is_organizer,
            "notes": self.notes,
        }
        # Merges in the {"person": {...}} entry produced by the person.
        participant.update(self.person.as_json_dict)

        member_fields: Optional[Dict[str, Any]] = None
        if self.member is not None:
            serialized = self.member.as_json_dict
            # Store the bare member fields under "member": take the contents
            # of a "member" envelope when the serializer produced one, else
            # the serialized dict itself. Merging the member's dict straight
            # into this one would overwrite the participant's own "id".
            member_fields = serialized.get("member", serialized)
        participant["member"] = member_fields

        return {"participant": participant}
class NoResponseAction(Enum):
    """The "no response action" attribute of an Activity in Aardbei."""

    # Each value is the literal string used for this attribute in Aardbei's
    # data, so NoResponseAction(raw_string) parses it directly.
    Present = "present"
    Absent = "absent"
@dataclass(frozen=True)
class ResponseCounts:
    """The "response counts" attribute of an Activity in Aardbei."""

    present: int
    absent: int
    unknown: int

    @classmethod
    def from_aardbei_dict(cls, data: Dict[str, int]) -> ResponseCounts:
        """
        Build a ResponseCounts from a dict as provided by Aardbei.

        Reads the "present", "absent" and "unknown" keys; any other keys are
        ignored and a missing key raises KeyError.
        """
        wanted = ("present", "absent", "unknown")
        return cls(**{key: data[key] for key in wanted})

    @property
    def as_json_dict(self) -> Dict[str, int]:
        """Serialize to a dict shaped like the one Aardbei provides."""
        counts: Dict[str, int] = {}
        counts["present"] = self.present
        counts["absent"] = self.absent
        counts["unknown"] = self.unknown
        return counts
@dataclass(frozen=True)
class SparseAardbeiActivity:
    """
    Contains the data of an Activity as exposed by Aardbei, without the list
    of its participants.
    """

    aardbei_id: ActivityId
    name: str
    description: str
    location: str
    start: datetime.datetime
    end: Optional[datetime.datetime]
    deadline: Optional[datetime.datetime]
    reminder_at: Optional[datetime.datetime]
    no_response_action: NoResponseAction
    response_counts: ResponseCounts

    def distance(self, reference: datetime.datetime) -> datetime.timedelta:
        """
        Calculate how long ago this Activity ended / how much time until it
        starts, relative to ``reference``.

        Returns a zero timedelta while the Activity is in progress, which
        includes the exact start and end moments. When the Activity has no
        end time, the distance is measured to the start time only.

        ``reference`` must be comparable with ``start`` and ``end``; Python
        raises TypeError when comparing naive with timezone-aware datetimes.
        """
        if reference < self.start:
            # Not started yet: time remaining until the start.
            return self.start - reference

        if self.end is None:
            # No known end: time elapsed since the start (zero at the start).
            return reference - self.start

        if reference <= self.end:
            # In progress. The inclusive bound matters: with a strict
            # comparison, reference == end would not count as in progress
            # and the activity's whole duration would be reported instead.
            return datetime.timedelta(seconds=0)

        # Already over: time elapsed since the end.
        return reference - self.end

    @classmethod
    def from_aardbei_dict(cls, data: Dict[str, Any]) -> SparseAardbeiActivity:
        """
        Load from a dict as provided by Aardbei.

        All fields are read from the "activity" entry of ``data``. "start" is
        required; "end", "deadline" and "reminder_at" may be None. Timestamps
        are parsed with ``datetime.fromisoformat``. A missing key raises
        KeyError and an unparsable timestamp or unknown "no_response_action"
        raises ValueError.
        """
        activity = data["activity"]

        def parse_optional(key: str) -> Optional[datetime.datetime]:
            """Parse the timestamp stored under ``key``, passing None through."""
            raw = activity[key]
            if raw is None:
                return None
            return datetime.datetime.fromisoformat(raw)

        start = datetime.datetime.fromisoformat(activity["start"])
        end = parse_optional("end")
        deadline = parse_optional("deadline")
        reminder_at = parse_optional("reminder_at")

        no_response_action = NoResponseAction(activity["no_response_action"])
        response_counts = ResponseCounts.from_aardbei_dict(
            activity["response_counts"]
        )

        return cls(
            aardbei_id=ActivityId(activity["id"]),
            name=activity["name"],
            description=activity["description"],
            location=activity["location"],
            start=start,
            end=end,
            deadline=deadline,
            reminder_at=reminder_at,
            no_response_action=no_response_action,
            response_counts=response_counts,
        )

    @property
    def as_json_dict(self) -> Dict[str, Any]:
        """
        Serialize to a dict as provided by Aardbei.

        All fields are placed inside a single "activity" entry.
        """
        # NOTE(review): end, deadline and reminder_at may be None; this
        # relies on fmt_datetime accepting None -- confirm in piket_server.util.
        return {
            "activity": {
                "id": self.aardbei_id,
                "name": self.name,
                "description": self.description,
                "location": self.location,
                "start": fmt_datetime(self.start),
                "end": fmt_datetime(self.end),
                "deadline": fmt_datetime(self.deadline),
                "reminder_at": fmt_datetime(self.reminder_at),
                "no_response_action": self.no_response_action.value,
                "response_counts": self.response_counts.as_json_dict,
            }
        }
@dataclass(frozen=True)
class AardbeiActivity(SparseAardbeiActivity):
    """
    Contains the data of an Activity as exposed by Aardbei, including the
    list of its participants.
    """

    participants: List[AardbeiParticipant]

    @classmethod
    def from_aardbei_dict(cls, data: Dict[str, Any]) -> AardbeiActivity:
        """
        Load from a dict as provided by Aardbei.

        The participants are read from ``data["activity"]["participants"]``;
        every other field is parsed exactly as SparseAardbeiActivity does.
        """
        participants: List[AardbeiParticipant] = [
            AardbeiParticipant.from_aardbei_dict(x)
            for x in data["activity"]["participants"]
        ]

        # Parse the shared fields with the base class instead of repeating
        # the parsing here. The base classmethod is called on the base class
        # explicitly: going through super() would try to construct this class
        # without its required ``participants`` argument.
        sparse = SparseAardbeiActivity.from_aardbei_dict(data)

        # vars() of the (non-slots) dataclass instance maps each field name
        # to its value, which is exactly the keyword set the constructor
        # needs. It is a shallow mapping, so nested objects are kept as-is.
        return cls(**vars(sparse), participants=participants)

    @property
    def as_json_dict(self) -> Dict[str, Any]:
        """
        Serialize to a dict as provided by Aardbei.

        The participants are stored inside the "activity" entry, which is
        where ``from_aardbei_dict`` reads them from.
        """
        res = super().as_json_dict
        res["activity"]["participants"] = [
            p.as_json_dict for p in self.participants
        ]
        return res
@dataclass(frozen=True)
class AardbeiMatch:
    """Pairs a local Person with the matching member present in Aardbei's data."""

    # The Person from the local database.
    local: Person
    # The member, as retrieved from Aardbei, that the local Person matches.
    remote: AardbeiMember
@dataclass(frozen=True)
class AardbeiLink:
    """The set of differences between the local state and Aardbei's set of people."""

    # People that exist on both sides, but aren't linked in the people table.
    matches: List[AardbeiMatch]
    # People that are already linked but changed one of their names.
    altered_name: List[AardbeiMatch]
    # People that only exist on the remote.
    remote_only: List[AardbeiMember]

    @property
    def num_changes(self) -> int:
        """Return how many people mismatch between Aardbei and the local state."""
        groups = (self.matches, self.altered_name, self.remote_only)
        return sum(len(group) for group in groups)
class AardbeiSyncError(Enum):
    """Errors that might occur when retrieving data from Aardbei."""

    # Returned by the fetch functions when the connection to the endpoint fails.
    CantConnect = "connect_fail"
    # Returned by the fetch functions when the endpoint answers with an HTTP
    # error status.
    HTTPError = "http_fail"
def get_aardbei_people(
    token: str, endpoint: str = AARDBEI_ENDPOINT, timeout: float = 10.0
) -> Union[List[AardbeiMember], AardbeiSyncError]:
    """
    Retrieve the members of a Group from Aardbei and parse them to
    AardbeiMember objects (each of which contains its AardbeiPerson).

    :param token: Group token sent in the ``Authorization`` header.
    :param endpoint: Base URL of the Aardbei instance.
    :param timeout: Seconds to wait for the server before giving up. Without
        a timeout ``requests`` may block indefinitely.
    :return: The parsed members, or an AardbeiSyncError when the server
        cannot be reached (``CantConnect``, also used for timeouts) or
        answers with an HTTP error status (``HTTPError``).
    """
    try:
        resp: requests.Response = requests.get(
            f"{endpoint}/api/groups/0/",
            headers={"Authorization": f"Group {token}"},
            timeout=timeout,
        )
        resp.raise_for_status()

    except (requests.ConnectionError, requests.Timeout):
        log.exception("Can't connect to endpoint %s", endpoint)
        return AardbeiSyncError.CantConnect

    except requests.HTTPError:
        # Log the failure so it is not swallowed silently.
        log.exception("HTTP error from endpoint %s", endpoint)
        return AardbeiSyncError.HTTPError

    members = resp.json()["group"]["members"]
    return [AardbeiMember.from_aardbei_dict(x) for x in members]
def match_local_aardbei(aardbei_members: List[AardbeiMember]) -> AardbeiLink:
    """
    Inspect the local state and compare it with the set of given
    AardbeiMembers (containing AardbeiPersons).

    Each member is classified as follows:

    * A local Person with the same Aardbei ID exists: the member ends up in
      ``altered_name`` when its full name or display name differs from the
      local one, and is otherwise only logged as OK.
    * No linked Person exists, but a local Person has the same full name:
      the pair ends up in ``matches``.
    * Neither applies: the member ends up in ``remote_only``.

    :param aardbei_members: Members as retrieved from Aardbei.
    :return: An AardbeiLink describing which people don't match the remote
        state. Nothing is modified by this function.
    """
    matches: List[AardbeiMatch] = []
    altered_name: List[AardbeiMatch] = []
    remote_only: List[AardbeiMember] = []

    for member in aardbei_members:
        linked: Optional[Person] = Person.query.filter_by(
            aardbei_id=member.person.aardbei_id
        ).one_or_none()

        if linked is not None:
            names_differ = (
                linked.full_name != member.person.full_name
                or linked.display_name != member.display_name
            )
            if names_differ:
                altered_name.append(AardbeiMatch(linked, member))
            else:
                # Use the module logger, like the rest of this module.
                log.info(
                    "OK: %s / %s (L%s/R%s)",
                    linked.full_name,
                    linked.display_name,
                    linked.person_id,
                    linked.aardbei_id,
                )
            continue

        # Not linked yet: fall back to matching on the full name.
        # NOTE(review): one_or_none() raises when several local people share
        # this full name, and the lookup does not exclude people already
        # linked to a different Aardbei ID -- confirm this is intended.
        same_name: Optional[Person] = Person.query.filter_by(
            full_name=member.person.full_name
        ).one_or_none()

        if same_name is not None:
            matches.append(AardbeiMatch(same_name, member))
        else:
            remote_only.append(member)

    return AardbeiLink(matches, altered_name, remote_only)
def link_matches(matches: List[AardbeiMatch]) -> None:
    """
    Update local people to add the remote ID to the local state.

    For every match, the local Person receives the remote person's Aardbei ID
    and the remote member's display name.

    This only enqueues the changes in the local SQLAlchemy session, committing
    needs to be done separately.

    :param matches: Pairs of local and remote people to link.
    """
    for match in matches:
        local, remote = match.local, match.remote

        local.aardbei_id = remote.person.aardbei_id
        local.display_name = remote.display_name

        # Use the module logger, like the rest of this module.
        log.info(
            "Linking local %s (%s) to remote %s (%s)",
            local.full_name,
            local.person_id,
            remote.display_name,
            remote.person.aardbei_id,
        )

        db.session.add(local)
def create_missing(missing: List[AardbeiMember]) -> None:
    """
    Create local people for all remote people that don't exist locally.

    Every new Person takes its full name, display name and Aardbei ID from
    the remote member and is created with ``active=False``.

    This only enqueues the changes in the local SQLAlchemy session, committing
    needs to be done separately.

    :param missing: Remote members for which a local Person must be created.
    """
    for member in missing:
        remote_person = member.person

        pnew = Person(
            full_name=remote_person.full_name,
            display_name=member.display_name,
            aardbei_id=remote_person.aardbei_id,
            active=False,
        )

        # Use the module logger, like the rest of this module.
        log.info(
            "Creating new person for %s / %s (%s)",
            remote_person.full_name,
            member.display_name,
            remote_person.aardbei_id,
        )

        db.session.add(pnew)
def update_names(matches: List[AardbeiMatch]) -> None:
    """
    Update the local full and display names of people that were already linked
    to a remote person, and who changed names on the remote.

    This only enqueues the changes in the local SQLAlchemy session, committing
    needs to be done separately.

    :param matches: Linked pairs whose names differ between local and remote.
    :raises AssertionError: If a pair turns out to have identical names, i.e.
        there was nothing to update (only checked when assertions are
        enabled).
    """
    for match in matches:
        p = match.local
        member = match.remote
        aardbei_person = member.person

        changed = False

        if p.full_name != aardbei_person.full_name:
            # Use the module logger, like the rest of this module.
            log.info(
                "Updating %s (L%s/R%s) full name %s to %s",
                aardbei_person.full_name,
                p.person_id,
                aardbei_person.aardbei_id,
                p.full_name,
                aardbei_person.full_name,
            )
            p.full_name = aardbei_person.full_name
            changed = True

        if p.display_name != member.display_name:
            log.info(
                "Updating %s (L%s/R%s) display name %s to %s",
                p.full_name,
                p.person_id,
                aardbei_person.aardbei_id,
                p.display_name,
                member.display_name,
            )
            p.display_name = member.display_name
            changed = True

        # Sanity check: callers should only pass pairs that actually differ.
        assert changed, "got match but didn't update anything"

        db.session.add(p)
def get_activities(
    token: str, endpoint: str = AARDBEI_ENDPOINT, timeout: float = 10.0
) -> Union[List[SparseAardbeiActivity], AardbeiSyncError]:
    """
    Get the list of activities present on the remote and return these
    activities, ordered by the temporal distance to the current time
    (closest first).

    The upcoming, current and previous activities are fetched with one
    request each. If any of the requests fails, an error is returned and the
    activities fetched so far are discarded.

    :param token: Group token sent in the ``Authorization`` header.
    :param endpoint: Base URL of the Aardbei instance.
    :param timeout: Seconds to wait for the server on each request. Without a
        timeout ``requests`` may block indefinitely.
    :return: The sorted activities, or an AardbeiSyncError when the server
        cannot be reached (``CantConnect``, also used for timeouts) or
        answers with an HTTP error status (``HTTPError``).
    """
    result: List[SparseAardbeiActivity] = []

    for category in ("upcoming", "current", "previous"):
        try:
            resp = requests.get(
                f"{endpoint}/api/groups/0/{category}_activities",
                headers={"Authorization": f"Group {token}"},
                timeout=timeout,
            )
            resp.raise_for_status()

        except requests.HTTPError as e:
            log.exception(e)
            return AardbeiSyncError.HTTPError

        except (requests.ConnectionError, requests.Timeout) as e:
            log.exception(e)
            return AardbeiSyncError.CantConnect

        result.extend(
            SparseAardbeiActivity.from_aardbei_dict(item) for item in resp.json()
        )

    # NOTE(review): ``now`` is timezone-aware, so the parsed activity
    # timestamps must be aware as well or the comparison raises TypeError --
    # confirm that Aardbei always sends a UTC offset.
    now = datetime.datetime.now(datetime.timezone.utc)
    result.sort(key=lambda activity: activity.distance(now))
    return result
def get_activity(
    activity_id: ActivityId, token: str, endpoint: str, timeout: float = 10.0
) -> Union[AardbeiActivity, AardbeiSyncError]:
    """
    Get all data (including participants) from the remote about one activity
    with a given ID.

    :param activity_id: Aardbei ID of the activity to fetch.
    :param token: Group token sent in the ``Authorization`` header.
    :param endpoint: Base URL of the Aardbei instance.
    :param timeout: Seconds to wait for the server before giving up. Without
        a timeout ``requests`` may block indefinitely.
    :return: The parsed activity, or an AardbeiSyncError when the server
        cannot be reached (``CantConnect``, also used for timeouts) or
        answers with an HTTP error status (``HTTPError``).
    """
    try:
        resp = requests.get(
            f"{endpoint}/api/activities/{activity_id}",
            headers={"Authorization": f"Group {token}"},
            timeout=timeout,
        )
        resp.raise_for_status()

    except requests.HTTPError as e:
        log.exception(e)
        return AardbeiSyncError.HTTPError

    except (requests.ConnectionError, requests.Timeout) as e:
        # Log connection failures too, as is done for HTTP errors.
        log.exception(e)
        return AardbeiSyncError.CantConnect

    return AardbeiActivity.from_aardbei_dict(resp.json())
def match_activity(activity: AardbeiActivity) -> None:
    """
    Update the local state so that everyone attending the given activity is
    marked as active, and all other people are marked as inactive.

    Every local Person is first set to inactive; the people whose Aardbei ID
    belongs to an attending participant are then set to active again. No
    commit is issued here.
    """
    attending_ids: List[PersonId] = [
        participant.person.aardbei_id
        for participant in activity.participants
        if participant.attending
    ]

    # Start from a clean slate: nobody is active.
    Person.query.update(values={"active": False})

    # Re-activate exactly the people that attend this activity.
    attendees = Person.query.filter(Person.aardbei_id.in_(attending_ids))
    attendees.update(values={"active": True}, synchronize_session="fetch")
if __name__ == "__main__":
    # Interactive sync: fetch the remote state, stage the resulting changes in
    # the SQLAlchemy session and commit them only after confirmation.
    logging.basicConfig(level=logging.DEBUG)

    token = input("Token: ")

    aardbei_people = get_aardbei_people(token)
    if isinstance(aardbei_people, AardbeiSyncError):
        logging.error("Could not get people: %s", aardbei_people.value)
        sys.exit(1)

    # The activities are only fetched to check that this succeeds; the result
    # itself is not used below.
    activities = get_activities(token)
    if isinstance(activities, AardbeiSyncError):
        logging.error("Could not get activities: %s", activities.value)
        sys.exit(1)

    # Work out the differences and stage a change for each category.
    link = match_local_aardbei(aardbei_people)
    link_matches(link.matches)
    create_missing(link.remote_only)
    update_names(link.altered_name)

    confirm = input("Commit? Y/N")
    if confirm.lower() != "y":
        print("Not committing.")
    else:
        print("Committing.")
        db.session.commit()
|