# -*- encoding: utf-8 -*-
"""
A "DAV object" is anything we get from the caldav server or push into the
caldav server, notably principal, calendars and calendar events.
(This file has become huge and will be split up prior to the next
release. I think it makes sense moving the CalendarObjectResource
class hierarchy into a separate file)
"""
import re
import uuid
from collections import defaultdict
from datetime import date
from datetime import datetime
from datetime import timedelta
from datetime import timezone
import icalendar
import vobject
from caldav.lib.python_utilities import to_normal_str
from caldav.lib.python_utilities import to_unicode
from caldav.lib.python_utilities import to_wire
from dateutil.rrule import rrulestr
from lxml import etree
try:
# noinspection PyCompatibility
from urllib.parse import unquote, quote
except ImportError:
from urllib import unquote, quote
try:
from typing import ClassVar, Union, Optional
TimeStamp = Optional[Union[date, datetime]]
except:
pass
from caldav.lib import error, vcal
from caldav.lib.url import URL
from caldav.elements import dav, cdav, ical
import logging
log = logging.getLogger("caldav")
def errmsg(r):
"""Utility for formatting a response xml tree to an error string"""
return "%s %s\n\n%s" % (r.status, r.reason, r.raw)
class DAVObject(object):
"""
Base class for all DAV objects. Can be instantiated by a client
and an absolute or relative URL, or from the parent object.
"""
id = None
url = None
client = None
parent = None
name = None
def __init__(
self,
client=None,
url=None,
parent=None,
name=None,
id=None,
props=None,
**extra,
):
"""
Default constructor.
Parameters:
* client: A DAVClient instance
* url: The url for this object. May be a full URL or a relative URL.
* parent: The parent object - used when creating objects
* name: A displayname - to be removed in 1.0, see https://github.com/python-caldav/caldav/issues/128 for details
* props: a dict with known properties for this object (as of 2020-12, only used for etags, and only when fetching CalendarObjectResource using the .objects or .objects_by_sync_token methods).
* id: The resource id (UID for an Event)
"""
if client is None and parent is not None:
client = parent.client
self.client = client
self.parent = parent
self.name = name
self.id = id
if props is None:
self.props = {}
else:
self.props = props
self.extra_init_options = extra
# url may be a path relative to the caldav root
if client and url:
self.url = client.url.join(url)
else:
self.url = URL.objectify(url)
@property
def canonical_url(self):
return str(self.url.canonical())
def children(self, type=None):
"""List children, using a propfind (resourcetype) on the parent object,
at depth = 1.
TODO: This is old code, it's querying for DisplayName and
ResourceTypes prop and returning a tuple of those. Those two
are relatively arbitrary. I think it's mostly only calendars
having DisplayName, but it may make sense to ask for the
children of a calendar also as an alternative way to get all
events? It should be redone into a more generic method, and
it should probably return a dict rather than a tuple. We
should also look over to see if there is any code duplication.
"""
c = []
depth = 1
properties = {}
props = [dav.DisplayName()]
multiprops = [dav.ResourceType()]
response = self._query_properties(props + multiprops, depth)
properties = response.expand_simple_props(
props=props, multi_value_props=multiprops
)
for path in list(properties.keys()):
resource_types = properties[path][dav.ResourceType.tag]
resource_name = properties[path][dav.DisplayName.tag]
if type is None or type in resource_types:
url = URL(path)
if url.hostname is None:
# Quote when path is not a full URL
path = quote(path)
# TODO: investigate the RFCs thoroughly - why does a "get
# members of this collection"-request also return the
# collection URL itself?
# And why is the strip_trailing_slash-method needed?
# The collection URL should always end with a slash according
# to RFC 2518, section 5.2.
if (isinstance(self, CalendarSet) and type == cdav.Calendar.tag) or (
self.url.canonical().strip_trailing_slash()
!= self.url.join(path).canonical().strip_trailing_slash()
):
c.append((self.url.join(path), resource_types, resource_name))
## TODO: return objects rather than just URLs, and include
## the properties we've already fetched
return c
def _query_properties(self, props=None, depth=0):
"""
This is an internal method for doing a propfind query. It's a
result of code-refactoring work, attempting to consolidate
similar-looking code into a common method.
"""
root = None
# build the propfind request
if props is not None and len(props) > 0:
prop = dav.Prop() + props
root = dav.Propfind() + prop
return self._query(root, depth)
def _query(
self,
root=None,
depth=0,
query_method="propfind",
url=None,
expected_return_value=None,
):
"""
This is an internal method for doing a query. It's a
result of code-refactoring work, attempting to consolidate
similar-looking code into a common method.
"""
body = ""
if root:
if hasattr(root, "xmlelement"):
body = etree.tostring(
root.xmlelement(), encoding="utf-8", xml_declaration=True
)
else:
body = root
if url is None:
url = self.url
ret = getattr(self.client, query_method)(url, body, depth)
if ret.status == 404:
raise error.NotFoundError(errmsg(ret))
if (
expected_return_value is not None and ret.status != expected_return_value
) or ret.status >= 400:
## COMPATIBILITY HACK - see https://github.com/python-caldav/caldav/issues/309
body = to_wire(body)
if (
ret.status == 500
and not b"getetag" in body
and b"" in body
):
body = body.replace(
b"", b""
)
return self._query(
body, depth, query_method, url, expected_return_value
)
raise error.exception_by_method[query_method](errmsg(ret))
return ret
def get_property(self, prop, use_cached=False, **passthrough):
## TODO: use_cached should probably be true
if use_cached:
if prop.tag in self.props:
return self.props[prop.tag]
foo = self.get_properties([prop], **passthrough)
return foo.get(prop.tag, None)
def get_properties(
self, props=None, depth=0, parse_response_xml=True, parse_props=True
):
"""Get properties (PROPFIND) for this object.
With parse_response_xml and parse_props set to True a
best-attempt will be done on decoding the XML we get from the
server - but this works only for properties that don't have
complex types. With parse_response_xml set to False, a
DAVResponse object will be returned, and it's up to the caller
to decode. With parse_props set to false but
parse_response_xml set to true, xml elements will be returned
rather than values.
Parameters:
* props = [dav.ResourceType(), dav.DisplayName(), ...]
Returns:
* {proptag: value, ...}
"""
rc = None
response = self._query_properties(props, depth)
if not parse_response_xml:
return response
if not parse_props:
properties = response.find_objects_and_props()
else:
properties = response.expand_simple_props(props)
error.assert_(properties)
path = unquote(self.url.path)
if path.endswith("/"):
exchange_path = path[:-1]
else:
exchange_path = path + "/"
if path in properties:
rc = properties[path]
elif exchange_path in properties:
if not isinstance(self, Principal):
## Some caldav servers reports the URL for the current
## principal to end with / when doing a propfind for
## current-user-principal - I believe that's a bug,
## the principal is not a collection and should not
## end with /. (example in rfc5397 does not end with /).
## ... but it gets worse ... when doing a propfind on the
## principal, the href returned may be without the slash.
## Such inconsistency is clearly a bug.
log.error(
"potential path handling problem with ending slashes. Path given: %s, path found: %s. %s"
% (path, exchange_path, error.ERR_FRAGMENT)
)
error._assert(False)
rc = properties[exchange_path]
elif self.url in properties:
rc = properties[self.url]
elif "/principal/" in properties and path.endswith("/principal/"):
## Workaround for a known iCloud bug.
## The properties key is expected to be the same as the path.
## path is on the format /123456/principal/ but properties key is /principal/
## tests apparently passed post bc589093a34f0ed0ef489ad5e9cba048750c9837 and 3ee4e42e2fa8f78b71e5ffd1ef322e4007df7a60, even without this workaround
## TODO: should probably be investigated more.
## (observed also by others, ref https://github.com/python-caldav/caldav/issues/168)
rc = properties["/principal/"]
elif "//" in path and path.replace("//", "/") in properties:
## ref https://github.com/python-caldav/caldav/issues/302
## though, it would be nice to find the root cause,
## self.url should not contain double slashes in the first place
rc = properties[path.replace("//", "/")]
elif len(properties) == 1:
## Ref https://github.com/python-caldav/caldav/issues/191 ...
## let's be pragmatic and just accept whatever the server is
## throwing at us. But we'll log an error anyway.
log.error(
"Possibly the server has a path handling problem, possibly the URL configured is wrong.\n"
"Path expected: %s, path found: %s %s.\n"
"Continuing, probably everything will be fine"
% (path, str(list(properties.keys())), error.ERR_FRAGMENT)
)
rc = list(properties.values())[0]
else:
log.error(
"Possibly the server has a path handling problem. Path expected: %s, paths found: %s %s"
% (path, str(list(properties.keys())), error.ERR_FRAGMENT)
)
error.assert_(False)
if parse_props:
self.props.update(rc)
return rc
def set_properties(self, props=None):
"""
Set properties (PROPPATCH) for this object.
* props = [dav.DisplayName('name'), ...]
Returns:
* self
"""
props = [] if props is None else props
prop = dav.Prop() + props
set = dav.Set() + prop
root = dav.PropertyUpdate() + set
r = self._query(root, query_method="proppatch")
statuses = r.tree.findall(".//" + dav.Status.tag)
for s in statuses:
if " 200 " not in s.text:
raise error.PropsetError(s.text)
return self
def save(self):
"""
Save the object. This is an abstract method, that all classes
derived from DAVObject implement.
Returns:
* self
"""
raise NotImplementedError()
def delete(self):
"""
Delete the object.
"""
if self.url is not None:
r = self.client.delete(self.url)
# TODO: find out why we get 404
if r.status not in (200, 204, 404):
raise error.DeleteError(errmsg(r))
def get_display_name(self):
"""
Get calendar display name
"""
return self.get_property(dav.DisplayName())
def __str__(self):
try:
return (
str(self.get_property(dav.DisplayName(), use_cached=True)) or self.url
)
except:
return str(self.url)
def __repr__(self):
return "%s(%s)" % (self.__class__.__name__, self.url)
class CalendarSet(DAVObject):
"""
A CalendarSet is a set of calendars.
"""
def calendars(self):
"""
List all calendar collections in this set.
Returns:
* [Calendar(), ...]
"""
cals = []
data = self.children(cdav.Calendar.tag)
for c_url, c_type, c_name in data:
try:
cal_id = c_url.split("/")[-2]
except:
log.error(f"Calendar {c_name} has unexpected url {c_url}")
cal_id = None
cals.append(
Calendar(self.client, id=cal_id, url=c_url, parent=self, name=c_name)
)
return cals
def make_calendar(
self, name=None, cal_id=None, supported_calendar_component_set=None
):
"""
Utility method for creating a new calendar.
Parameters:
* name: the display name of the new calendar
* cal_id: the uuid of the new calendar
* supported_calendar_component_set: what kind of objects
(EVENT, VTODO, VFREEBUSY, VJOURNAL) the calendar should handle.
Should be set to ['VTODO'] when creating a task list in Zimbra -
in most other cases the default will be OK.
Returns:
* Calendar(...)-object
"""
return Calendar(
self.client,
name=name,
parent=self,
id=cal_id,
supported_calendar_component_set=supported_calendar_component_set,
).save()
def calendar(self, name=None, cal_id=None):
"""
The calendar method will return a calendar object. If it gets a cal_id
but no name, it will not initiate any communication with the server
Parameters:
* name: return the calendar with this display name
* cal_id: return the calendar with this calendar id or URL
Returns:
* Calendar(...)-object
"""
if name and not cal_id:
for calendar in self.calendars():
display_name = calendar.get_display_name()
if display_name == name:
return calendar
if name and not cal_id:
raise error.NotFoundError(
"No calendar with name %s found under %s" % (name, self.url)
)
if not cal_id and not name:
return self.calendars()[0]
if str(URL.objectify(cal_id).canonical()).startswith(
str(self.client.url.canonical())
):
url = self.client.url.join(cal_id)
elif (
isinstance(cal_id, URL)
or cal_id.startswith("https://")
or cal_id.startswith("http://")
):
url = self.url.join(cal_id)
else:
url = self.url.join(quote(cal_id) + "/")
return Calendar(self.client, name=name, parent=self, url=url, id=cal_id)
class Principal(DAVObject):
"""
This class represents a DAV Principal. It doesn't do much, except
keep track of the URLs for the calendar-home-set, etc.
A principal MUST have a non-empty DAV:displayname property
(defined in Section 13.2 of [RFC2518]),
and a DAV:resourcetype property (defined in Section 13.9 of [RFC2518]).
Additionally, a principal MUST report the DAV:principal XML element
in the value of the DAV:resourcetype property.
(TODO: the resourcetype is actually never checked, and the DisplayName
is not stored anywhere)
"""
def __init__(self, client=None, url=None):
"""
Returns a Principal.
Parameters:
* client: a DAVClient() object
* url: Deprecated - for backwards compatibility purposes only.
If url is not given, deduct principal path as well as calendar home set
path from doing propfinds.
"""
super(Principal, self).__init__(client=client, url=url)
self._calendar_home_set = None
if url is None:
self.url = self.client.url
cup = self.get_property(dav.CurrentUserPrincipal())
self.url = self.client.url.join(URL.objectify(cup))
def make_calendar(
self, name=None, cal_id=None, supported_calendar_component_set=None
):
"""
Convenience method, bypasses the self.calendar_home_set object.
See CalendarSet.make_calendar for details.
"""
return self.calendar_home_set.make_calendar(
name,
cal_id,
supported_calendar_component_set=supported_calendar_component_set,
)
def calendar(self, name=None, cal_id=None, cal_url=None):
"""
The calendar method will return a calendar object.
It will not initiate any communication with the server.
"""
if not cal_url:
return self.calendar_home_set.calendar(name, cal_id)
else:
return Calendar(self.client, url=self.client.url.join(cal_url))
def get_vcal_address(self):
"""
Returns the principal, as an icalendar.vCalAddress object
"""
from icalendar import vCalAddress, vText
cn = self.get_display_name()
ids = self.calendar_user_address_set()
cutype = self.get_property(cdav.CalendarUserType())
ret = vCalAddress(ids[0])
ret.params["cn"] = vText(cn)
ret.params["cutype"] = vText(cutype)
return ret
@property
def calendar_home_set(self):
if not self._calendar_home_set:
calendar_home_set_url = self.get_property(cdav.CalendarHomeSet())
## owncloud returns /remote.php/dav/calendars/tobixen@e.email/
## in that case the @ should be quoted. Perhaps other
## implementations return already quoted URLs. Hacky workaround:
if (
calendar_home_set_url is not None
and "@" in calendar_home_set_url
and not "://" in calendar_home_set_url
):
calendar_home_set_url = quote(calendar_home_set_url)
self.calendar_home_set = calendar_home_set_url
return self._calendar_home_set
@calendar_home_set.setter
def calendar_home_set(self, url):
if isinstance(url, CalendarSet):
self._calendar_home_set = url
return
sanitized_url = URL.objectify(url)
## TODO: sanitized_url should never be None, this needs more
## research. added here as it solves real-world issues, ref
## https://github.com/python-caldav/caldav/pull/56
if sanitized_url is not None:
if (
sanitized_url.hostname
and sanitized_url.hostname != self.client.url.hostname
):
# icloud (and others?) having a load balanced system,
# where each principal resides on one named host
## TODO:
## Here be dragons. sanitized_url will be the root
## of all future objects derived from client. Changing
## the client.url root by doing a principal.calendars()
## is an unacceptable side effect and may be a cause of
## incompatibilities with icloud. Do more research!
self.client.url = sanitized_url
self._calendar_home_set = CalendarSet(
self.client, self.client.url.join(sanitized_url)
)
def calendars(self):
"""
Return the principials calendars
"""
return self.calendar_home_set.calendars()
def freebusy_request(self, dtstart, dtend, attendees):
freebusy_ical = icalendar.Calendar()
freebusy_ical.add("prodid", "-//tobixen/python-caldav//EN")
freebusy_ical.add("version", "2.0")
freebusy_ical.add("method", "REQUEST")
uid = uuid.uuid1()
freebusy_comp = icalendar.FreeBusy()
freebusy_comp.add("uid", uid)
freebusy_comp.add("dtstamp", datetime.now())
freebusy_comp.add("dtstart", dtstart)
freebusy_comp.add("dtend", dtend)
freebusy_ical.add_component(freebusy_comp)
outbox = self.schedule_outbox()
caldavobj = FreeBusy(data=freebusy_ical, parent=outbox)
caldavobj.add_organizer()
for attendee in attendees:
caldavobj.add_attendee(attendee, no_default_parameters=True)
response = self.client.post(
outbox.url,
caldavobj.data,
headers={"Content-Type": "text/calendar; charset=utf-8"},
)
return response.find_objects_and_props()
def calendar_user_address_set(self):
"""
defined in RFC6638
"""
addresses = self.get_property(cdav.CalendarUserAddressSet(), parse_props=False)
assert not [x for x in addresses if x.tag != dav.Href().tag]
addresses = list(addresses)
## possibly the preferred attribute is iCloud-specific.
## TODO: do more research on that
addresses.sort(key=lambda x: -int(x.get("preferred", 0)))
return [x.text for x in addresses]
def schedule_inbox(self):
return ScheduleInbox(principal=self)
def schedule_outbox(self):
return ScheduleOutbox(principal=self)
class Calendar(DAVObject):
"""
The `Calendar` object is used to represent a calendar collection.
Refer to the RFC for details:
https://tools.ietf.org/html/rfc4791#section-5.3.1
"""
def _create(self, name=None, id=None, supported_calendar_component_set=None):
"""
Create a new calendar with display name `name` in `parent`.
"""
if id is None:
id = str(uuid.uuid1())
self.id = id
path = self.parent.url.join(id + "/")
self.url = path
# TODO: mkcalendar seems to ignore the body on most servers?
# at least the name doesn't get set this way.
# zimbra gives 500 (!) if body is omitted ...
prop = dav.Prop()
if name:
display_name = dav.DisplayName(name)
prop += [
display_name,
]
if supported_calendar_component_set:
sccs = cdav.SupportedCalendarComponentSet()
for scc in supported_calendar_component_set:
sccs += cdav.Comp(scc)
prop += sccs
set = dav.Set() + prop
mkcol = cdav.Mkcalendar() + set
r = self._query(
root=mkcol, query_method="mkcalendar", url=path, expected_return_value=201
)
# COMPATIBILITY ISSUE
# name should already be set, but we've seen caldav servers failing
# on setting the DisplayName on calendar creation
# (DAViCal, Zimbra, ...). Doing an attempt on explicitly setting the
# display name using PROPPATCH.
if name:
try:
self.set_properties([display_name])
except:
## TODO: investigate. Those asserts break.
error.assert_(False)
try:
current_display_name = self.get_display_name()
error.assert_(current_display_name == name)
except:
log.warning(
"calendar server does not support display name on calendar? Ignoring",
exc_info=True,
)
error.assert_(False)
def get_supported_components(self):
"""
returns a list of component types supported by the calendar, in
string format (typically ['VJOURNAL', 'VTODO', 'VEVENT'])
"""
props = [cdav.SupportedCalendarComponentSet()]
response = self.get_properties(props, parse_response_xml=False)
response_list = response.find_objects_and_props()
prop = response_list[unquote(self.url.path)][
cdav.SupportedCalendarComponentSet().tag
]
return [supported.get("name") for supported in prop]
def save_with_invites(self, ical, attendees, **attendeeoptions):
"""
sends a schedule request to the server. Equivalent with save_event, save_todo, etc,
but the attendees will be added to the ical object before sending it to the server.
"""
## TODO: consolidate together with save_*
obj = self._calendar_comp_class_by_data(ical)(data=ical, client=self.client)
obj.parent = self
obj.add_organizer()
for attendee in attendees:
obj.add_attendee(attendee, **attendeeoptions)
obj.id = obj.icalendar_instance.walk("vevent")[0]["uid"]
obj.save()
return obj
def _use_or_create_ics(self, ical, objtype, **ical_data):
if ical_data or (
(isinstance(ical, str) or isinstance(ical, bytes))
and not b"BEGIN:VCALENDAR" in to_wire(ical)
):
## TODO: the ical_fragment code is not much tested
if ical and not "ical_fragment" in ical_data:
ical_data["ical_fragment"] = ical
return vcal.create_ical(objtype=objtype, **ical_data)
return ical
## TODO: consolidate save_* - too much code duplication here
def save_event(self, ical=None, no_overwrite=False, no_create=False, **ical_data):
"""
Add a new event to the calendar, with the given ical.
Parameters:
* ical - ical object (text)
* no_overwrite - existing calendar objects should not be overwritten
* no_create - don't create a new object, existing calendar objects should be updated
* ical_data - passed to lib.vcal.create_ical
"""
e = Event(
self.client,
data=self._use_or_create_ics(ical, objtype="VEVENT", **ical_data),
parent=self,
)
e.save(no_overwrite=no_overwrite, no_create=no_create, obj_type="event")
self._handle_relations(e.id, ical_data)
return e
def save_todo(self, ical=None, no_overwrite=False, no_create=False, **ical_data):
"""
Add a new task to the calendar, with the given ical.
Parameters:
* ical - ical object (text)
"""
t = Todo(
self.client,
data=self._use_or_create_ics(ical, objtype="VTODO", **ical_data),
parent=self,
)
t.save(no_overwrite=no_overwrite, no_create=no_create, obj_type="todo")
self._handle_relations(t.id, ical_data)
return t
def save_journal(self, ical=None, no_overwrite=False, no_create=False, **ical_data):
"""
Add a new journal entry to the calendar, with the given ical.
Parameters:
* ical - ical object (text)
"""
j = Journal(
self.client,
data=self._use_or_create_ics(ical, objtype="VJOURNAL", **ical_data),
parent=self,
)
j.save(no_overwrite=no_overwrite, no_create=no_create, obj_type="journal")
self._handle_relations(j.id, ical_data)
return j
def _handle_relations(self, uid, ical_data):
for reverse_reltype, other_uid in [
("parent", x) for x in ical_data.get("child", ())
] + [("child", x) for x in ical_data.get("parent", ())]:
other = self.object_by_uid(other_uid)
other.set_relation(other=uid, reltype=reverse_reltype, set_reverse=False)
## legacy aliases
## TODO: should be deprecated
## TODO: think more through this - is `save_foo` better than `add_foo`?
## `save_foo` should not be used for updating existing content on the
## calendar!
add_event = save_event
add_todo = save_todo
add_journal = save_journal
def save(self):
"""
The save method for a calendar is only used to create it, for now.
We know we have to create it when we don't have a url.
Returns:
* self
"""
if self.url is None:
self._create(id=self.id, name=self.name, **self.extra_init_options)
return self
def calendar_multiget(self, event_urls):
"""
get multiple events' data
@author mtorange@gmail.com
@type events list of Event
"""
rv = []
prop = dav.Prop() + cdav.CalendarData()
root = (
cdav.CalendarMultiGet()
+ prop
+ [dav.Href(value=u.path) for u in event_urls]
)
response = self._query(root, 1, "report")
results = response.expand_simple_props([cdav.CalendarData()])
for r in results:
rv.append(
Event(
self.client,
url=self.url.join(r),
data=results[r][cdav.CalendarData.tag],
parent=self,
)
)
return rv
## TODO: Upgrade the warning to an error (and perhaps critical) in future
## releases, and then finally remove this method completely.
def build_date_search_query(
self, start, end=None, compfilter="VEVENT", expand="maybe"
):
"""
WARNING: DEPRECATED
"""
## This is dead code. It has no tests. It was made for usage
## by the date_search method, but I've decided not to use it
## there anymore. Most likely nobody is using this, as it's
## sort of an internal method - but for the sake of backward
## compatibility I will keep it for a while. I regret naming
## it build_date_search_query rather than
## _build_date_search_query...
logging.warning(
"DEPRECATION WARNING: The calendar.build_date_search_query method will be removed in caldav library from version 1.0 or perhaps earlier. Use calendar.build_search_xml_query instead."
)
if expand == "maybe":
expand = end
if compfilter == "VEVENT":
comp_class = Event
elif compfilter == "VTODO":
comp_class = Todo
else:
comp_class = None
return self.build_search_xml_query(
comp_class=comp_class, expand=expand, start=start, end=end
)
def date_search(
self, start, end=None, compfilter="VEVENT", expand="maybe", verify_expand=False
):
# type (TimeStamp, TimeStamp, str, str) -> CalendarObjectResource
"""Deprecated. Use self.search() instead.
Search events by date in the calendar. Recurring events are
expanded if they are occurring during the specified time frame
and if an end timestamp is given.
Parameters:
* start = datetime.today().
* end = same as above.
* compfilter = defaults to events only. Set to None to fetch all
calendar components.
* expand - should recurrent events be expanded? (to preserve
backward-compatibility the default "maybe" will be changed into True
unless the date_search is open-ended)
* verify_expand - not in use anymore, but kept for backward compatibility
Returns:
* [CalendarObjectResource(), ...]
"""
## TODO: upgrade to warning and error before removing this method
logging.info(
"DEPRECATION NOTICE: The calendar.date_search method may be removed in release 2.0 of the caldav library. Use calendar.search instead"
)
if verify_expand:
logging.warning(
"verify_expand in date_search does not work anymore, as we're doing client side expansion instead"
)
## for backward compatibility - expand should be false
## in an open-ended date search, otherwise true
if expand == "maybe":
expand = end
if compfilter == "VEVENT":
comp_class = Event
elif compfilter == "VTODO":
comp_class = Todo
else:
comp_class = None
## xandikos now yields a 5xx-error when trying to pass
## expand=True, after I prodded the developer that it doesn't
## work. By now there is some workaround in the test code to
## avoid sending expand=True to xandikos, but perhaps we
## should run a try-except-retry here with expand=False in the
## retry, and warnings logged ... or perhaps not.
objects = self.search(
start=start,
end=end,
comp_class=comp_class,
expand=expand,
split_expanded=False,
)
return objects
def _request_report_build_resultlist(
self, xml, comp_class=None, props=None, no_calendardata=False
):
"""
Takes some input XML, does a report query on a calendar object
and returns the resource objects found.
TODO: similar code is duplicated many places, we ought to do even more code
refactoring
"""
matches = []
if props is None:
props_ = [cdav.CalendarData()]
else:
props_ = [cdav.CalendarData()] + props
response = self._query(xml, 1, "report")
results = response.expand_simple_props(props_)
for r in results:
pdata = results[r]
if cdav.CalendarData.tag in pdata:
cdata = pdata.pop(cdav.CalendarData.tag)
if comp_class is None:
comp_class = self._calendar_comp_class_by_data(cdata)
else:
cdata = None
if comp_class is None:
## no CalendarData fetched - which is normal i.e. when doing a sync-token report and only asking for the URLs
comp_class = CalendarObjectResource
url = URL(r)
if url.hostname is None:
# Quote when result is not a full URL
url = quote(r)
## icloud hack - icloud returns the calendar URL as well as the calendar item URLs
if self.url.join(url) == self.url:
continue
matches.append(
comp_class(
self.client,
url=self.url.join(url),
data=cdata,
parent=self,
props=pdata,
)
)
return (response, matches)
def search(
self,
xml=None,
comp_class=None,
todo=None,
include_completed=False,
sort_keys=(),
split_expanded=True,
props=None,
**kwargs,
):
"""Creates an XML query, does a REPORT request towards the
server and returns objects found, eventually sorting them
before delivery.
This method contains some special logics to ensure that it can
consistently return a list of pending tasks on any server
implementation. In the future it may also include workarounds
and client side filtering to make sure other search results
are consistent on different server implementations.
Parameters supported:
* xml - use this search query, and ignore other filter parameters
* comp_class - set to event, todo or journal to restrict search to this
resource type. Some server implementations require this to be set.
* todo - sets comp_class to Todo, and restricts search to pending tasks,
unless the next parameter is set ...
* include_completed - include completed tasks
* event - sets comp_class to event
* text attribute search parameters: category, uid, summary, omment,
description, location, status
* no-category, no-summary, etc ... search for objects that does not
have those attributes. TODO: WRITE TEST CODE!
* expand - do server side expanding of recurring events/tasks
* start, end: do a time range search
* filters - other kind of filters (in lxml tree format)
* sort_keys - list of attributes to use when sorting
not supported yet:
* negated text match
* attribute not set
"""
## special compatibility-case when searching for pending todos
if todo and not include_completed:
matches1 = self.search(
todo=True,
comp_class=comp_class,
ignore_completed1=True,
include_completed=True,
**kwargs,
)
matches2 = self.search(
todo=True,
comp_class=comp_class,
ignore_completed2=True,
include_completed=True,
**kwargs,
)
matches3 = self.search(
todo=True,
comp_class=comp_class,
ignore_completed3=True,
include_completed=True,
**kwargs,
)
objects = []
match_set = set()
for item in matches1 + matches2 + matches3:
if not item.url in match_set:
match_set.add(item.url)
## and still, Zimbra seems to deliver too many TODOs in the
## matches2 ... let's do some post-filtering in case the
## server fails in filtering things the right way
if "STATUS:NEEDS-ACTION" in item.data or (
not "\nCOMPLETED:" in item.data
and not "\nSTATUS:COMPLETED" in item.data
and not "\nSTATUS:CANCELLED" in item.data
):
objects.append(item)
else:
if not xml:
(xml, comp_class) = self.build_search_xml_query(
comp_class=comp_class, todo=todo, props=props, **kwargs
)
elif kwargs:
raise error.ConsistencyError(
"Inconsistent usage parameters: xml together with other search options"
)
(response, objects) = self._request_report_build_resultlist(
xml, comp_class, props=props
)
if kwargs.get("expand", False):
## expand can only be used together with start and end.
## Error checking is done in build_search_xml_query. If
## search is fed with an XML query together with expand,
## then it's considered a "search option", and an error is
## raised above.
start = kwargs["start"]
end = kwargs["end"]
for o in objects:
## This would not be needed if the servers would follow the standard ...
o.load(only_if_unloaded=True)
## Google sometimes returns empty objects
objects = [o for o in objects if o.icalendar_component]
for o in objects:
component = o.icalendar_component
if component is None:
continue
recurrence_properties = ["exdate", "exrule", "rdate", "rrule"]
if any(key in component for key in recurrence_properties):
o.expand_rrule(start, end)
if split_expanded:
objects_ = objects
objects = []
for o in objects_:
objects.extend(o.split_expanded())
def sort_key_func(x):
ret = []
comp = x.icalendar_component
defaults = {
## TODO: all possible non-string sort attributes needs to be listed here, otherwise we will get type errors when comparing objects with the property defined vs undefined (or maybe we should make an "undefined" object that always will compare below any other type? Perhaps there exists such an object already?)
"due": "2050-01-01",
"dtstart": "1970-01-01",
"priority": 0,
"status": {
"VTODO": "NEEDS-ACTION",
"VJOURNAL": "FINAL",
"VEVENT": "TENTATIVE",
}[comp.name],
"category": "",
## Usage of strftime is a simple way to ensure there won't be
## problems if comparing dates with timestamps
"isnt_overdue": not (
"due" in comp
and comp["due"].dt.strftime("%F%H%M%S")
< datetime.now().strftime("%F%H%M%S")
),
"hasnt_started": (
"dtstart" in comp
and comp["dtstart"].dt.strftime("%F%H%M%S")
> datetime.now().strftime("%F%H%M%S")
),
}
for sort_key in sort_keys:
val = comp.get(sort_key, None)
if val is None:
ret.append(defaults.get(sort_key.lower(), ""))
continue
if hasattr(val, "dt"):
val = val.dt
elif hasattr(val, "cats"):
val = ",".join(val.cats)
if hasattr(val, "strftime"):
ret.append(val.strftime("%F%H%M%S"))
else:
ret.append(val)
return ret
if sort_keys:
objects.sort(key=sort_key_func)
## partial workaround for https://github.com/python-caldav/caldav/issues/201
for obj in objects:
try:
obj.load(only_if_unloaded=True)
except:
pass
return objects
def build_search_xml_query(
self,
comp_class=None,
todo=None,
ignore_completed1=None,
ignore_completed2=None,
ignore_completed3=None,
event=None,
filters=None,
expand=None,
start=None,
end=None,
props=None,
**kwargs,
):
"""This method will produce a caldav search query as an etree object.
It is primarily to be used from the search method. See the
documentation for the search method for more information.
"""
# those xml elements are weird. (a+b)+c != a+(b+c). First makes b and c as list members of a, second makes c an element in b which is an element of a.
# First objective is to let this take over all xml search query building and see that the current tests pass.
# ref https://www.ietf.org/rfc/rfc4791.txt, section 7.8.9 for how to build a todo-query
# We'll play with it and don't mind it's getting ugly and don't mind that the test coverage is lacking.
# we'll refactor and create some unit tests later, as well as ftests for complicated queries.
# build the request
data = cdav.CalendarData()
if expand:
if not start or not end:
raise error.ReportError("can't expand without a date range")
data += cdav.Expand(start, end)
if props is None:
props_ = [data]
else:
props_ = [data] + props
prop = dav.Prop() + props_
vcalendar = cdav.CompFilter("VCALENDAR")
comp_filter = None
if not filters:
filters = []
vNotCompleted = cdav.TextMatch("COMPLETED", negate=True)
vNotCancelled = cdav.TextMatch("CANCELLED", negate=True)
vNeedsAction = cdav.TextMatch("NEEDS-ACTION")
vStatusNotCompleted = cdav.PropFilter("STATUS") + vNotCompleted
vStatusNotCancelled = cdav.PropFilter("STATUS") + vNotCancelled
vStatusNeedsAction = cdav.PropFilter("STATUS") + vNeedsAction
vStatusNotDefined = cdav.PropFilter("STATUS") + cdav.NotDefined()
vNoCompleteDate = cdav.PropFilter("COMPLETED") + cdav.NotDefined()
if ignore_completed1:
## This query is quite much in line with https://tools.ietf.org/html/rfc4791#section-7.8.9
filters.extend([vNoCompleteDate, vStatusNotCompleted, vStatusNotCancelled])
elif ignore_completed2:
## some server implementations (i.e. NextCloud
## and Baikal) will yield "false" on a negated TextMatch
## if the field is not defined. Hence, for those
## implementations we need to turn back and ask again
## ... do you have any VTODOs for us where the STATUS
## field is not defined? (ref
## https://github.com/python-caldav/caldav/issues/14)
filters.extend([vNoCompleteDate, vStatusNotDefined])
elif ignore_completed3:
## ... and considering recurring tasks we really need to
## look a third time as well, this time for any task with
## the NEEDS-ACTION status set (do we need the first go?
## NEEDS-ACTION or no status set should cover them all?)
filters.extend([vStatusNeedsAction])
if start or end:
filters.append(cdav.TimeRange(start, end))
if todo is not None:
if not todo:
raise NotImplementedError()
if todo:
if comp_class is not None and comp_class is not Todo:
raise error.ConsistencyError(
"inconsistent search parameters - comp_class = %s, todo=%s"
% (comp_class, todo)
)
comp_filter = cdav.CompFilter("VTODO")
comp_class = Todo
if event is not None:
if not event:
raise NotImplementedError()
if event:
if comp_class is not None and comp_class is not Event:
raise error.ConsistencyError(
"inconsistent search parameters - comp_class = %s, event=%s"
% (comp_class, event)
)
comp_filter = cdav.CompFilter("VEVENT")
comp_class = Event
elif comp_class:
if comp_class is Todo:
comp_filter = cdav.CompFilter("VTODO")
elif comp_class is Event:
comp_filter = cdav.CompFilter("VEVENT")
elif comp_class is Journal:
comp_filter = cdav.CompFilter("VJOURNAL")
else:
raise error.ConsistencyError(
"unsupported comp class %s for search" % comp_class
)
for other in kwargs:
find_not_defined = other.startswith("no_")
find_defined = other.startswith("has_")
if find_not_defined:
other = other[3:]
if find_defined:
other = other[4:]
if other in (
"uid",
"summary",
"comment",
"class_",
"class",
"category",
"description",
"location",
"status",
"due",
"dtstamp",
"dtstart",
"dtend",
"duration",
"priority",
):
## category and class_ is special
if other.endswith("category"):
## TODO: we probably need to do client side filtering. I would
## expect --category='e' to fetch anything having the category e,
## but not including all other categories containing the letter e.
## As I read the caldav standard, the latter will be yielded.
target = other.replace("category", "categories")
elif other == "class_":
target = "class"
else:
target = other
if find_not_defined:
match = cdav.NotDefined()
elif find_defined:
raise NotImplemented(
"Seems not to be supported by the CalDAV protocol? or we can negate? not supported yet, in any case"
)
else:
match = cdav.TextMatch(kwargs[other])
filters.append(cdav.PropFilter(target.upper()) + match)
else:
raise NotImplementedError("searching for %s not supported yet" % other)
if comp_filter and filters:
comp_filter += filters
vcalendar += comp_filter
elif comp_filter:
vcalendar += comp_filter
elif filters:
vcalendar += filters
filter = cdav.Filter() + vcalendar
root = cdav.CalendarQuery() + [prop, filter]
return (root, comp_class)
def freebusy_request(self, start, end):
"""
Search the calendar, but return only the free/busy information.
Parameters:
* start = datetime.today().
* end = same as above.
Returns:
* [FreeBusy(), ...]
"""
root = cdav.FreeBusyQuery() + [cdav.TimeRange(start, end)]
response = self._query(root, 1, "report")
return FreeBusy(self, response.raw)
def todos(
self, sort_keys=("due", "priority"), include_completed=False, sort_key=None
):
"""
fetches a list of todo events (refactored to a wrapper around search)
Parameters:
* sort_keys: use this field in the VTODO for sorting (iterable of
lower case string, i.e. ('priority','due')).
* include_completed: boolean -
by default, only pending tasks are listed
* sort_key: DEPRECATED, for backwards compatibility with version 0.4.
"""
if sort_key:
sort_keys = (sort_key,)
return self.search(
todo=True, include_completed=include_completed, sort_keys=sort_keys
)
def _calendar_comp_class_by_data(self, data):
"""
takes some data, either as icalendar text or icalender object (TODO:
consider vobject) and returns the appropriate
CalendarResourceObject child class.
"""
if data is None:
## no data received - we'd need to load it before we can know what
## class it really is. Assign the base class as for now.
return CalendarObjectResource
if hasattr(data, "split"):
for line in data.split("\n"):
line = line.strip()
if line == "BEGIN:VEVENT":
return Event
if line == "BEGIN:VTODO":
return Todo
if line == "BEGIN:VJOURNAL":
return Journal
if line == "BEGIN:VFREEBUSY":
return FreeBusy
elif hasattr(data, "subcomponents"):
if not len(data.subcomponents):
return CalendarObjectResource
ical2caldav = {
icalendar.Event: Event,
icalendar.Todo: Todo,
icalendar.Journal: Journal,
icalendar.FreeBusy: FreeBusy,
}
for sc in data.subcomponents:
if sc.__class__ in ical2caldav:
return ical2caldav[sc.__class__]
return CalendarObjectResource
def event_by_url(self, href, data=None):
"""
Returns the event with the given URL
"""
return Event(url=href, data=data, parent=self).load()
def object_by_uid(self, uid, comp_filter=None, comp_class=None):
"""
Get one event from the calendar.
Parameters:
* uid: the event uid
* comp_class: filter by component type (Event, Todo, Journal)
* comp_filter: for backward compatibility
Returns:
* Event() or None
"""
if comp_filter:
assert not comp_class
if hasattr(comp_filter, "attributes"):
comp_filter = comp_filter.attributes["name"]
if comp_filter == "VTODO":
comp_class = Todo
elif comp_filter == "VJOURNAL":
comp_class = Journal
elif comp_filter == "VEVENT":
comp_class = Event
else:
raise error.ConsistencyError("Wrong compfilter")
query = cdav.TextMatch(uid)
query = cdav.PropFilter("UID") + query
root, comp_class = self.build_search_xml_query(
comp_class=comp_class, filters=[query]
)
try:
items_found = self.search(root)
if not items_found:
raise error.NotFoundError("%s not found on server" % uid)
except Exception as err:
if comp_filter is not None:
raise
logging.warning(
"Error %s from server when doing an object_by_uid(%s). search without compfilter set is not compatible with all server implementations, trying event_by_uid + todo_by_uid + journal_by_uid instead"
% (str(err), uid)
)
items_found = []
for compfilter in ("VTODO", "VEVENT", "VJOURNAL"):
try:
items_found.append(
self.object_by_uid(uid, cdav.CompFilter(compfilter))
)
except error.NotFoundError:
pass
if len(items_found) >= 1:
if len(items_found) > 1:
logging.error(
"multiple items found with same UID. Returning the first one"
)
return items_found[0]
# Ref Lucas Verney, we've actually done a substring search, if the
# uid given in the query is short (i.e. just "0") we're likely to
# get false positives back from the server, we need to do an extra
# check that the uid is correct
items_found2 = []
for item in items_found:
## In v0.10.0 we used regexps here - it's probably more optimized,
## but at one point it broke due to an extra CR in the data.
## Usage of the icalendar library increases readability and
## reliability
if item.icalendar_component:
item_uid = item.icalendar_component.get("UID", None)
if item_uid and item_uid == uid:
items_found2.append(item)
if not items_found2:
raise error.NotFoundError("%s not found on server" % uid)
error.assert_(len(items_found2) == 1)
return items_found2[0]
def todo_by_uid(self, uid):
return self.object_by_uid(uid, comp_filter=cdav.CompFilter("VTODO"))
def event_by_uid(self, uid):
return self.object_by_uid(uid, comp_filter=cdav.CompFilter("VEVENT"))
def journal_by_uid(self, uid):
return self.object_by_uid(uid, comp_filter=cdav.CompFilter("VJOURNAL"))
# alias for backward compatibility
event = event_by_uid
def events(self):
"""
List all events from the calendar.
Returns:
* [Event(), ...]
"""
return self.search(comp_class=Event)
def objects_by_sync_token(self, sync_token=None, load_objects=False):
"""objects_by_sync_token aka objects
Do a sync-collection report, ref RFC 6578 and
https://github.com/python-caldav/caldav/issues/87
This method will return all objects in the calendar if no
sync_token is passed (the method should then be referred to as
"objects"), or if the sync_token is unknown to the server. If
a sync-token known by the server is passed, it will return
objects that are added, deleted or modified since last time
the sync-token was set.
If load_objects is set to True, the objects will be loaded -
otherwise empty CalendarObjectResource objects will be returned.
This method will return a SynchronizableCalendarObjectCollection object, which is
an iterable.
"""
cmd = dav.SyncCollection()
token = dav.SyncToken(value=sync_token)
level = dav.SyncLevel(value="1")
props = dav.Prop() + dav.GetEtag()
root = cmd + [level, token, props]
(response, objects) = self._request_report_build_resultlist(
root, props=[dav.GetEtag()], no_calendardata=True
)
## TODO: look more into this, I think sync_token should be directly available through response object
try:
sync_token = response.sync_token
except:
sync_token = response.tree.findall(".//" + dav.SyncToken.tag)[0].text
## this is not quite right - the etag we've fetched can already be outdated
if load_objects:
for obj in objects:
try:
obj.load()
except error.NotFoundError:
## The object was deleted
pass
return SynchronizableCalendarObjectCollection(
calendar=self, objects=objects, sync_token=sync_token
)
objects = objects_by_sync_token
def journals(self):
"""
List all journals from the calendar.
Returns:
* [Journal(), ...]
"""
return self.search(comp_class=Journal)
class ScheduleMailbox(Calendar):
"""
RFC6638 defines an inbox and an outbox for handling event scheduling.
TODO: As ScheduleMailboxes works a bit like calendars, I've chosen
to inheritate the Calendar class, but this is a bit incorrect, a
ScheduleMailbox is a collection, but not really a calendar. We
should create a common base class for ScheduleMailbox and Calendar
eventually.
"""
def __init__(self, client=None, principal=None, url=None):
"""
Will locate the mbox if no url is given
"""
super(ScheduleMailbox, self).__init__(client=client, url=url)
self._items = None
if not client and principal:
self.client = principal.client
if not principal and client:
principal = self.client.principal
if url is not None:
self.url = client.url.join(URL.objectify(url))
else:
self.url = principal.url
try:
self.url = self.client.url.join(URL(self.get_property(self.findprop())))
except:
logging.error("something bad happened", exc_info=True)
error.assert_(self.client.check_scheduling_support())
self.url = None
raise error.NotFoundError(
"principal has no %s. %s"
% (str(self.findprop()), error.ERR_FRAGMENT)
)
def get_items(self):
"""
TODO: work in progress
TODO: perhaps this belongs to the super class?
"""
if not self._items:
try:
self._items = self.objects(load_objects=True)
except:
logging.debug(
"caldav server does not seem to support a sync-token REPORT query on a scheduling mailbox"
)
error.assert_("google" in str(self.url))
self._items = [
CalendarObjectResource(url=x[0], client=self.client)
for x in self.children()
]
for x in self._items:
x.load()
else:
try:
self._items.sync()
except:
self._items = [
CalendarObjectResource(url=x[0], client=self.client)
for x in self.children()
]
for x in self._items:
x.load()
return self._items
## TODO: work in progress
# def get_invites():
# for item in self.get_items():
# if item.vobject_instance.vevent.
class ScheduleInbox(ScheduleMailbox):
findprop = cdav.ScheduleInboxURL
class ScheduleOutbox(ScheduleMailbox):
findprop = cdav.ScheduleOutboxURL
class SynchronizableCalendarObjectCollection(object):
"""
This class may hold a cached snapshot of a calendar, and changes
in the calendar can easily be copied over through the sync method.
To create a SynchronizableCalendarObjectCollection object, use
calendar.objects(load_objects=True)
"""
def __init__(self, calendar, objects, sync_token):
self.calendar = calendar
self.sync_token = sync_token
self.objects = objects
self._objects_by_url = None
def __iter__(self):
return self.objects.__iter__()
def __len__(self):
return len(self.objects)
def objects_by_url(self):
"""
returns a dict of the contents of the SynchronizableCalendarObjectCollection, URLs -> objects.
"""
if self._objects_by_url is None:
self._objects_by_url = {}
for obj in self:
self._objects_by_url[obj.url.canonical()] = obj
return self._objects_by_url
def sync(self):
"""
This method will contact the caldav server,
request all changes from it, and sync up the collection
"""
updated_objs = []
deleted_objs = []
updates = self.calendar.objects_by_sync_token(
self.sync_token, load_objects=False
)
obu = self.objects_by_url()
for obj in updates:
obj.url = obj.url.canonical()
if (
obj.url in obu
and dav.GetEtag.tag in obu[obj.url].props
and dav.GetEtag.tag in obj.props
):
if obu[obj.url].props[dav.GetEtag.tag] == obj.props[dav.GetEtag.tag]:
continue
obu[obj.url] = obj
try:
obj.load()
updated_objs.append(obj)
except error.NotFoundError:
deleted_objs.append(obj)
obu.pop(obj.url)
self.objects = obu.values()
self.sync_token = updates.sync_token
return (updated_objs, deleted_objs)
class CalendarObjectResource(DAVObject):
"""
Ref RFC 4791, section 4.1, a "Calendar Object Resource" can be an
event, a todo-item, a journal entry, or a free/busy entry
"""
RELTYPE_REVERSER: ClassVar = {
"PARENT": "CHILD",
"CHILD": "PARENT",
"SIBLING": "SIBLING",
}
_ENDPARAM = None
_vobject_instance = None
_icalendar_instance = None
_data = None
def __init__(
self, client=None, url=None, data=None, parent=None, id=None, props=None
):
"""
CalendarObjectResource has an additional parameter for its constructor:
* data = "...", vCal data for the event
"""
super(CalendarObjectResource, self).__init__(
client=client, url=url, parent=parent, id=id, props=props
)
if data is not None:
self.data = data
if id:
old_id = self.icalendar_component.pop("UID", None)
self.icalendar_component.add("UID", id)
def add_organizer(self):
"""
goes via self.client, finds the principal, figures out the right attendee-format and adds an
organizer line to the event
"""
principal = self.client.principal()
## TODO: remove Organizer-field, if exists
## TODO: what if walk returns more than one vevent?
self.icalendar_component.add("organizer", principal.get_vcal_address())
def split_expanded(self):
i = self.icalendar_instance.subcomponents
tz_ = [x for x in i if isinstance(x, icalendar.Timezone)]
ntz = [x for x in i if not isinstance(x, icalendar.Timezone)]
if len(ntz) == 1:
return [self]
if tz_:
error.assert_(len(tz_) == 1)
ret = []
for ical_obj in ntz:
obj = self.copy(keep_uid=True)
obj.icalendar_instance.subcomponents = []
if tz_:
obj.icalendar_instance.subcomponents.append(tz_[0])
obj.icalendar_instance.subcomponents.append(ical_obj)
ret.append(obj)
return ret
def expand_rrule(self, start, end):
"""This method will transform the calendar content of the
event and expand the calendar data from a "master copy" with
RRULE set and into a "recurrence set" with RECURRENCE-ID set
and no RRULE set. The main usage is for client-side expansion
in case the calendar server does not support server-side
expansion. It should be safe to save back to the server, the
server should recognize it as recurrences and should not edit
the "master copy". If doing a `self.load`, the calendar
content will be replaced with the "master copy". However, as
of 2022-10 there is no test code verifying this.
:param event: Event
:param start: datetime.datetime
:param end: datetime.datetime
"""
import recurring_ical_events
recurrings = recurring_ical_events.of(
self.icalendar_instance, components=["VJOURNAL", "VTODO", "VEVENT"]
).between(start, end)
recurrence_properties = ["exdate", "exrule", "rdate", "rrule"]
# FIXME too much copying
stripped_event = self.copy(keep_uid=True)
# remove all recurrence properties
for component in stripped_event.vobject_instance.components():
if component.name in ("VEVENT", "VTODO"):
for key in recurrence_properties:
try:
del component.contents[key]
except KeyError:
pass
calendar = self.icalendar_instance
calendar.subcomponents = []
for occurrence in recurrings:
occurrence.add("RECURRENCE-ID", occurrence.get("DTSTART"))
calendar.add_component(occurrence)
# add other components (except for the VEVENT itself and VTIMEZONE which is not allowed on occurrence events)
for component in stripped_event.icalendar_instance.subcomponents:
if component.name not in ("VEVENT", "VTODO", "VTIMEZONE"):
calendar.add_component(component)
def set_relation(
self, other, reltype=None, set_reverse=True
): ## TODO: logic to find and set siblings?
"""
Sets a relation between this object and another object (given by uid or object).
"""
##TODO: test coverage
reltype = reltype.upper()
if isinstance(other, CalendarObjectResource):
if other.id:
uid = other.id
else:
uid = other.icalendar_component["uid"]
else:
uid = other
if set_reverse:
other = self.parent.object_by_uid(uid)
if set_reverse:
reltype_reverse = self.RELTYPE_REVERSER[reltype]
other.set_relation(other=self, reltype=reltype_reverse, set_reverse=False)
existing_relation = self.icalendar_component.get("related-to", None)
existing_relations = (
existing_relation
if isinstance(existing_relation, list)
else [existing_relation]
)
for rel in existing_relations:
if rel == uid:
return
# without str(…), icalendar ignores properties
# because if type(uid) == vText
# then Component._encode does miss adding properties
# see https://github.com/collective/icalendar/issues/557
# workaround should be safe to remove if issue gets fixed
uid = str(uid)
self.icalendar_component.add(
"related-to", uid, parameters={"RELTYPE": reltype}, encode=True
)
self.save()
## TODO: this method is undertested in the caldav library.
## However, as this consolidated and eliminated quite some duplicated code in the
## plann project, it is extensively tested in plann.
def get_relatives(
self, reltypes=None, relfilter=None, fetch_objects=True, ignore_missing=True
):
"""
By default, loads all objects pointed to by the RELATED-TO
property and loads the related objects.
It's possible to filter, either by passing a set or a list of
acceptable relation types in reltypes, or by passing a lambda
function in relfilter.
TODO: Make it possible to also check up reverse relationships
TODO: this is partially overlapped by plann.lib._relships_by_type
in the plann tool. Should consolidate the code.
"""
ret = defaultdict(set)
relations = self.icalendar_component.get("RELATED-TO", [])
if not isinstance(relations, list):
relations = [relations]
for rel in relations:
if relfilter and not relfilter(rel):
continue
reltype = rel.params.get("RELTYPE", "PARENT")
if reltypes and not reltype in reltypes:
continue
ret[reltype].add(str(rel))
if fetch_objects:
for reltype in ret:
uids = ret[reltype]
ret[reltype] = []
for obj in uids:
try:
ret[reltype].append(self.parent.object_by_uid(obj))
except error.NotFoundError:
if not ignore_missing:
raise
return ret
def _get_icalendar_component(self, assert_one=False):
"""Returns the icalendar subcomponent - which should be an
Event, Journal, Todo or FreeBusy from the icalendar class
See also https://github.com/python-caldav/caldav/issues/232
"""
self.load(only_if_unloaded=True)
ret = [
x
for x in self.icalendar_instance.subcomponents
if not isinstance(x, icalendar.Timezone)
]
error.assert_(len(ret) == 1 or not assert_one)
for x in ret:
for cl in (
icalendar.Event,
icalendar.Journal,
icalendar.Todo,
icalendar.FreeBusy,
):
if isinstance(x, cl):
return x
error.assert_(False)
def _set_icalendar_component(self, value):
s = self.icalendar_instance.subcomponents
i = [i for i in range(0, len(s)) if not isinstance(s[i], icalendar.Timezone)]
if len(i) == 1:
self.icalendar_instance.subcomponents[i[0]] = value
else:
my_instance = icalendar.Calendar()
my_instance.add("prodid", "-//python-caldav//caldav//" + language)
my_instance.add("version", "2.0")
my_instance.add_component(value)
self.icalendar_instance = my_instance
icalendar_component = property(
_get_icalendar_component,
_set_icalendar_component,
doc="icalendar component - should not be used with recurrence sets",
)
def get_due(self):
"""
A VTODO may have due or duration set. Return or calculate due.
WARNING: this method is likely to be deprecated and moved to
the icalendar library. If you decide to use it, please put
caldav<2.0 in the requirements.
"""
i = self.icalendar_component
if "DUE" in i:
return i["DUE"].dt
elif "DTEND" in i:
return i["DTEND"].dt
elif "DURATION" in i and "DTSTART" in i:
return i["DTSTART"].dt + i["DURATION"].dt
else:
return None
get_dtend = get_due
def add_attendee(self, attendee, no_default_parameters=False, **parameters):
"""
For the current (event/todo/journal), add an attendee.
The attendee can be any of the following:
* A principal
* An email address prepended with "mailto:"
* An email address without the "mailto:"-prefix
* A two-item tuple containing a common name and an email address
* (not supported, but planned: an ical text line starting with the word "ATTENDEE")
Any number of attendee parameters can be given, those will be used
as defaults unless no_default_parameters is set to True:
partstat=NEEDS-ACTION
cutype=UNKNOWN (unless a principal object is given)
rsvp=TRUE
role=REQ-PARTICIPANT
schedule-agent is not set
"""
from icalendar import vCalAddress, vText
if isinstance(attendee, Principal):
attendee_obj = attendee.get_vcal_address()
elif isinstance(attendee, vCalAddress):
attendee_obj = attendee
elif isinstance(attendee, tuple):
if attendee[1].startswith("mailto:"):
attendee_obj = vCalAddress(attendee[1])
else:
attendee_obj = vCalAddress("mailto:" + attendee[1])
attendee_obj.params["cn"] = vText(attendee[0])
elif isinstance(attendee, str):
if attendee.startswith("ATTENDEE"):
raise NotImplementedError(
"do we need to support this anyway? Should be trivial, but can't figure out how to do it with the icalendar.Event/vCalAddress objects right now"
)
elif attendee.startswith("mailto:"):
attendee_obj = vCalAddress(attendee)
elif "@" in attendee and not ":" in attendee and not ";" in attendee:
attendee_obj = vCalAddress("mailto:" + attendee)
else:
error.assert_(False)
attendee_obj = vCalAddress()
## TODO: if possible, check that the attendee exists
## TODO: check that the attendee will not be duplicated in the event.
if not no_default_parameters:
## Sensible defaults:
attendee_obj.params["partstat"] = "NEEDS-ACTION"
if not "cutype" in attendee_obj.params:
attendee_obj.params["cutype"] = "UNKNOWN"
attendee_obj.params["rsvp"] = "TRUE"
attendee_obj.params["role"] = "REQ-PARTICIPANT"
params = {}
for key in parameters:
new_key = key.replace("_", "-")
if parameters[key] == True:
params[new_key] = "TRUE"
else:
params[new_key] = parameters[key]
attendee_obj.params.update(params)
ievent = self.icalendar_component
ievent.add("attendee", attendee_obj)
def is_invite_request(self):
self.load(only_if_unloaded=True)
return self.icalendar_instance.get("method", None) == "REQUEST"
def accept_invite(self, calendar=None):
self._reply_to_invite_request("ACCEPTED", calendar)
def decline_invite(self, calendar=None):
self._reply_to_invite_request("DECLINED", calendar)
def tentatively_accept_invite(self, calendar=None):
self._reply_to_invite_request("TENTATIVE", calendar)
## TODO: DELEGATED is also a valid option, and for vtodos the
## partstat can also be set to COMPLETED and IN-PROGRESS.
def _reply_to_invite_request(self, partstat, calendar):
error.assert_(self.is_invite_request())
if not calendar:
calendar = self.client.principal().calendars()[0]
## we need to modify the icalendar code, update our own participant status
self.icalendar_instance.pop("METHOD")
self.change_attendee_status(partstat=partstat)
self.get_property(cdav.ScheduleTag(), use_cached=True)
try:
calendar.save_event(self.data)
except Exception as some_exception:
## TODO - TODO - TODO
## RFC6638 does not seem to be very clear (or
## perhaps I should read it more thoroughly) neither on
## how to handle conflicts, nor if the reply should be
## posted to the "outbox", saved back to the same url or
## sent to a calendar.
self.load()
self.get_property(cdav.ScheduleTag(), use_cached=False)
outbox = self.client.principal().schedule_outbox()
if calendar != outbox:
self._reply_to_invite_request(partstat, calendar=outbox)
else:
self.save()
def copy(self, keep_uid=False, new_parent=None):
"""
Events, todos etc can be copied within the same calendar, to another
calendar or even to another caldav server
"""
obj = self.__class__(
parent=new_parent or self.parent,
data=self.data,
id=self.id if keep_uid else str(uuid.uuid1()),
)
if new_parent or not keep_uid:
obj.url = obj.generate_url()
else:
obj.url = self.url
return obj
def load(self, only_if_unloaded=False):
"""
(Re)load the object from the caldav server.
"""
if only_if_unloaded and self.is_loaded():
return
r = self.client.request(self.url)
if r.status == 404:
raise error.NotFoundError(errmsg(r))
self.data = vcal.fix(r.raw)
if "Etag" in r.headers:
self.props[dav.GetEtag.tag] = r.headers["Etag"]
if "Schedule-Tag" in r.headers:
self.props[cdav.ScheduleTag.tag] = r.headers["Schedule-Tag"]
return self
## TODO: self.id should either always be available or never
def _find_id_path(self, id=None, path=None):
"""
With CalDAV, every object has a URL. With icalendar, every object
should have a UID. This UID may or may not be copied into self.id.
This method will:
0) if ID is given, assume that as the UID, and set it in the object
1) if UID is given in the object, assume that as the ID
2) if ID is not given, but the path is given, generate the ID from the
path
3) If neither ID nor path is given, use the uuid method to generate an
ID (TODO: recommendation is to concat some timestamp, serial or
random number and a domain)
4) if no path is given, generate the URL from the ID
"""
i = self._get_icalendar_component(assert_one=False)
if not id and getattr(self, "id", None):
id = self.id
if not id:
id = i.pop("UID", None)
if id:
id = str(id)
if not path and getattr(self, "path", None):
path = self.path
if id is None and path is not None and str(path).endswith(".ics"):
id = re.search("(/|^)([^/]*).ics", str(path)).group(2)
if id is None:
id = str(uuid.uuid1())
i.pop("UID", None)
i.add("UID", id)
self.id = id
for x in self.icalendar_instance.subcomponents:
if not isinstance(x, icalendar.Timezone):
error.assert_(x.get("UID", None) == self.id)
if path is None:
path = self.generate_url()
else:
path = self.parent.url.join(path)
self.url = URL.objectify(path)
def _put(self, retry_on_failure=True):
## SECURITY TODO: we should probably have a check here to verify that no such object exists already
r = self.client.put(
self.url, self.data, {"Content-Type": 'text/calendar; charset="utf-8"'}
)
if r.status == 302:
path = [x[1] for x in r.headers if x[0] == "location"][0]
elif not (r.status in (204, 201)):
if retry_on_failure:
## This looks like a noop, but the object may be "cleaned".
## See https://github.com/python-caldav/caldav/issues/43
self.vobject_instance
return self._put(False)
else:
raise error.PutError(errmsg(r))
def _create(self, id=None, path=None, retry_on_failure=True):
## We're efficiently running the icalendar code through the icalendar
## library. This may cause data modifications and may "unfix"
## https://github.com/python-caldav/caldav/issues/43
self._find_id_path(id=id, path=path)
self._put()
def generate_url(self):
## See https://github.com/python-caldav/caldav/issues/143 for the rationale behind double-quoting slashes
## TODO: should try to wrap my head around issues that arises when id contains weird characters. maybe it's
## better to generate a new uuid here, particularly if id is in some unexpected format.
if not self.id:
self.id = self._get_icalendar_component(assert_one=False)["UID"]
return self.parent.url.join(quote(self.id.replace("/", "%2F")) + ".ics")
def change_attendee_status(self, attendee=None, **kwargs):
if not attendee:
attendee = self.client.principal()
cnt = 0
if isinstance(attendee, Principal):
for addr in attendee.calendar_user_address_set():
try:
self.change_attendee_status(addr, **kwargs)
## TODO: can probably just return now
cnt += 1
except error.NotFoundError:
pass
if not cnt:
raise error.NotFoundError(
"Principal %s is not invited to event" % str(attendee)
)
error.assert_(cnt == 1)
return
ical_obj = self.icalendar_component
attendee_lines = ical_obj["attendee"]
if isinstance(attendee_lines, str):
attendee_lines = [attendee_lines]
strip_mailto = lambda x: str(x).replace("mailto:", "").lower()
for attendee_line in attendee_lines:
if strip_mailto(attendee_line) == strip_mailto(attendee):
attendee_line.params.update(kwargs)
cnt += 1
if not cnt:
raise error.NotFoundError("Participant %s not found in attendee list")
error.assert_(cnt == 1)
def save(
self,
no_overwrite=False,
no_create=False,
obj_type=None,
increase_seqno=True,
if_schedule_tag_match=False,
):
"""
Save the object, can be used for creation and update.
no_overwrite and no_create will check if the object exists.
Those two are mutually exclusive. Some servers don't support
searching for an object uid without explicitly specifying what
kind of object it should be, hence obj_type can be passed.
obj_type is only used in conjunction with no_overwrite and
no_create.
Returns:
* self
"""
if (
self._vobject_instance is None
and self._data is None
and self._icalendar_instance is None
):
return self
path = self.url.path if self.url else None
if no_overwrite or no_create:
## SECURITY TODO: path names on the server does not
## necessarily map cleanly to UUIDs. We need to do quite
## some refactoring here to ensure all corner cases are
## covered. Doing a GET first to check if the resource is
## found and then a PUT also gives a potential race
## condition. (Possibly the API gives no safe way to ensure
## a unique new calendar item is created to the server without
## overwriting old stuff or vice versa - it seems silly to me
## to do a PUT instead of POST when creating new data).
## TODO: the "find id"-logic is duplicated in _create,
## should be refactored
if not self.id:
for component in self.vobject_instance.getChildren():
if hasattr(component, "uid"):
self.id = component.uid.value
if not self.id and no_create:
raise error.ConsistencyError("no_create flag was set, but no ID given")
existing = None
## some servers require one to explicitly search for the right kind of object.
## todo: would arguably be nicer to verify the type of the object and take it from there
if not self.id:
methods = []
elif obj_type:
methods = (getattr(self.parent, "%s_by_uid" % obj_type),)
else:
methods = (
self.parent.object_by_uid,
self.parent.event_by_uid,
self.parent.todo_by_uid,
self.parent.journal_by_uid,
)
for method in methods:
try:
existing = method(self.id)
if no_overwrite:
raise error.ConsistencyError(
"no_overwrite flag was set, but object already exists"
)
break
except error.NotFoundError:
pass
if no_create and not existing:
raise error.ConsistencyError(
"no_create flag was set, but object does not exists"
)
if increase_seqno and b"SEQUENCE" in to_wire(self.data):
seqno = self.icalendar_component.pop("SEQUENCE", None)
if seqno is not None:
self.icalendar_component.add("SEQUENCE", seqno + 1)
self._create(id=self.id, path=path)
return self
def is_loaded(self):
return (
self._data or self._vobject_instance or self._icalendar_instance
) and self.data.count("BEGIN:") > 1
def __str__(self):
return "%s: %s" % (self.__class__.__name__, self.url)
## implementation of the properties self.data,
## self.vobject_instance and self.icalendar_instance follows. The
## rule is that only one of them can be set at any time, this
## since vobject_instance and icalendar_instance are mutable,
## and any modification to those instances should apply
def _set_data(self, data):
## The __init__ takes a data attribute, and it should be allowable to
## set it to a vobject object or an icalendar object, hence we should
## do type checking on the data (TODO: but should probably use
## isinstance rather than this kind of logic
if type(data).__module__.startswith("vobject"):
self._set_vobject_instance(data)
return self
if type(data).__module__.startswith("icalendar"):
self._set_icalendar_instance(data)
return self
self._data = vcal.fix(data)
self._vobject_instance = None
self._icalendar_instance = None
return self
def _get_data(self):
if self._data:
return to_normal_str(self._data)
elif self._vobject_instance:
return to_normal_str(self._vobject_instance.serialize())
elif self._icalendar_instance:
return to_normal_str(self._icalendar_instance.to_ical())
return None
def _get_wire_data(self):
if self._data:
return to_wire(self._data)
elif self._vobject_instance:
return to_wire(self._vobject_instance.serialize())
elif self._icalendar_instance:
return to_wire(self._icalendar_instance.to_ical())
return None
data = property(
_get_data, _set_data, doc="vCal representation of the object as normal string"
)
wire_data = property(
_get_wire_data,
_set_data,
doc="vCal representation of the object in wire format (UTF-8, CRLN)",
)
def _set_vobject_instance(self, inst):
self._vobject_instance = inst
self._data = None
self._icalendar_instance = None
return self
def _get_vobject_instance(self):
if not self._vobject_instance:
if self._get_data() is None:
return None
try:
self._set_vobject_instance(
vobject.readOne(to_unicode(self._get_data()))
)
except:
log.critical(
"Something went wrong while loading icalendar data into the vobject class. ical url: "
+ str(self.url)
)
raise
return self._vobject_instance
vobject_instance = property(
_get_vobject_instance,
_set_vobject_instance,
doc="vobject instance of the object",
)
def _set_icalendar_instance(self, inst):
self._icalendar_instance = inst
self._data = None
self._vobject_instance = None
return self
def _get_icalendar_instance(self):
if not self._icalendar_instance:
if not self.data:
return None
self.icalendar_instance = icalendar.Calendar.from_ical(
to_unicode(self.data)
)
return self._icalendar_instance
icalendar_instance = property(
_get_icalendar_instance,
_set_icalendar_instance,
doc="icalendar instance of the object",
)
def get_duration(self):
"""According to the RFC, either DURATION or DUE should be set
for a task, but never both - implicitly meaning that DURATION
is the difference between DTSTART and DUE (personally I
believe that's stupid. If a task takes five minutes to
complete - say, fill in some simple form that should be
delivered before midnight at new years eve, then it feels
natural for me to define "duration" as five minutes, DTSTART
to "some days before new years eve" and DUE to 20xx-01-01
00:00:00 - but I digress.
This method will return DURATION if set, otherwise the
difference between DUE and DTSTART (if both of them are set).
TODO: should be fixed for Event class as well (only difference
is that DTEND is used rather than DUE) and possibly also for
Journal (defaults to one day, probably?)
WARNING: this method is likely to be deprecated and moved to
the icalendar library. If you decide to use it, please put
caldav<2.0 in the requirements.
"""
i = self.icalendar_component
return self._get_duration(i)
def _get_duration(self, i):
if "DURATION" in i:
return i["DURATION"].dt
elif "DTSTART" in i and self._ENDPARAM in i:
return i[self._ENDPARAM].dt - i["DTSTART"].dt
elif "DTSTART" in i and not isinstance(i["DTSTART"], datetime):
return timedelta(days=1)
else:
return timedelta(0)
## for backward-compatibility - may be changed to
## icalendar_instance in version 1.0
instance = vobject_instance
class Event(CalendarObjectResource):
"""
The `Event` object is used to represent an event (VEVENT).
As of 2020-12 it adds nothing to the inheritated class. (I have
frequently asked myself if we need those subclasses ... perhaps
not)
"""
_ENDPARAM = "DTEND"
pass
class Journal(CalendarObjectResource):
"""
The `Journal` object is used to represent a journal entry (VJOURNAL).
As of 2020-12 it adds nothing to the inheritated class. (I have
frequently asked myself if we need those subclasses ... perhaps
not)
"""
pass
class FreeBusy(CalendarObjectResource):
"""
The `FreeBusy` object is used to represent a freebusy response from
the server. __init__ is overridden, as a FreeBusy response has no
URL or ID. The inheritated methods .save and .load is moot and
will probably throw errors (perhaps the class hierarchy should be
rethought, to prevent the FreeBusy from inheritating moot methods)
Update: With RFC6638 a freebusy object can have a URL and an ID.
"""
def __init__(self, parent, data, url=None, id=None):
CalendarObjectResource.__init__(
self, client=parent.client, url=url, data=data, parent=parent, id=id
)
class Todo(CalendarObjectResource):
"""The `Todo` object is used to represent a todo item (VTODO). A
Todo-object can be completed. Extra logic for different ways to
complete one recurrence of a recurrent todo. Extra logic to
handle due vs duration.
"""
_ENDPARAM = "DUE"
def _next(self, ts=None, i=None, dtstart=None, rrule=None, by=None, no_count=True):
"""Special logic to fint the next DTSTART of a recurring
just-completed task.
If any BY*-parameters are present, assume the task should have
fixed deadlines and preserve information from the previous
dtstart. If no BY*-parameters are present, assume the
frequency is meant to be the interval between the tasks.
Examples:
1) Garbage collection happens every week on a Tuesday, but
never earlier than 09 in the morning. Hence, it may be
important to take out the thrash Monday evenings or Tuesday
morning. DTSTART of the original task is set to Tuesday
2022-11-01T08:50, DUE to 09:00.
1A) Task is completed 07:50 on the 1st of November. Next
DTSTART should be Tuesday the 7th of November at 08:50.
1B) Task is completed 09:15 on the 1st of November (which is
probably OK, since they usually don't come before 09:30).
Next DTSTART should be Tuesday the 7th of November at 08:50.
1C) Task is completed at the 5th of November. We've lost the
DUE, but the calendar has no idea weather the DUE was a very
hard due or not - and anyway, probably we'd like to do it
again on Tuesday, so next DTSTART should be Tuesday the 7th of
November at 08:50.
1D) Task is completed at the 7th of November at 07:50. Next
DTSTART should be one hour later. Now, this is very silly,
but an algorithm cannot do guesswork on weather it's silly or
not. If DTSTART would be set to the earliest possible time
one could start thinking on this task (like, Monday evening),
then we would get Tue the 14th of November, which does make
sense. Unfortunately the icalendar standard does not specify
what should be used for DTSTART and DURATION/DUE.
1E) Task is completed on the 7th of November at 08:55. This
efficiently means we've lost the 1st of November recurrence
but have done the 7th of November recurrence instead, so next
timestamp will be the 14th of November.
2) Floors at home should be cleaned like once a week, but
there is no fixed deadline for it. For some people it may
make sense to have a routine doing it i.e. every Tuesday, but
this is not a strict requirement. If it wasn't done one
Tuesday, it's probably even more important to do it Wednesday.
If the floor was cleaned on a Saturday, it probably doesn't
make sense cleaning it again on Tuesday, but it probably
shouldn't wait until next Tuesday. Rrule is set to
FREQ=WEEKLY, but without any BYDAY. The original VTODO is set
up with DTSTART 16:00 on Tuesday the 1st of November and DUE
17:00. After 17:00 there will be dinner, so best to get it
done before that.
2A) Floor cleaning was finished 14:30. The next recurrence
has DTSTART set to 13:30 (and DUE set to 14:30). The idea
here is that since the floor starts accumulating dirt right
after 14:30, obviously it is overdue at 16:00 Tuesday the 7th.
2B) Floor cleaning was procrastinated with one day and
finished Wednesday at 14:30. Next instance will be Wednesday
in a week, at 14:30.
2C) Floor cleaning was procrastinated with two weeks and
finished Tuesday the 14th at 14:30. Next instance will be
Tuesday the 21st at 14:30.
While scenario 2 is the most trivial to implement, it may not
be the correct understanding of the RFC, and it may be tricky
to get the RECURRENCE-ID set correctly.
"""
if not i:
i = self.icalendar_component
if not rrule:
rrule = i["RRULE"]
if not dtstart:
if by is True or (
by is None and any((x for x in rrule if x.startswith("BY")))
):
if "DTSTART" in i:
dtstart = i["DTSTART"].dt
else:
dtstart = ts or datetime.now()
else:
dtstart = ts or datetime.now() - self._get_duration(i)
## dtstart should be compared to the completion timestamp, which
## is set in UTC in the complete() method. However, dtstart
## may be a naïve or a floating timestamp
## (TODO: what if it's a date?)
## (TODO: we need test code for those corner cases!)
if hasattr(dtstart, "astimezone"):
dtstart = dtstart.astimezone(timezone.utc)
if not ts:
ts = dtstart
## Counting is taken care of other places
if no_count and "COUNT" in rrule:
rrule = rrule.copy()
rrule.pop("COUNT")
rrule = rrulestr(rrule.to_ical().decode("utf-8"), dtstart=dtstart)
return rrule.after(ts)
def _reduce_count(self, i=None):
if not i:
i = self.icalendar_component
if "COUNT" in i["RRULE"]:
if i["RRULE"]["COUNT"][0] == 1:
return False
i["RRULE"]["COUNT"][0] -= 1
return True
def _complete_recurring_safe(self, completion_timestamp):
"""This mode will create a new independent task which is
marked as completed, and modify the existing recurring task.
It is probably the most safe way to handle the completion of a
recurrence of a recurring task, though the link between the
completed task and the original task is lost.
"""
## If count is one, then it is not really recurring
if not self._reduce_count():
return self.complete(handle_rrule=False)
next_dtstart = self._next(completion_timestamp)
if not next_dtstart:
return self.complete(handle_rrule=False)
completed = self.copy()
completed.url = self.parent.url.join(completed.id + ".ics")
completed.icalendar_component.pop("RRULE")
completed.save()
completed.complete()
duration = self.get_duration()
i = self.icalendar_component
i.pop("DTSTART", None)
i.add("DTSTART", next_dtstart)
self.set_duration(duration, movable_attr="DUE")
self.save()
def _complete_recurring_thisandfuture(self, completion_timestamp):
"""The RFC is not much helpful, a lot of guesswork is needed
to consider what the "right thing" to do wrg of a completion of
recurring tasks is ... but this is my shot at it.
1) The original, with rrule, will be kept as it is. The rrule
string is fetched from the first subcomponent of the
icalendar.
2) If there are multiple recurrence instances in subcomponents
and the last one is marked with RANGE=THISANDFUTURE, then
select this one. If it has the rrule property set, use this
rrule rather than the original one. Drop the RANGE parameter.
Calculate the next RECURRENCE-ID from the DTSTART of this
object. Mark task as completed. Increase SEQUENCE.
3) Create a new recurrence instance with RANGE=THISANDFUTURE,
without RRULE set (Ref
https://github.com/Kozea/Radicale/issues/1264). Set the
RECURRENCE-ID to the one calculated in #2. Calculate the
DTSTART based on rrule and completion timestamp/date.
"""
recurrences = self.icalendar_instance.subcomponents
orig = recurrences[0]
if not "STATUS" in orig:
orig["STATUS"] = "NEEDS-ACTION"
if len(recurrences) == 1:
## We copy the original one
just_completed = orig.copy()
just_completed.pop("RRULE")
just_completed.add(
"RECURRENCE-ID", orig.get("DTSTART", completion_timestamp)
)
seqno = just_completed.pop("SEQUENCE", 0)
just_completed.add("SEQUENCE", seqno + 1)
recurrences.append(just_completed)
prev = recurrences[-1]
rrule = prev.get("RRULE", orig["RRULE"])
thisandfuture = prev.copy()
seqno = thisandfuture.pop("SEQUENCE", 0)
thisandfuture.add("SEQUENCE", seqno + 1)
## If we have multiple recurrences, assume the last one is a THISANDFUTURE.
## (Otherwise, the data is coming from another client ...)
## The RANGE parameter needs to be removed
if len(recurrences) > 2:
if prev["RECURRENCE-ID"].params.get("RANGE", None) == "THISANDFUTURE":
prev["RECURRENCE-ID"].params.pop("RANGE")
else:
raise NotImplementedError(
"multiple instances found, but last one is not of type THISANDFUTURE, possibly this has been created by some incompatible client, but we should deal with it"
)
self._complete_ical(prev, completion_timestamp)
thisandfuture.pop("RECURRENCE-ID", None)
thisandfuture.add("RECURRENCE-ID", self._next(i=prev, rrule=rrule))
thisandfuture["RECURRENCE-ID"].params["RANGE"] = "THISANDFUTURE"
rrule2 = thisandfuture.pop("RRULE", None)
## Counting logic
if rrule2 is not None:
count = rrule2.get("COUNT", None)
if count is not None and count[0] in (0, 1):
for i in recurrences:
self._complete_ical(i, completion_timestamp=completion_timestamp)
thisandfuture.add("RRULE", rrule2)
else:
count = rrule.get("COUNT", None)
if count is not None and count[0] <= len(
[x for x in recurrences if not self._is_pending(x)]
):
self._complete_ical(
recurrences[0], completion_timestamp=completion_timestamp
)
self.save(increase_seqno=False)
return
rrule = rrule2 or rrule
duration = self._get_duration(i=prev)
thisandfuture.pop("DTSTART", None)
thisandfuture.pop("DUE", None)
next_dtstart = self._next(i=prev, rrule=rrule, ts=completion_timestamp)
thisandfuture.add("DTSTART", next_dtstart)
self._set_duration(i=thisandfuture, duration=duration, movable_attr="DUE")
self.icalendar_instance.subcomponents.append(thisandfuture)
self.save(increase_seqno=False)
def complete(
self, completion_timestamp=None, handle_rrule=False, rrule_mode="safe"
):
"""Marks the task as completed.
Parameters:
* completion_timestamp - datetime object. Defaults to
datetime.now().
* handle_rrule - if set to True, the library will try to be smart if
the task is recurring. The default is False, for backward
compatibility. I may consider making this one mandatory.
* rrule_mode - The RFC leaves a lot of room for interpretation on how
to handle recurring tasks, and what works on one server may break at
another. The following modes are accepted:
* this_and_future - see doc for _complete_recurring_thisandfuture for details
* safe - see doc for _complete_recurring_safe for details
"""
if not completion_timestamp:
completion_timestamp = datetime.utcnow().astimezone(timezone.utc)
if "RRULE" in self.icalendar_component and handle_rrule:
return getattr(self, "_complete_recurring_%s" % rrule_mode)(
completion_timestamp
)
self._complete_ical(completion_timestamp=completion_timestamp)
self.save()
def _complete_ical(self, i=None, completion_timestamp=None):
## my idea was to let self.complete call this one ... but self.complete
## should use vobject and not icalendar library due to backward compatibility.
if i is None:
i = self.icalendar_component
assert self._is_pending(i)
status = i.pop("STATUS", None)
i.add("STATUS", "COMPLETED")
i.add("COMPLETED", completion_timestamp)
def _is_pending(self, i=None):
if i is None:
i = self.icalendar_component
if i.get("COMPLETED", None) is not None:
return False
if i.get("STATUS", None) in ("NEEDS-ACTION", "IN-PROCESS"):
return True
if i.get("STATUS", None) in ("CANCELLED", "COMPLETED"):
return False
if not "STATUS" in i:
return True
## input data does not conform to the RFC
assert False
def uncomplete(self):
"""Undo completion - marks a completed task as not completed"""
### TODO: needs test code for code coverage!
## (it has been tested through the calendar-cli test code)
if not hasattr(self.vobject_instance.vtodo, "status"):
self.vobject_instance.vtodo.add("status")
self.vobject_instance.vtodo.status.value = "NEEDS-ACTION"
if hasattr(self.vobject_instance.vtodo, "completed"):
self.vobject_instance.vtodo.remove(self.vobject_instance.vtodo.completed)
self.save()
## TODO: should be moved up to the base class
def set_duration(self, duration, movable_attr="DTSTART"):
"""
If DTSTART and DUE/DTEND is already set, one of them should be moved. Which one? I believe that for EVENTS, the DTSTART should remain constant and DTEND should be moved, but for a task, I think the due date may be a hard deadline, hence by default we'll move DTSTART.
TODO: can this be written in a better/shorter way?
WARNING: this method is likely to be deprecated and moved to
the icalendar library. If you decide to use it, please put
caldav<2.0 in the requirements.
"""
i = self.icalendar_component
return self._set_duration(i, duration, movable_attr)
def _set_duration(self, i, duration, movable_attr="DTSTART"):
if ("DUE" in i or "DURATION" in i) and "DTSTART" in i:
i.pop(movable_attr, None)
if movable_attr == "DUE":
i.pop("DURATION", None)
if movable_attr == "DTSTART":
i.add("DTSTART", i["DUE"].dt - duration)
elif movable_attr == "DUE":
i.add("DUE", i["DTSTART"].dt + duration)
elif "DUE" in i:
i.add("DTSTART", i["DUE"].dt - duration)
elif "DTSTART" in i:
i.add("DUE", i["DTSTART"].dt + duration)
else:
if "DURATION" in i:
i.pop("DURATION")
i.add("DURATION", duration)
def set_due(self, due, move_dtstart=False, check_dependent=False):
"""The RFC specifies that a VTODO cannot have both due and
duration, so when setting due, the duration field must be
evicted
check_dependent=True will raise some error if there exists a
parent calendar component (through RELATED-TO), and the parents
due or dtend is before the new dtend).
WARNING: this method is likely to be deprecated and parts of
it moved to the icalendar library. If you decide to use it,
please put caldav<2.0 in the requirements.
WARNING: the check_dependent-logic may be rewritten to support
RFC9253 in 1.x already
"""
if hasattr(due, "tzinfo") and not due.tzinfo:
due = due.astimezone(timezone.utc)
i = self.icalendar_component
if check_dependent:
parents = self.get_relatives({"PARENT"})
for parent in parents["PARENT"]:
pend = parent.get_dtend()
if pend and pend.astimezone(timezone.utc) < due:
if check_dependent == "return":
return parent
raise error.ConsistencyError(
"parent object has due/end %s, cannot procrastinate child object without first procrastinating parent object"
)
duration = self.get_duration()
i.pop("DURATION", None)
i.pop("DUE", None)
if move_dtstart and duration and "DTSTART" in i:
i.pop("DTSTART")
i.add("DTSTART", due - duration)
i.add("DUE", due)