Source code for extract_msg.attachments.signed_att

from __future__ import annotations


__all__ = [
    'SignedAttachment',
]


import email.message
import logging
import os
import pathlib
import weakref
import zipfile

from typing import List, Optional, Type, TYPE_CHECKING, Union

from .. import constants
from ..enums import AttachmentType, SaveType
from ..open_msg import openMsg
from ..utils import createZipOpen, inputToString, makeWeakRef, prepareFilename


# Allow for nice type checking.
if TYPE_CHECKING:
    from ..msg_classes.msg import MSGFile

logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())


class SignedAttachment:
    """
    An attachment taken from the MIME structure of a signed message.

    The raw bytes are always kept (see :attr:`asBytes`). If the bytes begin
    with the OLE compound file signature, an attempt is made to open them as an
    MSG file, in which case :attr:`data` is that MSG instance and :attr:`type`
    is ``AttachmentType.SIGNED_EMBEDDED``. Otherwise :attr:`data` is the raw
    bytes and :attr:`type` is ``AttachmentType.SIGNED``.
    """

    def __init__(self, msg: MSGFile, data: bytes, name: str, mimetype: str, node: email.message.Message):
        """
        :param msg: The MSGFile instance this attachment is associated with.
        :param data: The bytes that compose this attachment.
        :param name: The reported name of the attachment.
        :param mimetype: The reported mimetype of the attachment.
        :param node: The email Message instance for this node.
        """
        self.__asBytes = data
        self.__name = name
        self.__mimetype = mimetype
        # Only a weak reference is held to the parent; see the msg property.
        self.__msg = makeWeakRef(msg)
        self.__node = node
        self.__treePath = msg.treePath + [makeWeakRef(self)]

        self.__data = None
        # To add support for embedded MSG files, we are going to completely
        # ignore the mimetype and just do a few simple checks to see if we can
        # use the bytes as an embedded file.
        if data[:8] == b'\xD0\xCF\x11\xE0\xA1\xB1\x1A\xE1':
            try:
                # While we have to pass a lot of data down to the file, we don't
                # pass the prefix and parent MSG data, as it is not an *actual*
                # embedded MSG file. We are just pretending that it is for the
                # external API.
                self.__data = openMsg(data, treePath = self.__treePath, **msg.kwargs)
            except Exception:
                # Deliberate best effort: fall back to treating it as bytes.
                logger.exception('Signed message was an OLE file, but could not be read as an MSG file due to an exception.')

        if self.__data is None:
            self.__data = data

    def _handleFnc(self, _zip, filename, customPath: pathlib.Path, kwargs) -> pathlib.Path:
        """
        "Handle Filename Conflict"

        Internal function for use in determining how to modify the saving path
        when a file with the same name already exists. This is mainly because
        any save function that uses files will need to do this functionality.

        :param _zip: A truthy zip object exposing ``namelist()`` if saving into
            a zip file, otherwise a falsy value to check the file system.
        :param filename: The desired name of the file.
        :param customPath: The directory (or path inside the zip file) that the
            file will be placed in.
        :param kwargs: The keyword arguments given to the save function. Only
            ``overwriteExisting`` (default ``False``) is read.

        :returns: A ``pathlib.Path`` object to where the file should be saved.

        :raises FileExistsError: The name and every numbered alternative from
            ``(2)`` through ``(99)`` is already in use.
        """
        fullFilename = customPath / filename

        # When overwriting, conflicts do not matter.
        if kwargs.get('overwriteExisting', False):
            return fullFilename

        if _zip:
            # Zip member names always use forward slashes.
            existing = set(_zip.namelist())

            def isTaken(path: pathlib.Path) -> bool:
                return str(path).replace('\\', '/') in existing
        else:
            def isTaken(path: pathlib.Path) -> bool:
                return path.exists()

        if not isTaken(fullFilename):
            return fullFilename

        # Try to add a number to it so that we can save without overwriting.
        name, ext = os.path.splitext(filename)
        for i in range(2, 100):
            testName = customPath / f'{name} ({i}){ext}'
            if not isTaken(testName):
                return testName

        # If we couldn't find one that didn't exist.
        raise FileExistsError(f'Could not create the specified file because it already exists ("{fullFilename}").')

    def save(self, **kwargs) -> constants.SAVE_TYPE:
        """
        Saves the attachment data.

        The name of the file is determined by several factors. The first
        thing that is checked is if you have provided ``customFilename`` to
        this function. If you have, that is the name that will be used.
        Otherwise, the name from :attr:`name` will be used. After the name to
        use has been determined, it will then be shortened to make sure that it
        is not more than the value of ``maxNameLength`` (default 256).

        To change the directory that the attachment is saved to, set the value
        of ``customPath`` when calling this function. The default save
        directory is the working directory.

        If you want to save the contents into a ``ZipFile`` or similar object,
        either pass a path to where you want to create one or pass an instance
        to ``zip``. If ``zip`` is an instance, ``customPath`` will refer to a
        location inside the zip file. A zip file opened by this function is
        closed before returning; an instance that was passed in is left open.

        Other keyword arguments read here: ``skipEmbedded`` (save nothing if
        this is an embedded MSG file), ``extractEmbedded`` (write the raw bytes
        of an embedded MSG file instead of calling its own save function) and
        ``overwriteExisting`` (see :meth:`_handleFnc`).

        :returns: ``(SaveType.NONE, None)`` if skipped, the result of
            :meth:`saveEmbededMessage` if delegated, otherwise
            ``(SaveType.FILE, path)`` with the path as a string.

        :raises FileExistsError: No unused file name could be found.
        """
        isEmbedded = self.type is AttachmentType.SIGNED_EMBEDDED

        # First check if we are skipping embedded messages and stop
        # *immediately* if we are.
        if isEmbedded and kwargs.get('skipEmbedded'):
            return (SaveType.NONE, None)

        # If we are running the save function for the MSG file, just let it
        # handle everything.
        if isEmbedded and not kwargs.get('extractEmbedded', False):
            return self.saveEmbededMessage(**kwargs)

        # Check if the user has specified a custom filename, falling back to
        # the reported name of the attachment.
        customFilename = kwargs.get('customFilename')
        filename = str(customFilename) if customFilename else self.name

        # Someone managed to have a null character here, so let's get rid of that
        filename = prepareFilename(inputToString(filename, self.msg.stringEncoding))

        # Make sure the filename is not longer than it should be.
        # NOTE(review): an extension longer than maxNameLength makes the slice
        # bound negative; confirm whether that case needs handling.
        maxNameLength = kwargs.get('maxNameLength', 256)
        if len(filename) > maxNameLength:
            name, ext = os.path.splitext(filename)
            filename = name[:maxNameLength - len(ext)] + ext

        # Check if we are doing a zip file.
        _zip = kwargs.get('zip')

        # This must only become True once this function has actually opened a
        # ZipFile. Otherwise a failure to open it would make the cleanup below
        # call close() on the path and hide the real error.
        createdZip = False

        try:
            # ZipFile handling.
            if _zip:
                # If we have a path then we open the zip file ourselves.
                if isinstance(_zip, (str, pathlib.Path)):
                    _zip = zipfile.ZipFile(_zip, 'a', zipfile.ZIP_DEFLATED)
                    kwargs['zip'] = _zip
                    createdZip = True

                # Path needs to be done in a special way if we are in a zip
                # file.
                customPath = pathlib.Path(kwargs.get('customPath', ''))
                # Set the open command to be that of the zip file.
                _open = createZipOpen(_zip.open)
                # Zip files use w for writing in binary.
                mode = 'w'
            else:
                customPath = pathlib.Path(kwargs.get('customPath', '.')).absolute()
                mode = 'wb'
                _open = open

            fullFilename = self._handleFnc(_zip, filename, customPath, kwargs)

            # The type of this class is never AttachmentType.DATA, so the
            # bytes we were originally given are always what gets written.
            with _open(str(fullFilename), mode) as f:
                f.write(self.__asBytes)

            return (SaveType.FILE, str(fullFilename))
        finally:
            # Close the ZipFile if this function created it.
            if createdZip:
                _zip.close()

    def saveEmbededMessage(self, **kwargs) -> constants.SAVE_TYPE:
        """
        Separate function from save to allow it to easily be overridden by a
        subclass.

        Calls the save function of :attr:`data` with the same keyword
        arguments.
        """
        return self.data.save(**kwargs)

    @property
    def asBytes(self) -> bytes:
        """
        The raw bytes this attachment was created with.
        """
        return self.__asBytes

    @property
    def data(self) -> Union[bytes, MSGFile]:
        """
        The bytes that compose this attachment, or the MSG file instance if the
        bytes were successfully opened as one.
        """
        return self.__data

    @property
    def dataType(self) -> Optional[Type[type]]:
        """
        The class that the data type will use, if it can be retrieved.

        This is a safe way to do type checking on data before knowing if it will
        raise an exception. Returns ``None`` if no data will be returned or if
        an exception will be raised.
        """
        try:
            return None if self.data is None else self.data.__class__
        except Exception:
            # All exceptions that accessing data would cause should be silenced.
            return None

    @property
    def emailMessage(self) -> email.message.Message:
        """
        The email Message instance that is the source for this attachment.
        """
        return self.__node

    @property
    def mimetype(self) -> str:
        """
        The reported mimetype of the attachment.
        """
        return self.__mimetype

    @property
    def msg(self) -> MSGFile:
        """
        The ``MSGFile`` instance this attachment belongs to.

        :raises ReferenceError: The associated ``MSGFile`` instance has been
            garbage collected.
        """
        if (msg := self.__msg()) is None:
            raise ReferenceError('The MSGFile for this Attachment instance has been garbage collected.')
        return msg

    @property
    def name(self) -> str:
        """
        The reported name of this attachment.
        """
        return self.__name

    # Aliases of the name property.
    longFilename = name
    shortFilename = name

    @property
    def treePath(self) -> List[weakref.ReferenceType]:
        """
        A path, as a list of weak references to instances, needed to get to this
        instance through the MSGFile-Attachment tree.
        """
        return self.__treePath

    @property
    def type(self) -> AttachmentType:
        """
        ``AttachmentType.SIGNED`` if :attr:`data` is bytes, otherwise
        ``AttachmentType.SIGNED_EMBEDDED``.
        """
        return AttachmentType.SIGNED if isinstance(self.__data, bytes) else AttachmentType.SIGNED_EMBEDDED