from __future__ import annotations
__all__ = [
'MSGFile',
]
import codecs
import copy
import datetime
import functools
import io
import logging
import os
import pathlib
import weakref
import zipfile
import olefile
from typing import (
Any, Callable, cast, Dict, List, Optional, Tuple, TypeVar, Union
)
from .. import constants
from ..constants import (
DATE_FORMAT, DT_FORMAT, MSG_PATH, OVERRIDE_CLASS, ps, SAVE_TYPE
)
from ..attachments import (
AttachmentBase, initStandardAttachment, SignedAttachment
)
from ..encoding import lookupCodePage
from ..enums import (
ErrorBehavior, InsecureFeatures, Importance, Priority, PropertiesType,
RetentionFlags, SaveType, Sensitivity, SideEffect
)
from ..exceptions import (
ConversionError, InvalidFileFormatError, PrefixError,
StandardViolationError
)
from ..properties.named import Named, NamedProperties
from ..properties.properties_store import PropertiesStore
from ..structures.contact_link_entry import ContactLinkEntry
from ..utils import (
divide, guessEncoding, inputToMsgPath, makeWeakRef, msgPathToString,
parseType, verifyPropertyId, verifyType
)
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
_T = TypeVar('_T')
[docs]class MSGFile:
"""
Base handler for all .msg files.
"""
filename: Optional[str]
def __init__(self, path, **kwargs):
    """
    Opens (or attaches to) the OLE container and prepares this instance.

    A top-level instance opens ``path`` with ``olefile.OleFileIO``. An
    embedded instance (one given ``parentMsg``) reuses the parent's already
    open OLE file and does not own it.

    :param path: Path to the MSG file in the system or the bytes of the MSG
        file.
    :param prefix: Used for extracting embedded MSG files inside the main
        one. Do not set manually unless you know what you are doing.
    :param parentMsg: Used for synchronizing instances of shared objects. Do
        not set this unless you know what you are doing.
    :param initAttachment: Optional, the method used when creating an
        attachment for an MSG file. MUST be a function that takes 2
        arguments (the ``MSGFile`` instance and the directory in the MSG
        file where the attachment is) and returns an instance of
        ``AttachmentBase``.
    :param delayAttachments: Optional, delays the initialization of
        attachments until the user attempts to retrieve them. Allows MSG
        files with bad attachments to be initialized so the other data can
        be retrieved.
    :param filename: Optional, the filename to be used by default when
        saving.
    :param errorBehavior: Optional, the behavior to use in the event of
        certain types of errors. Uses the ``ErrorBehavior`` enum.
    :param overrideEncoding: Optional, an encoding to use instead of the one
        specified by the MSG file. If the value is ``"chardet"`` and you
        have the ``chardet`` module installed, an attempt will be made to
        auto-detect the encoding based on some of the string properties. Do
        not report encoding errors caused by this.
    :param treePath: Internal variable used for giving representation of the
        path, as a tuple of objects, of the ``MSGFile``. When passing, this
        is the path to the parent object of this instance.
    :param insecureFeatures: Optional, an enum value that specifies if
        certain insecure features should be enabled. These features should
        only be used on data that you trust. Uses the ``InsecureFeatures``
        enum.
    :param dateFormat: Optional, the format string to use for dates.
    :param datetimeFormat: Optional, the format string to use for dates
        that include a time component.

    :raises InvalidFileFormatError: The file is not an OLE file or could
        not be parsed as an MSG file.
    :raises StandardViolationError: Some part of the file badly violates the
        standard.
    :raises IOError: There is an issue opening the MSG file.
    :raises LookupError: ``overrideEncoding`` names a codec that
        ``codecs.lookup`` does not know.
    :raises NameError: The encoding provided is not supported.
    :raises PrefixError: The prefix is not a supported type.
    :raises ReferenceError: The parent MSG was garbage collected while this
        child was being initialized.
    :raises TypeError: The parent is not an instance of ``MSGFile`` or a
        subclass.
    :raises ValueError: The path is invalid.

    It's recommended to check the error message to ensure you know why a
    specific exception was raised.
    """
    # Retrieve all the kwargs that we need.
    self.__inscFeat: InsecureFeatures = kwargs.get('insecureFeatures', InsecureFeatures.NONE)
    prefix: str = cast(str, kwargs.get('prefix', ''))
    self.__parentMsg = makeWeakRef(cast(MSGFile, kwargs.get('parentMsg')))
    # Builds a new list: the parent's tree path followed by a weak
    # reference to this instance. The caller's list is not modified.
    self.__treePath = kwargs.get('treePath', []) + [weakref.ref(self)]
    # Verify it is a valid class.
    if self.__parentMsg and not isinstance(self.__parentMsg(), MSGFile):
        raise TypeError(':param parentMsg: must be an instance of MSGFile or a subclass.')
    filename = kwargs.get('filename')
    overrideEncoding = kwargs.get('overrideEncoding')
    # WARNING DO NOT MANUALLY MODIFY PREFIX. Let the program set it.
    self.__path = path
    self.__initAttachmentFunc = kwargs.get('initAttachment', initStandardAttachment)
    self.__attachmentsDelayed = bool(kwargs.get('delayAttachments', False))
    self.__attachmentsReady = False
    self.__errorBehavior = ErrorBehavior(kwargs.get('errorBehavior', ErrorBehavior.THROW))
    self.__dateFormat = kwargs.get('dateFormat', DATE_FORMAT)
    self.__dtFormat = kwargs.get('datetimeFormat', DT_FORMAT)
    # Cache used by listDir(), keyed by its three boolean arguments.
    self.__listDirRes: Dict[Tuple[bool, bool, bool], List[List[str]]] = {}
    if self.__parentMsg:
        # We should be able to directly access the private variables of
        # another instance with no issue.
        if (msg := self.__parentMsg()) is not None:
            # Share the parent's OLE file; close() will not close it from
            # this instance because we are not the owner.
            self.__ole = msg.__ole
            self.__oleOwner = False
        else:
            raise ReferenceError('Parent MSG was garbage collected during init of child msg.')
    else:
        # Verify the path at least evaluates to True, as not doing so can
        # allow an OleFile to be created without a path.
        if not path:
            raise ValueError(':param path: must be set and must not be empty.')
        try:
            # When OLE_DEFECT_INCORRECT is part of the error behavior, only
            # fatal defects are raised by olefile; otherwise "incorrect"
            # level defects are raised as well.
            if ErrorBehavior.OLE_DEFECT_INCORRECT in self.errorBehavior:
                defect = olefile.DEFECT_FATAL
            else:
                defect = olefile.DEFECT_INCORRECT
            self.__ole = olefile.OleFileIO(path, raise_defects = defect)
        except OSError as e:
            logger.error(e)
            # Translate this one specific message into the package's own
            # exception; any other OSError propagates unchanged.
            if str(e) == 'not an OLE2 structured storage file':
                raise InvalidFileFormatError(e)
            else:
                raise
        # This is a variable that tells whether we own the olefile. Used for
        # closing. We set it here for error handling.
        self.__oleOwner = True
    self.__open = True
    # The rest *must* be in a try-except block to ensure we close the file.
    try:
        # Keep the kwargs, minus the per-instance ones, so they can be
        # exposed through the ``kwargs`` property.
        kwargsCopy = copy.copy(kwargs)
        if 'prefix' in kwargsCopy:
            del kwargsCopy['prefix']
        if 'parentMsg' in kwargsCopy:
            del kwargsCopy['parentMsg']
        if 'filename' in kwargsCopy:
            del kwargsCopy['filename']
        if 'treePath' in kwargsCopy:
            del kwargsCopy['treePath']
        self.__kwargs = kwargsCopy
        prefixl = []
        if prefix:
            try:
                # Normalize the prefix into a list of path components and a
                # '/'-terminated string form.
                prefixl = inputToMsgPath(prefix)
                prefix = '/'.join(prefixl) + '/'
            except ConversionError:
                raise PrefixError(f'The provided prefix could not be used: {prefix}')
        self.__prefix = prefix
        self.__prefixList = prefixl
        self.__prefixLen = len(prefixl)
        if overrideEncoding is not None:
            logger.warning('You have chosen to override the string encoding. Do not report encoding errors caused by this.')
            if overrideEncoding.lower() == 'chardet':
                encoding = guessEncoding(self)
                if encoding:
                    self.__stringEncoding = encoding.lower()
                else:
                    logger.warning('Attempted to auto-detect encoding, but no consensus could be formed based on the top-level strings. Defaulting to normal detection methods.')
            else:
                # Raises LookupError if the codec is unknown.
                codecs.lookup(overrideEncoding)
                self.__stringEncoding = overrideEncoding
        self.__overrideEncoding = overrideEncoding
        if prefix and not filename:
            # Embedded MSG without an explicit filename: read string stream
            # 3001 from the directory one level above this prefix.
            # NOTE(review): presumably the attachment's display name -
            # confirm against the property definitions.
            filename = self.getStringStream(prefixl[:-1] + ['__substg1.0_3001'], prefix = False)
        if filename:
            self.filename = filename
        elif hasattr(path, '__len__'):
            # NOTE(review): the 1536 cut-off presumably separates a path
            # string from raw MSG file bytes - confirm.
            if len(path) < 1536:
                self.filename = str(path)
            else:
                self.filename = None
        elif isinstance(path, pathlib.Path):
            self.filename = str(path)
        else:
            self.filename = None
        # Now, load the attachments if we are not delaying them.
        if not self.__attachmentsDelayed:
            # Accessing the cached property is what triggers the loading.
            self.attachments
    except:
        # *Any* exception here requires that we close the file.
        try:
            self.close()
        except:
            # Best effort only; the original exception is the one to report.
            pass
        # Raise the exception after trying to close the file.
        raise
def __bytes__(self) -> bytes:
return self.exportBytes()
def __enter__(self) -> MSGFile:
self.__ole.__enter__()
return self
def __exit__(self, *_) -> None:
    """
    Closes the MSG file when the ``with`` block ends. The exception details
    passed by the context manager protocol are ignored, and since ``None``
    is returned any exception raised in the block propagates.
    """
    self.close()
def _getOleEntry(self, filename: MSG_PATH, prefix: bool = True) -> olefile.olefile.OleDirectoryEntry:
"""
Finds the directory entry from the OLE file for the stream or storage
specified.
Use ``'/'`` to get the root entry.
:param prefix: Bool, whether to search for the entry at the root of the
MSG file (``False``) or look in the current child MSG file
(``True``). (Default: ``True``)
"""
sid = -1
if filename == '/':
if prefix and self.__prefix:
sid = self.__ole._find(self.__prefixList)
else:
return self.__ole.direntries[0]
else:
sid = self.__ole._find(self.fixPath(filename, prefix))
return self.__ole.direntries[sid]
def _getTypedAs(self, _id: str, overrideClass = None, preserveNone: bool = True):
"""
Like the other "get as" functions, but designed for when something
could be multiple types (where only one will be present).
This way you have no need to set the type, it will be handled for you.
:param overrideClass: Class/function to use to morph the data that was
read. The data will be the first argument to the class's
``__init__`` method or the function itself, if that is what is
provided. By default, this will be completely ignored if the value was not found.
:param preserveNone: If ``True`` (default), causes the function to
ignore :param overrideClass: when the value could not be found (is
``None``). If this is changed to ``False``, then the value will be
used regardless.
"""
value = self._getTypedData(_id)
# Check if we should be overriding the data type for this instance.
if overrideClass is not None:
if value is not None or not preserveNone:
value = overrideClass(value)
return value
def _getTypedData(self, _id: str, _type = None, prefix: bool = True):
    """
    Returns the data stored under the given id, looking first for a stream
    and then for a property, or ``None`` if neither exists.

    :param _id: MUST be a 4 digit hexadecimal string.
    :param prefix: Bool, whether to search for the entry at the root of the
        MSG file (``False``) or look in the current child MSG file
        (``True``). (Default: ``True``)

    If you know for sure what type the data is before hand, you can specify
    it as being one of the strings in the constant
    ``FIXED_LENGTH_PROPS_STRING`` or ``VARIABLE_LENGTH_PROPS_STRING``.
    """
    verifyPropertyId(_id)
    propId = _id.upper()
    wasFound, value = self._getTypedStream('__substg1.0_' + propId, prefix, _type)
    if not wasFound:
        # No stream with that id, so fall back to the properties store.
        wasFound, value = self._getTypedProperty(propId, _type)
    return value if wasFound else None
def _getTypedProperty(self, propertyID: str, _type = None) -> Tuple[bool, Optional[Any]]:
    """
    Looks the property up through :meth:`getPropertyVal`.

    :param propertyID: MUST be a 4 digit hexadecimal string.

    If you know for sure what type the property is before hand, you can
    specify it as being one of the strings in the constant
    ``FIXED_LENGTH_PROPS_STRING`` or ``VARIABLE_LENGTH_PROPS_STRING``.

    :returns: ``(True, value)`` when the property exists, otherwise
        ``(False, None)``.
    """
    verifyPropertyId(propertyID)
    if _type:
        verifyType(_type)
        propertyID += _type
    # A unique sentinel lets a stored value of None count as "found".
    missing = object()
    value = self.getPropertyVal(propertyID, missing)
    return (False, None) if value is missing else (True, value)
def _getTypedStream(self, filename: MSG_PATH, prefix: bool = True, _type: Optional[str] = None) -> Tuple[bool, Optional[Any]]:
    """
    Gets the contents of the specified stream as the type that it is
    supposed to be.

    Rather than the full filename, you should only feed this function the
    filename sans the type. So if the full name is "__substg1.0_001A001F",
    the filename this function should receive should be "__substg1.0_001A".

    If you know for sure what type the stream is before hand, you can
    specify it as being one of the strings in the constant
    ``FIXED_LENGTH_PROPS_STRING`` or ``VARIABLE_LENGTH_PROPS_STRING``.

    If you have not specified the type, the type this function returns in
    many cases cannot be predicted. As such, when using this function it is
    best for you to check the type that it returns.

    :param prefix: Bool, whether to search for the entry at the root of the
        MSG file (``False``) or look in the current child MSG file
        (``True``). (Default: ``True``)

    :returns: A ``(found, value)`` tuple. ``(False, None)`` means no
        matching stream exists; ``(True, None)`` means the stream exists
        but is empty.

    :raises NotImplementedError: The stream is a multiple-valued type that
        this method does not know how to split.
    """
    # NOTE(review): _type may be None here; presumably verifyType accepts
    # that - confirm.
    verifyType(_type)
    filename = self.fixPath(filename, prefix)
    # With an explicit type only that one stream name is tried; otherwise
    # every stream path under the current prefix is a candidate.
    for x in (filename + _type,) if _type is not None else self.slistDir():
        # Names containing '-' are the per-item streams of a multiple
        # (see the ``{x}-{y:08X}`` names below), so they are skipped here.
        if x.startswith(filename) and '-' not in x:
            if (contents := self.getStream(x, False)) is None:
                continue
            if len(contents) == 0:
                return True, None # We found the file, but it was empty.
            extras: List[bytes]= []
            # The last four characters of the stream name are the type.
            _type = x[-4:]
            if x[-4] == '1': # It's a multiple
                if _type in ('101F', '101E'):
                    streams = len(contents) // 4 # These lengths are normal.
                elif _type == '1102':
                    streams = len(contents) // 8 # These lengths have 4 0x00 bytes at the end for seemingly no reason. They are "reserved" bytes
                elif _type in ('1002', '1003', '1004', '1005', '1007', '1014', '1040', '1048'):
                    try:
                        streams = self.props[x[-8:]].realLength
                    except (KeyError, AttributeError):
                        # No usable property entry: derive the item count
                        # from the stream length and the per-item size.
                        logger.error(f'Could not find matching VariableLengthProp for stream {x}')
                        streams = len(contents) // (2 if _type in constants.MULTIPLE_2_BYTES else 4 if _type in constants.MULTIPLE_4_BYTES else 8 if _type in constants.MULTIPLE_8_BYTES else 16)
                else:
                    raise NotImplementedError(f'The stream specified is of type {_type}. We don\'t currently understand exactly how this type works. If it is mandatory that you have the contents of this stream, please create an issue labeled "NotImplementedError: _getTypedStream {_type}".')
                if _type in ('101F', '101E', '1102'):
                    # Variable length items each live in their own stream.
                    if self.exists(x + '-00000000', False):
                        for y in range(streams):
                            if self.exists((name := f'{x}-{y:08X}'), False):
                                extras.append(self.getStream(name, False))
                elif _type in ('1002', '1003', '1004', '1005', '1007', '1014', '1040', '1048'):
                    # Fixed size items are packed into this one stream, so
                    # split it up. parseType then receives the item count
                    # as ``contents`` and the pieces as ``extras``.
                    extras = divide(contents, (2 if _type in constants.MULTIPLE_2_BYTES else 4 if _type in constants.MULTIPLE_4_BYTES else 8 if _type in constants.MULTIPLE_8_BYTES else 16))
                    contents = streams
            return True, parseType(int(_type, 16), contents, self.stringEncoding, extras)
    return False, None # We didn't find the stream.
def _oleListDir(self, streams: bool = True, storages: bool = False) -> List[List[str]]:
"""
Calls :meth:`OleFileIO.listdir` from the OleFileIO instance associated
with this MSG file. Useful for if you need access to all the top level
streams if this is an embedded MSG file.
:returns: A list of the streams and or storages depending on the
arguments given.
"""
return self.__ole.listdir(streams, storages)
[docs] def close(self) -> None:
if self.__open:
if self.attachmentsReady:
for attachment in self.attachments:
if attachment.type == 'msg':
attachment.data.close()
if self.__oleOwner:
self.__ole.close()
self.__open = False
def debug(self) -> None:
    """
    Prints the directory and raw contents of every stream whose name ends
    in ``001E`` or ``001F``.
    """
    for entry in self.listDir():
        if not entry[-1].endswith(('001E', '001F')):
            continue
        print('Directory: ' + str(entry[:-1]))
        print(f'Contents: {self.getStream(entry)}')
[docs] def exists(self, filename: MSG_PATH, prefix: bool = True) -> bool:
"""
Checks if the stream exists in the MSG file.
:param prefix: Bool, whether to search for the entry at the root of the
MSG file (``False``) or look in the current child MSG file
(``True``). (Default: ``True``)
"""
filename = self.fixPath(filename, prefix)
return self.__ole.exists(filename)
[docs] def sExists(self, filename: MSG_PATH, prefix: bool = True) -> bool:
"""
Checks if string stream exists in the MSG file.
:param prefix: Bool, whether to search for the entry at the root of the
MSG file (``False``) or look in the current child MSG file
(``True``). (Default: ``True``)
"""
filename = self.fixPath(filename, prefix)
return self.exists(filename + '001F') or self.exists(filename + '001E')
def existsTypedProperty(self, _id: str, location = None, _type = None, prefix: bool = True, propertiesInstance: Optional[PropertiesStore] = None) -> Tuple[bool, int]:
    """
    Determines if a stream or property with the provided id exists.

    Matching streams are counted first. Then every key of the properties
    instance that starts with the id (plus type, if given) is counted,
    unless one of the counted stream names already ends with that key.

    Because of how this method works, any folder that contains its own
    "__properties_version1.0" file should have this method called from
    its class.

    :param propertiesInstance: The properties store to search. Defaults to
        ``self.props``.
    :param prefix: Bool, whether to search for the entry at the root of the
        MSG file (``False``) or look in the current child MSG file
        (``True``). (Default: ``True``)

    :returns: Two values, the first being a boolean for if anything was
        found, and the second being how many were found.
    """
    verifyPropertyId(_id)
    verifyType(_type)
    _id = _id.upper()
    if propertiesInstance is None:
        propertiesInstance = self.props
    # NOTE(review): the path built from ``location`` is validated by
    # inputToMsgPath but is not read by the search below, which always
    # looks directly under the instance prefix - confirm this is intended.
    prefixList = self.prefixList if prefix else []
    if location is not None:
        prefixList.append(location)
    prefixList = inputToMsgPath(prefixList)
    usableId = _id + _type if _type else _id

    depth = self.__prefixLen
    wantedStart = '__substg1.0_' + usableId
    streamNames = []
    for item in self.listDir():
        if len(item) <= depth:
            continue
        name = item[depth]
        if name.startswith(wantedStart) and name not in streamNames:
            streamNames.append(name)

    total = len(streamNames)
    for key in propertiesInstance:
        if key.startswith(usableId) and not any(name.endswith(key) for name in streamNames):
            total += 1
    return (total > 0), total
def export(self, path) -> None:
    """
    Exports the contents of this MSG file to a new MSG file specified by
    the path given.

    If this is an embedded MSG file, the embedded streams and directories
    will be added to it as if they were at the root, allowing you to save
    it as its own MSG file.

    This function pulls directly from the source MSG file, so modifications
    to the properties of an ``MSGFile`` object (or one of its subclasses)
    will not be reflected in the saved file.

    :param path: A path-like object (including strings and ``pathlib.Path``
        objects) or an IO device with a write method which accepts bytes.
    """
    # Imported here rather than at module level.
    from ..ole_writer import OleWriter

    # The writer collects the entries of this MSG file and then serializes
    # them as a new OLE file.
    oleWriter = OleWriter()
    oleWriter.fromMsg(self)
    oleWriter.write(path)
[docs] def exportBytes(self) -> bytes:
"""
Saves a new copy of the MSG file, returning the bytes.
"""
out = io.BytesIO()
self.export(out)
return out.getvalue()
def fixPath(self, inp: MSG_PATH, prefix: bool = True) -> str:
    """
    Converts a path (string, list or tuple) into its string form and, when
    :param prefix: is ``True``, puts this instance's prefix in front of it.
    """
    path = msgPathToString(inp)
    return self.__prefix + path if prefix else path
[docs] def getMultipleBinary(self, filename: MSG_PATH, prefix: bool = True) -> Optional[List[bytes]]:
"""
Gets a multiple binary property as a list of ``bytes`` objects.
Like :meth:`getStringStream`, the 4 character type suffix should be
omitted. So if you want the stream "__substg1.0_00011102" then the
filename would simply be "__substg1.0_0001".
:param prefix: Bool, whether to search for the entry at the root of the
MSG file (``False``) or look in the current child MSG file
(``True``). (Default: ``True``)
"""
filename = self.fixPath(filename, prefix) + '1102'
multStream = self.getStream(filename)
if multStream is None:
return None
if len(multStream) == 0:
return []
elif len(multStream) & 7 != 0:
raise StandardViolationError(f'Length stream for multiple binary was not a multiple of 8.')
else:
ret = [self.getStream(filename + f'-{x:08X}') for x in range(len(multStream) // 8)]
# We could do more checking here, but we'll just check for None.
if (index := next((x for x in ret if x is None), -1)) != -1:
logger.error('Unable to get the desired number of binary streams for multiple, not all streams were found.')
return ret[:index]
return ret
[docs] def getMultipleString(self, filename: MSG_PATH, prefix: bool = True) -> Optional[List[str]]:
"""
Gets a multiple string property as a list of ``str`` objects.
Like :meth:`getStringStream`, the 4 character type suffix should be
omitted. So if you want the stream "__substg1.0_00011102" then the
filename would simply be "__substg1.0_0001".
:param prefix: Bool, whether to search for the entry at the root of the
MSG file (``False``) or look in the current child MSG file
(``True``). (Default: ``True``)
"""
filename = self.fixPath(filename, prefix) + '101F' if self.areStringsUnicode else '101E'
multStream = self.getStream(filename)
if multStream is None:
return []
if len(multStream) == 0:
return []
elif len(multStream) & 3 != 0:
raise StandardViolationError(f'Length stream for multiple string was not a multiple of 4.')
else:
ret = [self.getStream(filename + f'-{x:08X}') for x in range(len(multStream) // 4)]
# We could do more checking here, but we'll just check for None.
for index, item in enumerate(ret):
if item is None:
logger.error('Unable to get the desired number of string streams for multiple, not all streams were found.')
return ret[:index]
# Decode the bytes and remove the null byte.
ret[index] = item.decode(self.stringEncoding)[:-1]
return ret
[docs] def getNamedAs(self, propertyName: str, guid: str, overrideClass: OVERRIDE_CLASS[_T]) -> Optional[_T]:
"""
Returns the named property, setting the class if specified.
:param overrideClass: Class/function to use to morph the data that was
read. The data will be the first argument to the class's
``__init__`` method or the function itself, if that is what is
provided. If the value is ``None``, this function is not called. If
you want it to be called regardless, you should handle the data
directly.
"""
value = self.getNamedProp(propertyName, guid)
if value is not None:
value = overrideClass(value)
return value
[docs] def getNamedProp(self, propertyName: str, guid: str, default: _T = None) -> Union[Any, _T]:
"""
instance.namedProperties.get((propertyName, guid), default)
Can be override to create new behavior.
"""
return self.namedProperties.get((propertyName, guid), default)
[docs] def getPropertyAs(self, propertyName: Union[int, str], overrideClass: OVERRIDE_CLASS[_T]) -> Optional[_T]:
"""
Returns the property, setting the class if found.
:param overrideClass: Class/function to use to morph the data that was
read. The data will be the first argument to the class's
``__init__`` method or the function itself, if that is what is
provided. If the value is ``None``, this function is not called. If
you want it to be called regardless, you should handle the data
directly.
"""
value = self.getPropertyVal(propertyName)
if value is not None:
value = overrideClass(value)
return value
[docs] def getPropertyVal(self, name: Union[int, str], default: _T = None) -> Union[Any, _T]:
"""
instance.props.getValue(name, default)
Can be overridden to create new behavior.
"""
return self.props.getValue(name, default)
[docs] def getSingleOrMultipleBinary(self, filename: MSG_PATH, prefix: bool = True) -> Optional[Union[List[bytes], bytes]]:
"""
Combination of :meth:`getStream` and :meth:`getMultipleBinary`.
Checks to see if a single binary stream exists to return, otherwise
tries to return the multiple binary stream of the same ID.
Like :meth:`getStringStream`, the 4 character type suffix should be
omitted. So if you want the stream "__substg1.0_00010102" then the
filename would simply be "__substg1.0_0001".
:param prefix: Bool, whether to search for the entry at the root of the
MSG file (``False``) or look in the current child MSG file
(``True``). (Default: ``True``)
"""
filename = self.fixPath(filename, prefix)
# Check for a single binary stream first.
if (ret := self.getStream(filename + '0102', False)) is not None:
return ret
# Otherwise, we just let the return from `getMultipleBinary` do the
# work.
return self.getMultipleBinary(filename, False)
[docs] def getSingleOrMultipleString(self, filename: MSG_PATH, prefix: bool = True) -> Optional[Union[List[str], str]]:
"""
Combination of :meth:`getStringStream` and :meth:`getMultipleString`.
Checks to see if a single string stream exists to return, otherwise
tries to return the multiple string stream of the same ID.
Like :meth:`getStringStream`, the 4 character type suffix should be
omitted. So if you want the stream "__substg1.0_0001001F" then the
filename would simply be "__substg1.0_0001".
:param prefix: Bool, whether to search for the entry at the root of the
MSG file (``False``) or look in the current child MSG file
(``True``). (Default: ``True``)
"""
filename = self.fixPath(filename, prefix)
# Check for a single stribng stream first.
if (ret := self.getStringStream(filename, False)) is not None:
return ret
# Otherwise, we just let the return from `getMultipleString` do the
# work.
return self.getMultipleString(filename, False)
[docs] def getStream(self, filename: MSG_PATH, prefix: bool = True) -> Optional[bytes]:
"""
Gets a binary representation of the requested stream.
This should ALWAYS return a ``bytes`` object if it was found, otherwise
returns ``None``.
:param prefix: Bool, whether to search for the entry at the root of the
MSG file (``False``) or look in the current child MSG file
(``True``). (Default: ``True``)
"""
filename = self.fixPath(filename, prefix)
if self.exists(filename, False):
with self.__ole.openstream(filename) as stream:
return stream.read() or b''
else:
logger.info(f'Stream "{filename}" was requested but could not be found. Returning `None`.')
return None
[docs] def getStreamAs(self, streamID: MSG_PATH, overrideClass: OVERRIDE_CLASS[_T]) -> Optional[_T]:
"""
Returns the specified stream, modifying it to the specified class if it
is found.
:param overrideClass: Class/function to use to morph the data that was
read. The data will be the first argument to the class's
``__init__`` method or the function itself, if that is what is
provided. If the value is ``None``, this function is not called. If
you want it to be called regardless, you should handle the data
directly.
"""
value = self.getStream(streamID)
if value is not None:
value = overrideClass(value)
return value
[docs] def getStringStream(self, filename: MSG_PATH, prefix: bool = True) -> Optional[str]:
"""
Gets a string representation of the requested stream.
Rather than the full filename, you should only feed this function the
filename sans the type. So if the full name is "__substg1.0_001A001F",
the filename this function should receive should be "__substg1.0_001A".
This should ALWAYS return a string if it was found, otherwise returns
``None``.
:param prefix: Bool, whether to search for the entry at the root of the
MSG file (``False``) or look in the current child MSG file
(``True``). (Default: ``True``)
"""
filename = self.fixPath(filename, prefix)
if self.areStringsUnicode:
tmp = self.getStream(filename + '001F', prefix = False)
else:
tmp = self.getStream(filename + '001E', prefix = False)
return None if tmp is None else tmp.decode(self.stringEncoding)
[docs] def getStringStreamAs(self, streamID: MSG_PATH, overrideClass: OVERRIDE_CLASS[_T]) -> Optional[_T]:
"""
Returns the specified string stream, modifying it to the specified
class if it is found.
:param overrideClass: Class/function to use to morph the data that was
read. The data will be the first argument to the class's
``__init__`` method or the function itself, if that is what is
provided. If the value is ``None``, this function is not called. If
you want it to be called regardless, you should handle the data
directly.
"""
value = self.getStream(streamID)
if value is not None:
value = overrideClass(value)
return value
[docs] def listDir(self, streams: bool = True, storages: bool = False, includePrefix: bool = True) -> List[List[str]]:
"""
Replacement for ``OleFileIO.listdir`` that runs at the current prefix
directory.
:param includePrefix: If ``False``, removes the part of the path that
is the prefix.
"""
# Get the items from OleFileIO.
try:
return self.__listDirRes[(streams, storages, includePrefix)]
except KeyError:
entries = self.__ole.listdir(streams, storages)
if not self.__prefix:
return entries
prefix = self.__prefix.split('/')
if prefix[-1] == '':
prefix.pop()
prefixLength = self.__prefixLen
entries = [x for x in entries if len(x) > prefixLength and x[:prefixLength] == prefix]
if not includePrefix:
entries = [x[prefixLength:] for x in entries]
self.__listDirRes[(streams, storages, includePrefix)] = entries
return entries
def slistDir(self, streams: bool = True, storages: bool = False, includePrefix: bool = True) -> List[str]:
    """
    Same as :meth:`listDir`, except that every entry is converted to its
    string form instead of being a list of path components.
    """
    entries = self.listDir(streams, storages, includePrefix)
    return list(map(msgPathToString, entries))
[docs] def save(self, **kwargs) -> SAVE_TYPE:
if kwargs.get('skipNotImplemented', False):
return (SaveType.NONE, None)
raise NotImplementedError(f'Saving is not yet supported for the {self.__class__.__name__} class.')
[docs] def saveAttachments(self, skipHidden: bool = False, **kwargs) -> None:
"""
Saves only attachments in the same folder.
:param skipHidden: If ``True``, skips attachments marked as hidden.
(Default: ``False``)
"""
for attachment in self.attachments:
if not (skipHidden and attachment.hidden):
attachment.save(skipHidden = skipHidden, **kwargs)
def saveRaw(self, path) -> None:
    """
    Writes every stream of this MSG file into ``raw.zip`` inside the given
    directory, creating the directory if needed.

    Each stream gets its own folder in the archive, named after the stream
    path (plus the property name, when ``constants.PROPERTIES`` has one for
    the last 8 characters of the stream name). The data is stored as
    ``contents.txt`` for string streams and ``contents.bin`` otherwise.

    :raises FileExistsError: ``raw.zip`` already exists in the directory.
    """
    # Make the location.
    outDir = pathlib.Path(path)
    os.makedirs(outDir, exist_ok = True)
    # Refuse to overwrite an existing archive.
    zipPath = outDir / 'raw.zip'
    if zipPath.exists():
        raise FileExistsError(f'File "{zipPath}" already exists.')
    with zipfile.ZipFile(zipPath, 'w', zipfile.ZIP_DEFLATED) as zfile:
        for entry in self.listDir():
            leaf = entry[-1]
            sysdir = '/'.join(entry)
            code = leaf[-8:]
            if constants.PROPERTIES.get(code):
                sysdir += ' - ' + constants.PROPERTIES[code]
            # Generate appropriate filename.
            filename = 'contents.txt' if leaf.endswith(('001E', '001F')) else 'contents.bin'
            # The archive member is created even if the stream can't be
            # read, in which case it is left empty.
            with zfile.open(sysdir + '/' + filename, 'w') as f:
                data = self.getStream(entry)
                # Specifically check for None. Empty bytes are still
                # written.
                if data is not None:
                    f.write(data)
@functools.cached_property
def areStringsUnicode(self) -> bool:
"""
Whether the strings are Unicode encoded or not.
"""
return (self.getPropertyVal('340D0003', 0) & 0x40000) != 0
@functools.cached_property
def attachments(self) -> Union[List[AttachmentBase], List[SignedAttachment]]:
"""
A list of all attachments.
"""
# Get the attachments.
attachmentDirs = []
for dir_ in self.listDir(False, True, False):
if dir_[0].startswith('__attach') and dir_[0] not in attachmentDirs:
attachmentDirs.append(dir_[0])
attachments = []
for attachmentDir in attachmentDirs:
attachments.append(self.initAttachmentFunc(self, attachmentDir))
self.__attachmentsReady = True
return attachments
@property
def attachmentsDelayed(self) -> bool:
    """
    Returns ``True`` if the attachment initialization was delayed.

    This is the boolean value of the ``delayAttachments`` argument given
    at creation.
    """
    return self.__attachmentsDelayed
@property
def attachmentsReady(self) -> bool:
    """
    Returns ``True`` if the attachments are ready to be used.

    Starts out ``False`` and becomes ``True`` once the ``attachments``
    property has finished creating the attachments.
    """
    return self.__attachmentsReady
@functools.cached_property
def classified(self) -> bool:
    """
    Indicates whether the contents of this message are regarded as
    classified information.

    Read from named property ``85B5`` of ``PSETID_COMMON``; a missing
    property gives ``False``. The result is cached.
    """
    return bool(self.getNamedProp('85B5', ps.PSETID_COMMON))
@functools.cached_property
def classType(self) -> Optional[str]:
    """
    The class type of the MSG file.

    Read from the string stream ``__substg1.0_001A``; ``None`` if that
    stream does not exist. The result is cached.
    """
    return self.getStringStream('__substg1.0_001A')
@functools.cached_property
def commonEnd(self) -> Optional[datetime.datetime]:
    """
    The end time for the object.

    Read from named property ``8517`` of ``PSETID_COMMON``; ``None`` if it
    is not present. The result is cached.
    """
    return self.getNamedProp('8517', ps.PSETID_COMMON)
@functools.cached_property
def commonStart(self) -> Optional[datetime.datetime]:
    """
    The start time for the object.

    Read from named property ``8516`` of ``PSETID_COMMON``; ``None`` if it
    is not present. The result is cached.
    """
    return self.getNamedProp('8516', ps.PSETID_COMMON)
@functools.cached_property
def contactLinkEntry(self) -> Optional[ContactLinkEntry]:
    """
    A class that contains the list of Address Book EntryIDs linked to this
    Message object.

    Built by passing named property ``8585`` of ``PSETID_COMMON`` to
    ``ContactLinkEntry``; ``None`` if the property is not present. The
    result is cached.
    """
    return self.getNamedAs('8585', ps.PSETID_COMMON, ContactLinkEntry)
@functools.cached_property
def contacts(self) -> Optional[List[str]]:
    """
    Contains the display name property of each Address Book EntryID
    referenced in the value of the contactLinkEntry property.

    Read from named property ``853A`` of ``PSETID_COMMON``; ``None`` if it
    is not present. The result is cached.
    """
    return self.getNamedProp('853A', ps.PSETID_COMMON)
@functools.cached_property
def currentVersion(self) -> Optional[int]:
    """
    Specifies the build number of the client application that sent the
    message.

    Read from named property ``8552`` of ``PSETID_COMMON``; ``None`` if it
    is not present. The result is cached.
    """
    return self.getNamedProp('8552', ps.PSETID_COMMON)
@functools.cached_property
def currentVersionName(self) -> Optional[str]:
    """
    The name of the client application that sent the message.

    Looked up as named property ``8554`` in the ``PSETID_COMMON`` property
    set. Cached after the first access.
    """
    clientName = self.getNamedProp('8554', ps.PSETID_COMMON)
    return clientName
@property
def dateFormat(self) -> str:
    """
    The format string used when converting dates to strings.

    Applies to dates that have no time component.
    """
    fmt = self.__dateFormat
    return fmt
@property
def datetimeFormat(self) -> str:
    """
    The format string used when converting datetimes to strings.

    Applies to dates that have a time component.
    """
    fmt = self.__dtFormat
    return fmt
@property
def errorBehavior(self) -> ErrorBehavior:
    """
    The behavior to follow when certain errors occur.

    Will be an instance of the ``ErrorBehavior`` enum.
    """
    behavior = self.__errorBehavior
    return behavior
@functools.cached_property
def importance(self) -> Optional[Importance]:
    """
    The specified importance of the MSG file.

    Obtained through ``getPropertyAs`` for property ``00170003`` with the
    ``Importance`` enum. Cached after the first access.
    """
    level = self.getPropertyAs('00170003', Importance)
    return level
@property
def importanceString(self) -> Union[str, None]:
    """
    The importance string to use for saving.

    Maps ``Importance.HIGH`` to ``'High'`` and ``Importance.LOW`` to
    ``'Low'``. Medium importance and a missing importance both give
    ``None``.
    """
    # Lookup table from the importance value to its label.
    labels = {
        Importance.HIGH: 'High',
        Importance.MEDIUM: None,
        Importance.LOW: 'Low',
        None: None,
    }
    return labels[self.importance]
@property
def initAttachmentFunc(self) -> Callable[[MSGFile, str], AttachmentBase]:
    """
    The function this instance uses to initialize attachments.

    Exposed in case it needs to be used externally.
    """
    initFunc = self.__initAttachmentFunc
    return initFunc
@property
def insecureFeatures(self) -> InsecureFeatures:
    """
    An enum value specifying which insecure features have been enabled for
    this file.
    """
    enabled = self.__inscFeat
    return enabled
@property
def kwargs(self) -> Dict[str, Any]:
    """
    The kwargs used to initialize this message, excluding the prefix.

    Used for initializing embedded MSG files. The stored dictionary itself
    is returned, not a copy.
    """
    storedKwargs = self.__kwargs
    return storedKwargs
@functools.cached_property
def named(self) -> Named:
    """
    The main named properties storage.

    This is not usable to access the data of the properties directly. When
    this instance has a parent MSG file, the parent's ``named`` is returned,
    otherwise a new ``Named`` is built for this instance. Cached after the
    first access.

    :raises ReferenceError: The parent ``MSGFile`` instance has been garbage
        collected.
    """
    parentRef = self.__parentMsg
    # No parent MSG file: this instance owns its named storage.
    if not parentRef:
        return Named(self)

    # Resolve the reference to the parent and share its storage.
    parent = parentRef()
    if parent is None:
        raise ReferenceError('Parent MSGFile instance has been garbage collected.')
    return parent.named
@functools.cached_property
def namedProperties(self) -> NamedProperties:
    """
    The ``NamedProperties`` instance usable to access the data for named
    properties.

    Built from this instance's ``named`` storage and the instance itself.
    Cached after the first access.
    """
    storage = self.named
    return NamedProperties(storage, self)
@property
def overrideEncoding(self) -> Optional[str]:
    """
    The encoding used for string streams if it has been overridden,
    otherwise ``None``.
    """
    override = self.__overrideEncoding
    return override
@property
def path(self):
    """
    The message path if the instance was generated from a file, otherwise
    the data used to generate the ``MSGFile`` instance.
    """
    source = self.__path
    return source
@property
def prefix(self) -> str:
    """
    The prefix of the ``MSGFile`` instance.

    Intended for developer use.
    """
    currentPrefix = self.__prefix
    return currentPrefix
@property
def prefixLen(self) -> int:
    """
    The number of elements in the prefix.

    Dividing by 2 will typically tell you how deeply nested the MSG file is.
    """
    count = self.__prefixLen
    return count
@property
def prefixList(self) -> List[str]:
    """
    The prefix list of the Message instance.

    Intended for developer use. A deep copy is returned, so changes made to
    the result do not touch the list stored on the instance.
    """
    snapshot = copy.deepcopy(self.__prefixList)
    return snapshot
@functools.cached_property
def priority(self) -> Optional[Priority]:
    """
    The specified priority of the MSG file.

    Obtained through ``getPropertyAs`` for property ``00260003`` with the
    ``Priority`` enum. Cached after the first access.
    """
    level = self.getPropertyAs('00260003', Priority)
    return level
@functools.cached_property
def props(self) -> PropertiesStore:
    """
    The ``PropertiesStore`` instance used by the ``MSGFile`` instance.

    Built from the ``__properties_version1.0`` stream. The store type is
    ``PropertiesType.MESSAGE`` when this instance has no prefix and
    ``PropertiesType.MESSAGE_EMBED`` otherwise. Cached after the first
    access.

    :raises StandardViolationError: The property stream is missing and the
        error behavior does not include
        ``ErrorBehavior.STANDARDS_VIOLATION``.
    """
    stream = self.getStream('__properties_version1.0')
    if not stream:
        if ErrorBehavior.STANDARDS_VIOLATION not in self.__errorBehavior:
            # Raise the exception from None so we don't get all the "during
            # the handling of the above exception" stuff.
            raise StandardViolationError('File does not contain a property stream.') from None
        # The violation is tolerated: report it and carry on with whatever
        # the stream lookup gave back.
        logger.error('File does not contain a property stream.')

    storeType = PropertiesType.MESSAGE_EMBED if self.prefix else PropertiesType.MESSAGE
    return PropertiesStore(stream, storeType)
@functools.cached_property
def retentionDate(self) -> Optional[datetime.datetime]:
    """
    The date, in UTC, after which a Message Object is expired by the server.

    If ``None``, the Message object never expires. Read from property
    ``301C0040``. Cached after the first access.
    """
    expiry = self.getPropertyVal('301C0040')
    return expiry
@functools.cached_property
def retentionFlags(self) -> Optional[RetentionFlags]:
    """
    Flags that specify the status or nature of an item's retention tag or
    archive tag.

    Obtained through ``getPropertyAs`` for property ``301D0003`` with the
    ``RetentionFlags`` enum. Cached after the first access.
    """
    flags = self.getPropertyAs('301D0003', RetentionFlags)
    return flags
@functools.cached_property
def sensitivity(self) -> Optional[Sensitivity]:
    """
    The specified sensitivity of the MSG file.

    Obtained through ``getPropertyAs`` for property ``00360003`` with the
    ``Sensitivity`` enum. Cached after the first access.
    """
    level = self.getPropertyAs('00360003', Sensitivity)
    return level
@functools.cached_property
def sideEffects(self) -> Optional[SideEffect]:
    """
    Controls how a Message object is handled by the client in relation to
    certain user interface actions by the user, such as deleting a message.

    Obtained through ``getNamedAs`` for named property ``8510`` in the
    ``PSETID_COMMON`` property set with the ``SideEffect`` enum. Cached
    after the first access.
    """
    effects = self.getNamedAs('8510', ps.PSETID_COMMON, SideEffect)
    return effects
@property
def stringEncoding(self) -> str:
    """
    The encoding to use for this file's strings.

    The value is worked out on first access and stored on the instance.
    Later accesses return the stored value. Resolution order:

    * ``"utf-16-le"`` when ``areStringsUnicode`` is true.
    * ``'iso-8859-15'`` (with a logged warning) when property ``3FFD0003``
      is not present in ``props``.
    * Otherwise, the result of ``lookupCodePage`` for the value of property
      ``3FFD0003``.
    """
    try:
        return self.__stringEncoding
    except AttributeError:
        # Nothing stored yet, so the encoding has to be calculated.
        if self.areStringsUnicode:
            resolved = "utf-16-le"
        elif '3FFD0003' in self.props:
            # The property holds a code page number; translate it into an
            # encoding name.
            codePage = cast(int, self.getPropertyVal('3FFD0003'))
            resolved = lookupCodePage(codePage)
        else:
            # If this property is not set by the client, we SHOULD set
            # it to ISO-8859-15, but MAY set it to ISO-8859-1.
            logger.warning('Encoding property not found. Defaulting to ISO-8859-15.')
            resolved = 'iso-8859-15'
        self.__stringEncoding = resolved
        return resolved
@property
def treePath(self) -> List[weakref.ReferenceType[Any]]:
    """
    A path, as a list of weak references to the instances needed to get to
    this instance through the MSGFile-Attachment tree.

    Weak references are used so the garbage collector doesn't see the
    references back to higher objects. The stored list itself is returned,
    not a copy.
    """
    pathRefs = self.__treePath
    return pathRefs