# Source code for extract_msg.properties.named

from __future__ import annotations


__all__ = [
    'Named',
    'NamedProperties',
    'NamedPropertyBase',
    'NumericalNamedProperty',
    'StringNamedProperty',
]


import abc
import copy
import logging
import pprint
import weakref

from typing import (
        Any, Dict, Iterable, Iterator, List, Optional, Tuple, TYPE_CHECKING,
        TypeVar, Union
    )

from .. import constants
from ..enums import ErrorBehavior, NamedPropertyType
from ..exceptions import InvalidPropertyIdError, StandardViolationError
from ..utils import bytesToGuid, divide, msgPathToString, verifyPropertyId
from compressed_rtf.crc32 import crc32


# Allow for nice type checking.
if TYPE_CHECKING:
    from ..msg_classes.msg import MSGFile
    from ..attachments.attachment_base import AttachmentBase

# Module-level logger named after this module. A NullHandler is attached so the
# logger always has at least one handler registered.
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())

# Generic type variable used to annotate the ``default`` argument (and the
# matching part of the return type) of the ``get`` methods in this module.
_T = TypeVar('_T')


class Named:
    """
    Class for handling access to the named properties themselves.

    Instances behave like a read-only mapping whose keys are tuples of
    ``(name or property ID, property set GUID)`` and whose values are
    ``NamedPropertyBase`` instances. Lookups using ``[]`` are case insensitive.
    """

    # Name of the storage inside the MSG file holding the named property
    # mapping streams.
    __dir = '__nameid_version1.0'

    def __init__(self, msg: MSGFile):
        """
        Reads the GUID, entry, and names streams from the named properties
        storage and builds the property lookup tables from them.

        :param msg: The ``MSGFile`` instance to read the streams from. Only a
            weak reference to it is kept.

        :raises StandardViolationError: One of the three streams is missing and
            ``ErrorBehavior.STANDARDS_VIOLATION`` is not part of the error
            behavior of :param msg:.
        :raises ValueError: The name of a string named property could not be
            read and ``ErrorBehavior.NAMED_NAME_STREAM`` is not part of the
            error behavior of :param msg:.
        """
        self.__msg = weakref.ref(msg)
        # Get the basic streams. If all are empty, then nothing to do.
        guidStream = self.getStream('__substg1.0_00020102')
        entryStream = self.getStream('__substg1.0_00030102')
        # The public attributes keep the raw result (possibly ``None``); the
        # local variables may be replaced with empty bytes below.
        self.guidStream = guidStream
        self.entryStream = entryStream
        self.namesStream = self.getStream('__substg1.0_00040102')

        self.__propertiesDict: Dict[Tuple[str, str], NamedPropertyBase] = {}
        self.__streamIDDict: Dict[str, Tuple[str, str]] = {}

        tolerant = ErrorBehavior.STANDARDS_VIOLATION in msg.errorBehavior

        if guidStream is None:
            if tolerant:
                logger.warning('Standards Violation: Guid stream missing from named properties.')
                guidStream = b''
            else:
                raise StandardViolationError('Guid stream missing from named properties.')

        if entryStream is None:
            if tolerant:
                logger.warning('Standards Violation: Entry stream missing from named properties.')
                entryStream = b''
            else:
                raise StandardViolationError('Entry stream missing from named properties.')

        if self.namesStream is None:
            if tolerant:
                logger.error('Standards Violation: Names stream missing from named properties. Will not parse named properties.')
                # Return immediately since the entry stream will likely fail.
                return
            else:
                raise StandardViolationError('Names stream missing from named properties.')

        # Check that we even have any entries. If there are none, nothing to
        # do.
        if not entryStream:
            return

        # The first three GUID indexes are fixed; the GUIDs read from the GUID
        # stream (16 bytes each) follow them.
        guids = (
            None,
            constants.ps.PS_MAPI,
            constants.ps.PS_PUBLIC_STRINGS,
        ) + tuple(bytesToGuid(x) for x in divide(guidStream, 16))

        # Each entry is 8 bytes long.
        entries: List[Dict[str, Any]] = []
        for rawStream in divide(entryStream, 8):
            tmp = constants.st.ST_NP_ENT.unpack(rawStream)
            entry = {
                'id': tmp[0],
                'pid': tmp[2],
                # The second field packs the GUID index (upper bits) together
                # with the property kind (lowest bit).
                'guid_index': tmp[1] >> 1,
                'pkind': NamedPropertyType(tmp[1] & 1), # 0 if numerical, 1 if string.
                'rawStream': rawStream,
            }
            entry['guid'] = guids[entry['guid_index']]
            entries.append(entry)

        for entry in entries:
            if entry['pkind'] == NamedPropertyType.STRING_NAMED:
                name = None
                try:
                    # For string named properties the ID is the offset of the
                    # name inside the names stream.
                    name = self.__getName(entry['id'])
                except ValueError as e:
                    if ErrorBehavior.NAMED_NAME_STREAM in msg.errorBehavior:
                        logger.error(f'Dropping named property because it failed to acquire name from name stream: {e}')
                    else:
                        raise
                if not name:
                    # No usable name, so the property is dropped.
                    continue
                namedProp = StringNamedProperty(entry, name)
            else:
                namedProp = NumericalNamedProperty(entry)

            id_ = namedProp.identifier
            self.__propertiesDict[id_] = namedProp
            self.__streamIDDict[namedProp.propertyStreamID] = id_

    def __contains__(self, key) -> bool:
        return key in self.__propertiesDict

    def __getitem__(self, propertyName: Tuple[str, str]) -> NamedPropertyBase:
        """
        Gets a named property using a case insensitive comparison of the key.

        :raises TypeError: The key is not a sequence of exactly two items.
        :raises KeyError: No property matches the key.
        """
        # Validate the key.
        if not hasattr(propertyName, '__len__') or len(propertyName) != 2:
            raise TypeError('Named property key must be a tuple of two strings.')

        # Case insensitive search of the dictionary.
        wanted = (propertyName[0].upper(), propertyName[1].upper())
        for key, value in self.__propertiesDict.items():
            if wanted == (key[0].upper(), key[1].upper()):
                return value

        raise KeyError(wanted)

    def __iter__(self) -> Iterator[Tuple[str, str]]:
        return iter(self.__propertiesDict)

    def __len__(self) -> int:
        return len(self.__propertiesDict)

    def __getName(self, offset: int) -> str:
        """
        Parses the offset into the named stream and returns the name found.

        :param offset: Offset, in bytes, of the 4 byte length that precedes
            the name in the names stream.

        :raises ValueError: The length or the name itself does not fit inside
            the names stream, or the name is not valid UTF-16-LE.
        """
        # We used to parse names by handing it as an array, as specified by the
        # documentation, but this new method allows for a little bit more
        # wiggle room in terms of what is accepted by the module.
        if (offset & 3) != 0:
            # If the offset is not a multiple of 4, that is an error, but we
            # are reducing it to a warning.
            logger.warning(f'Malformed named properties detected due to bad offset ({offset}). Ignoring.')

        # Check that the whole 4 byte length field is inside the string
        # stream. Checking only the offset would let a short read reach
        # ``unpack``, which raises ``struct.error`` rather than the
        # ``ValueError`` the caller handles.
        if offset + 4 > len(self.namesStream):
            raise ValueError('Failed to parse named property: offset was not in string stream.')

        # Get the length, in bytes, of the string.
        length = constants.st.ST_LE_I32.unpack(self.namesStream[offset:offset + 4])[0]
        offset += 4

        # Make sure the string can be read entirely. If it can't, something
        # was corrupt.
        if offset + length > len(self.namesStream):
            raise ValueError(f'Failed to parse named property: length ({length}) of string overflows the string stream. This is probably due to a bad offset.')

        return self.namesStream[offset:offset + length].decode('utf-16-le')

    def exists(self, filename: constants.MSG_PATH) -> bool:
        """
        Checks if stream exists inside the named properties folder.

        :raises ReferenceError: The associated ``MSGFile`` instance has been
            garbage collected.
        """
        return self.msg.exists([self.__dir, msgPathToString(filename)], False)

    def get(self, propertyName: Tuple[str, str], default: _T = None) -> Union[NamedPropertyBase, _T]:
        """
        Tries to get a named property based on its key.

        Returns :param default: if not found. Key is a tuple of the name and
        the property set GUID.
        """
        try:
            return self[propertyName]
        except KeyError:
            return default

    def getPropNameByStreamID(self, streamID: Union[int, str]) -> Optional[Tuple[str, str]]:
        """
        Gets the name of a property (as a key for the internal dict) that is
        stored in the specified stream.

        Useful for determining if a stream/property stream entry is a named
        property.

        :param streamID: A 4 hex character identifier that will be checked. May
            also be an integer that can convert to 4 hex characters.

        :returns: The name, if the stream is a named property, otherwise
            ``None``.

        :raises InvalidPropertyIdError: The Stream ID is invalid.
        :raises TypeError: The Stream ID is not a valid type.
        """
        if isinstance(streamID, int):
            if streamID < 0x8000:
                # Definitely doesn't refer to a name property, so just return
                # None.
                return None
            if streamID > 0xFFFF:
                raise InvalidPropertyIdError('Stream ID is out of range.')
            streamID = f'{streamID:04X}'
        elif isinstance(streamID, str):
            verifyPropertyId(streamID)
        else:
            raise TypeError(':param streamID: MUST be an int or str.')

        return self.__streamIDDict.get(streamID)

    def getStream(self, filename: constants.MSG_PATH) -> Optional[bytes]:
        """
        Gets a binary representation of the requested stream.

        This should ALWAYS return a ``bytes`` object if it was found, otherwise
        returns ``None``.

        :raises ReferenceError: The associated ``MSGFile`` instance has been
            garbage collected.
        """
        return self.msg.getStream([self.__dir, msgPathToString(filename)], False)

    def items(self) -> Iterable[Tuple[Tuple[str, str], NamedPropertyBase]]:
        return self.__propertiesDict.items()

    def keys(self) -> Iterable[Tuple[str, str]]:
        return self.__propertiesDict.keys()

    def pprintKeys(self) -> None:
        """
        Uses the pprint function on a sorted list of keys.
        """
        pprint.pprint(sorted(self.__propertiesDict.keys()))

    def values(self) -> Iterable[NamedPropertyBase]:
        return self.__propertiesDict.values()

    @property
    def dir(self):
        """
        Returns the directory inside the MSG file where the named properties
        are located.
        """
        return self.__dir

    @property
    def msg(self) -> MSGFile:
        """
        Returns the ``MSGFile`` instance the named properties were read from.

        :raises ReferenceError: The associated ``MSGFile`` instance has been
            garbage collected.
        """
        if (msg := self.__msg()) is None:
            raise ReferenceError('The MSGFile for this Named instance has been garbage collected.')
        return msg

    @property
    def namedProperties(self) -> Dict[Tuple[str, str], NamedPropertyBase]:
        """
        Returns a copy of the dictionary containing all the named properties.
        """
        return copy.deepcopy(self.__propertiesDict)
class NamedProperties:
    """
    Reads the data of named properties by combining a Named instance, which
    knows the property entries, with an extract-msg object that is asked for
    the property data.
    """

    def __init__(self, named: Named, streamSource: Union[MSGFile, AttachmentBase]):
        """
        :param named: The Named instance to refer to for named properties
            entries.
        :param streamSource: The source to use for acquiring the data of a
            named property. Only a weak reference to it is stored.
        """
        self.__streamSource = weakref.ref(streamSource)
        self.__named = named

    def __getitem__(self, item: Union[Tuple[str, str], NamedPropertyBase]):
        """
        Get a named property using the [] operator.

        Item must be a named property instance or a tuple with 2 items: the
        name and the GUID string.

        :raises ReferenceError: The associated instance for getting actual
            property data has been garbage collected.
        """
        source = self.__streamSource()
        if source is None:
            raise ReferenceError('The stream source for the NamedProperties instance has been garbage collected.')

        # Bind the reader before resolving the entry so the order of the
        # lookups is the same as calling it directly with the resolved ID.
        reader = source._getTypedData
        if isinstance(item, NamedPropertyBase):
            entry = item
        else:
            entry = self.__named[item]
        return reader(entry.propertyStreamID)

    def get(self, item: Union[Tuple[str, str], NamedPropertyBase], default: _T = None) -> Union[Any, _T]:
        """
        Get a named property, returning the value of :param default: if not
        found.

        Item is handled the same way as for the [] operator; only a
        ``KeyError`` from that lookup is converted into :param default:.

        :raises ReferenceError: The associated instance for getting actual
            property data has been garbage collected.
        """
        try:
            value = self[item]
        except KeyError:
            return default
        else:
            return value
class NamedPropertyBase(abc.ABC):
    """
    Abstract base for a single named property entry.

    Subclasses must provide the ``identifier`` and ``type`` properties.
    """

    def __init__(self, entry: Dict[str, Any]):
        """
        :param entry: Dictionary describing the entry. The keys ``guid``,
            ``guid_index``, and ``pid`` are read here; ``rawStream`` is read by
            ``rawEntryStream``. The dictionary is stored as-is, not copied.
        """
        pid = entry['pid']
        self.__entry = entry
        self.__guid = entry['guid']
        self.__guidIndex = entry['guid_index']
        self.__namedPropertyID = pid
        # The value stream ID is the named property ID offset by 0x8000,
        # written as at least 4 uppercase hex digits.
        self.__propertyStreamID = format(0x8000 + pid, '04X')

    @property
    def guid(self) -> str:
        """
        The guid of the property's property set.
        """
        return self.__guid

    @property
    def guidIndex(self) -> int:
        """
        The guid index of the property's property set.
        """
        return self.__guidIndex

    @property
    @abc.abstractmethod
    def identifier(self) -> Tuple[str, str]:
        """
        An identifier that can be used to fully identify the property.
        """

    @property
    def namedPropertyID(self) -> int:
        """
        The named property id.
        """
        return self.__namedPropertyID

    @property
    def propertyStreamID(self) -> str:
        """
        An ID usable for grabbing the value stream.
        """
        return self.__propertyStreamID

    @property
    def rawEntry(self) -> Dict[str, Any]:
        """
        A deep copy of the dictionary the property was created from.
        """
        return copy.deepcopy(self.__entry)

    @property
    def rawEntryStream(self) -> bytes:
        """
        The raw data used for the entry.
        """
        return self.__entry['rawStream']

    @property
    @abc.abstractmethod
    def type(self) -> NamedPropertyType:
        """
        Returns the type of the named property. This will be a member of the
        NamedPropertyType enum.
        """
class StringNamedProperty(NamedPropertyBase):
    """
    A named property that is identified by a string name.
    """

    def __init__(self, entry: Dict, name: str):
        """
        :param entry: The entry dictionary, passed on to the base class.
        :param name: The name of the property.
        """
        super().__init__(entry)
        self.__name = name

        # The stream ID is derived from a CRC-32 of the UTF-16-LE encoded
        # name. Apparently it is the same CRC-32 that the Compressed RTF
        # standard uses, which is why the function from the compressed-rtf
        # Python module is used here.
        #
        # The name should only ever be lowered if it is part of the
        # PS_INTERNET_HEADERS property set **AND** it was generated by certain
        # versions of Outlook. The most effective way to be sure would be to
        # compute the Stream ID and check whether the entry is actually there,
        # then try the regular case, and fall back to a warning and a Stream
        # ID of 0 if neither matches.
        #
        # TODO: Unfortunately, doing this will need to be put off until a
        # different version, preferably after Python 2 support is removed, as
        # it will require restructuring a lot of internal code. For now we
        # just assume that it is lowercase for that property set, and apply no
        # special logic for any other property set.
        if self.guid == constants.ps.PS_INTERNET_HEADERS:
            hashedName = name.lower()
        else:
            hashedName = name

        checksum = crc32(hashedName.encode('utf-16-le'))
        self.__streamID = 0x1000 + (checksum ^ (self.guidIndex << 1 | 1)) % 0x1F

    @property
    def identifier(self) -> Tuple[str, str]:
        """
        The name of the property paired with the GUID of its property set.
        """
        return (self.name, self.guid)

    @property
    def name(self) -> str:
        """
        The name of the property.
        """
        return self.__name

    @property
    def streamID(self) -> int:
        """
        Returns the streamID of the named property. This may not be accurate.
        """
        return self.__streamID

    @property
    def type(self) -> NamedPropertyType:
        """
        Returns the type of the named property. This will be a member of the
        NamedPropertyType enum.
        """
        return NamedPropertyType.STRING_NAMED
class NumericalNamedProperty(NamedPropertyBase):
    """
    A named property that is identified by a numerical ID.
    """

    def __init__(self, entry: Dict):
        """
        :param entry: The entry dictionary. Its ``id`` key is read here and the
            dictionary is passed on to the base class.
        """
        super().__init__(entry)
        numericID = entry['id']
        # The property ID is the numerical ID written as at least 4 uppercase
        # hex digits.
        self.__propertyID = format(numericID, '04X')
        self.__streamID = 0x1000 + (numericID ^ (self.guidIndex << 1)) % 0x1F

    @property
    def identifier(self) -> Tuple[str, str]:
        """
        The property ID paired with the GUID of its property set.
        """
        return (self.propertyID, self.guid)

    @property
    def propertyID(self) -> str:
        """
        The actual property id of the named property.
        """
        return self.__propertyID

    @property
    def streamID(self) -> int:
        """
        Returns the streamID of the named property. This may not be accurate.
        """
        return self.__streamID

    @property
    def type(self) -> NamedPropertyType:
        """
        Returns the type of the named property. This will be a member of the
        NamedPropertyType enum.
        """
        return NamedPropertyType.NUMERICAL_NAMED