from __future__ import annotations
__all__ = [
'PropertiesStore',
]
import copy
import datetime
import logging
import pprint
from typing import (
Any, Dict, Iterable, Iterator, List, Optional, Tuple, TypeVar, Union
)
from .. import constants
from ..enums import PropertiesType
from ..exceptions import NotWritableError
from .prop import createProp, FixedLengthProp, PropBase
from ..utils import divide
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
_T = TypeVar('_T')
[docs]class PropertiesStore:
"""
Parser for msg properties files.
"""
def __init__(self, data: Optional[bytes], type_: PropertiesType, writable: bool = False):
"""
Reads a properties stream or creates a brand new ``PropertiesStore``
object.
:param data: The bytes of the properties instance. Setting to ``None``
or empty bytes will cause the properties instance to not be valid
*unless* writable is set to ``True``. If that is the case, the
instance will be setup for creating a new properties stream.
:param type_: The type of properties stream this instance represents.
:param writable: Whether this properties stream should accept
modification.
"""
if not isinstance(type_, PropertiesType):
raise TypeError(':param type_: MUST be a value of PropertiesType.')
self.__type = type_
# Setup early variables.
self.__props: Dict[str, PropBase] = {}
# This maps short IDs to all properties that use that ID. More than one
# property with the same ID but a different type may exist.
self.__idMapping: Dict[str, List[str]] = {}
self.__naid = None
self.__nrid = None
self.__ac = None
self.__rc = None
self.__writable = writable
# Set this now and unset it if everything goes well.
self.__isError = True
# Check if data is None or empty bytes.
if not data:
# Two paths here. If writable, we are just meant to be creating a
# new storage for properties, so initialize and return. Otherwise,
# we're dealing with an error situation, but we want to silence it.
if writable:
self.__rawData = b''
if type_ is not PropertiesType.ATTACHMENT:
self.__naid = 0
self.__nrid = 0
self.__ac = 0
self.__rc = 0
return
if not isinstance(data, bytes):
raise TypeError(':param data: MUST be bytes or None.')
self.__rawData = data
if type_ == PropertiesType.MESSAGE:
skip = 32
self.__nrid, self.__naid, self.__rc, self.__ac = constants.st.ST_PROPSTORE_HEADER.unpack(data[:24])
elif type_ == PropertiesType.MESSAGE_EMBED:
skip = 24
self.__nrid, self.__naid, self.__rc, self.__ac = constants.st.ST_PROPSTORE_HEADER.unpack(data[:24])
else:
skip = 8
streams = divide(self.__rawData[skip:], 16)
for st in streams:
if len(st) == 16:
prop = createProp(st)
self.__props[prop.name] = prop
# Add the ID to our mapping list.
id_ = prop.name[:4]
if id_ not in self.__idMapping:
self.__idMapping[id_] = []
self.__idMapping[id_].append(prop.name)
else:
logger.warning(f'Found stream from divide that was not 16 bytes: {st}. Ignoring.')
self.__isError = False
def __bytes__(self) -> bytes:
return self.toBytes()
def __contains__(self, key: Any) -> bool:
return self.__props.__contains__(key)
def __delitem__(self, key: str) -> None:
"""
Removes an item using the del operator.
:raises KeyError: The key was not found.
:raises TypeError: The key was not a string.
"""
if not isinstance(key, str):
raise TypeError('Del operator can only remove a property by string.')
key = key.upper()
del self.__props[key]
# If the deletion was successful, we need to remove the related ID
# mapping.
shortKey = key[:4]
self.__idMapping[shortKey].remove(key)
if len(self.__idMapping[shortKey]) == 0:
del self.__idMapping[shortKey]
def __getitem__(self, key: Union[str, int]) -> PropBase:
if (found := self._mapId(key)):
return self.__props.__getitem__(found)
raise KeyError(key)
def __iter__(self) -> Iterator[str]:
return self.__props.__iter__()
def __len__(self) -> int:
"""
Returns the number of properties.
"""
return len(self.__props)
def __repr__(self) -> str:
return self.__props.__repr__()
def _mapId(self, id_: Union[int, str]) -> str:
"""
Converts an input into an appropriate property ID.
This is a complex function, allowing the user to specify an int or
string. If the input is a string that is not 4 characters, it is
returned. Otherwise, a series of checks will be
performed. If the input is an int that is less than ``0x10000``, it is
considered a property ID without a type and converted to a 4 character
hexadecimal string. Otherwise, it is converted to an 8 character
hexadecimal string and returned.
Once the input is a 4 character string from the other paths, it will
then be checked against the list of found property IDs, and the first
full ID will be returned.
If a valid conversion could not be done, returns an empty string.
All strings returned will be uppercase.
"""
# See if we need to convert to 4 character string and map or if we just
# need to return quickly.
if isinstance(id_, str):
id_ = id_.upper()
if len(id_) != 4:
return id_
elif isinstance(id_, int):
if id_ >= 0x10000:
return f'{id_:08X}'
else:
id_ = f'{id_:04X}'
else:
return ''
return self.__idMapping.get(id_, ('',))[0]
[docs] def addProperty(self, prop: PropBase, force: bool = False) -> None:
"""
Adds the property if it does not exist.
:param prop: The property to add.
:param force: If ``True``, the writable property will be ignored. This
will not be reflected when converting to ``bytes`` if the instance
is not readable.
:raises KeyError: A property already exists with the chosen name.
:raises NotWritableError: The method was used on an unwritable instance.
"""
if not (force or self.__writable):
raise
if prop.name in self.__props:
raise KeyError('A property with that name already exists.')
self.__props[prop.name.upper()] = prop
self.__idMapping.setdefault(prop.name[:4], list()).append(prop.name.upper())
[docs] def get(self, name: Union[str, int], default: _T = None) -> Union[PropBase, _T]:
"""
Retrieve the property of :param name:.
:returns: The property, or the value of :param default: if the property
could not be found.
"""
if (name := self._mapId(name)):
return self.__props.get(name, default)
else:
return default
[docs] def getProperties(self, id_: Union[str, int]) -> List[PropBase]:
"""
Gets all properties with the specified ID.
:param ID: An 4 digit hexadecimal string or an int that is less than
0x10000.
"""
if isinstance(id_, int):
if id_ >= 0x10000:
return []
else:
id_ = f'{id_:04X}'
elif isinstance(id_, str):
if len(id_) == 4:
id_ = id_.upper()
else:
return []
return [self[x] for x in self.__idMapping.get(id_, [])]
[docs] def getValue(self, name: Union[str, int], default: _T = None) -> Union[Any, _T]:
"""
Attempts to get the first property
"""
if isinstance(name, int):
if name >= 0x10000:
name = f'{name:08X}'
else:
name = f'{name:04X}'
if len(name) == 4:
for prop in self.getProperties(name):
if isinstance(prop, FixedLengthProp):
return prop.value
return default
elif len(name) == 8:
if (prop := self.get(name)):
if isinstance(prop, FixedLengthProp):
return prop.value
else:
return default
return default
else:
raise ValueError('Property name must be an int less than 0x100000000, a 4 character hex string, or an 8 character hex string.')
[docs] def items(self) -> Iterable[Tuple[str, PropBase]]:
return self.__props.items()
[docs] def keys(self) -> Iterable[str]:
return self.__props.keys()
[docs] def makeWritable(self) -> PropertiesStore:
"""
Returns a copy of this PropertiesStore object that allows modification.
If the instance is already writable, this will return the object.
"""
if self.__writable:
return self
return PropertiesStore(self.__rawData, self.__type, True)
[docs] def pprintKeys(self) -> None:
"""
Uses the pprint function on a sorted list of the keys.
"""
pprint.pprint(sorted(self.__props.keys()))
[docs] def removeProperty(self, nameOrProp: Union[str, PropBase]) -> None:
"""
Removes the property by name or by instance.
Due to possible ambiguities, this function does *not* accept an int
argument nor will it be able to find a property based on the 4 character
hex ID.
:raises KeyError: The property was not found.
:raises NotWritableError: The instance is not writable.
:raises TypeError: The type for :param nameOrProp: was wrong.
"""
if isinstance(nameOrProp, str):
del self[nameOrProp]
elif isinstance(nameOrProp, PropBase):
del self[nameOrProp.name]
else:
raise TypeError(f'Cannot remove property using type {type(nameOrProp)}.')
[docs] def toBytes(self) -> bytes:
if self.__writable:
# The reserved field is present on all of them.
ret = b'\x00' * 8
# Add additional fields depending on type.
if self.__type is not PropertiesType.ATTACHMENT:
ret += constants.st.ST_PROPSTORE_HEADER.pack(self.__nrid, self.__naid, self.__rc, self.__ac)
if self.__type is PropertiesType.MESSAGE:
ret += b'\x00' * 8
# Convert all the properties to bytes.
ret += b''.join(bytes(prop) for prop in self.__props.values())
return ret
else:
return self.__rawData
[docs] def values(self) -> Iterable[PropBase]:
return self.__props.values()
items.__doc__ = dict.items.__doc__
keys.__doc__ = dict.keys.__doc__
values.__doc__ = dict.values.__doc__
@property
def attachmentCount(self) -> int:
"""
The number of Attachment objects for the ``MSGFile`` object.
:raises NotWritableError: The setter was used on an unwritable instance.
:raises TypeError: The Properties instance is not for an ``MSGFile``
object.
"""
if self.__ac is None:
raise TypeError('Attachment properties do not contain an attachment count.')
return self.__ac
@attachmentCount.setter
def attachmentCount(self, value: int) -> None:
if not self.__writable:
raise NotWritableError('PropertiesStore object is not writable.')
if not isinstance(value, int):
raise TypeError(':property attachmentCount: must be an int.')
if self.__ac is None:
raise TypeError('Attachment properties do not contain an attachment count.')
self.__ac = value
@property
def date(self) -> Optional[datetime.datetime]:
"""
Returns the send date contained in the Properties file.
"""
try:
return self.__date
except AttributeError:
self.__date = None
if '00390040' in self:
dateValue = self.getValue('00390040')
# A date can be bytes if it fails to initialize, so we check it
# first.
if isinstance(dateValue, datetime.datetime):
self.__date = dateValue
return self.__date
@property
def isError(self) -> bool:
"""
Whether the instance is in an invalid state.
If the instance is not writable and was given no data, this will be
``True``.
"""
return self.__isError
@property
def nextAttachmentId(self) -> int:
"""
The ID to use for naming the next Attachment object storage if one is
created inside the .msg file.
:raises NotWritableError: The setter was used on an unwritable instance.
:raises TypeError: The Properties instance is not for an ``MSGFile``
object.
"""
if self.__naid is None:
raise TypeError('Attachment properties do not contain a next attachment ID.')
return self.__naid
@nextAttachmentId.setter
def nextAttachmentId(self, value: int) -> None:
if not self.__writable:
raise NotWritableError('PropertiesStore object is not writable.')
if not isinstance(value, int):
raise TypeError(':property nextAttachmentId: must be an int.')
if self.__ac is None:
raise TypeError('Attachment properties do not contain a next attachment ID.')
self.__naid = value
@property
def nextRecipientId(self) -> int:
"""
The ID to use for naming the next Recipient object storage if one is
created inside the .msg file.
:raises NotWritableError: The setter was used on an unwritable instance.
:raises TypeError: The Properties instance is not for an ``MSGFile``
object.
"""
if self.__nrid is None:
raise TypeError('Attachment properties do not contain a next recipient ID.')
return self.__nrid
@nextRecipientId.setter
def nextRecipientId(self, value: int) -> None:
if not self.__writable:
raise NotWritableError('PropertiesStore object is not writable.')
if not isinstance(value, int):
raise TypeError(':property nextRecipientId: must be an int.')
if self.__ac is None:
raise TypeError('Attachment properties do not contain a next recipient ID.')
self.__nrid = value
@property
def props(self) -> Dict[str, PropBase]:
"""
Returns a copy of the internal properties dict.
"""
return copy.deepcopy(self.__props)
@property
def recipientCount(self) -> int:
"""
The number of Recipient objects for the ``MSGFile`` object.
:raises NotWritableError: The setter was used on an unwritable instance.
:raises TypeError: The Properties instance is not for an ``MSGFile``
object.
"""
if self.__rc is None:
raise TypeError('Attachment properties do not contain a recipient count.')
return self.__rc
@recipientCount.setter
def recipientCount(self, value: int) -> None:
if not self.__writable:
raise NotWritableError('PropertiesStore object is not writable.')
if not isinstance(value, int):
raise TypeError(':property recipientCount: must be an int.')
if self.__ac is None:
raise TypeError('Attachment properties do not contain a recipient count.')
self.__nrid = value
@property
def writable(self) -> bool:
"""
Whether the instance accepts modification.
"""
return self.__writable