Source code for extract_msg.msg_classes.message_signed_base

__all__ = [
    'MessageSignedBase',
]


import functools
import html
import logging
import re

from typing import Generic, List, Optional, Type, TypeVar

from ..attachments import AttachmentBase, SignedAttachment
from ..enums import AttachmentType, DeencapType, ErrorBehavior
from ..exceptions import StandardViolationError
from .message_base import MessageBase
from ..utils import inputToBytes, inputToString, unwrapMultipart


logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())

_T = TypeVar('_T')


class MessageSignedBase(MessageBase, Generic[_T]):
    """
    Base class for Message-like MSG files that are signed.

    The signed content is carried in a single binary attachment. That
    attachment is unwrapped to produce the signed attachments and, when
    present, the signed plain text and HTML bodies.
    """

    def __init__(self, path, signedAttachmentClass: Type[_T] = SignedAttachment, **kwargs):
        """
        Supports all of the options from :meth:`MessageBase.__init__` with some
        additional ones.

        :param signedAttachmentClass: optional, the class the object will use
            for signed attachments.
        """
        # Assigned before the parent initializer runs so that it is already
        # available to anything the parent initializer may trigger.
        self.__sAttCls = signedAttachmentClass
        super().__init__(path, **kwargs)

    @functools.cached_property
    def attachments(self) -> List[_T]:
        """
        A list of all attachments, as unwrapped from the signed data.

        As a side effect, evaluating this property sets ``_signedBody`` and
        ``_signedHtmlBody``, which back :attr:`signedBody` and
        :attr:`signedHtmlBody`.

        If ``ErrorBehavior.STANDARDS_VIOLATION`` is in ``errorBehavior``,
        violations are logged rather than raised: an empty list is returned
        when there is no usable attachment, and the first attachment is used
        when there is more than one.

        :raises StandardViolationError: The standard for signed messages was
            blatantly violated.
        """
        # Set these to None to make error handling easier.
        self._signedBody = None
        self._signedHtmlBody = None

        atts = self.rawAttachments

        if len(atts) != 1:
            if ErrorBehavior.STANDARDS_VIOLATION not in self.errorBehavior:
                raise StandardViolationError('Signed messages without exactly 1 (regular) attachment constitute a violation of the standard.')
            if not atts:
                logger.error('Standards Violation: Signed message has no attachments.')
                return []
            # If there is at least one attachment, log an error about it and
            # then try to use the first one.
            logger.error('Standards Violation: Signed message has more than one attachment. Attempting to use first one.')

        # If we are here, validate that the found attachment is acceptable.
        if atts[0].type is not AttachmentType.DATA:
            # This is unacceptable and the attachment cannot be used.
            if ErrorBehavior.STANDARDS_VIOLATION in self.errorBehavior:
                logger.error('Standards Violation: Attachment on signed message is unacceptable (not binary).')
                return []
            raise StandardViolationError('Signed messages *must* have a binary attachment. Signed attachments cannot be parsed otherwise.')

        # NOTE: Whether the message is clear signed would be detected by
        # checking the attachment's mimetype *directly*, that is, by comparing
        # the string stream '__substg1.0_370E' to 'multipart/signed'.
        # Detection of mimetype through data is unacceptable for that check.
        # Nothing currently acts on that distinction, so it is not performed.

        # We need to unwrap the multipart stream.
        unwrapped = unwrapMultipart(atts[0].data)

        # Now store everything where it needs to be and make the attachments.
        self._signedBody = unwrapped['plain_body']
        self._signedHtmlBody = inputToBytes(unwrapped['html_body'], 'utf-8')

        return [self.__sAttCls(self, **att) for att in unwrapped['attachments']]

    @functools.cached_property
    def body(self) -> Optional[str]:
        """
        The message body, if it exists.

        Sources are tried in order: the body stream of the MSG file, the body
        from the signed data, then a plain text body deencapsulated from the
        RTF body.
        """
        # If the stream exists but is empty, it is still used as-is (only a
        # missing stream, None, triggers the fallbacks).
        if (body := self.getStringStream('__substg1.0_1000')) is not None:
            pass
        elif self.signedBody:
            body = self.signedBody
        elif self.rtfBody:
            # If the body doesn't exist, see if we can get it from the RTF
            # body.
            body = self.deencapsulateBody(self.rtfBody, DeencapType.PLAIN)

        if body:
            body = inputToString(body, 'utf-8')
            # Record that the body uses CRLF line endings.
            if '\r\n' in body:
                self._crlf = '\r\n'

        return body

    @functools.cached_property
    def htmlBody(self) -> Optional[bytes]:
        """
        The html body, if it exists.

        Sources are tried in order: the HTML stream of the MSG file, the HTML
        body from the signed data, an HTML body deencapsulated from the RTF
        body, then a minimal HTML document generated from the plain text body.
        """
        if (htmlBody := self.getStream('__substg1.0_10130102')) is not None:
            pass
        elif self.signedHtmlBody:
            htmlBody = self.signedHtmlBody
        elif self.rtfBody:
            logger.info('HTML body was not found, attempting to generate from RTF.')
            htmlBody = self.deencapsulateBody(self.rtfBody, DeencapType.HTML)
        elif self.body:
            # Convert the plain text body to html.
            logger.info('HTML body was not found, attempting to generate from plain text body.')
            correctedBody = html.escape(self.body).replace('\r', '').replace('\n', '<br />')
            # The document must be closed with </html> to be well-formed.
            htmlBody = f'<html><body>{correctedBody}</body></html>'.encode('utf-8')
        else:
            logger.info('HTML body could not be found nor generated.')

        return htmlBody

    @functools.cached_property
    def rawAttachments(self) -> List[AttachmentBase]:
        """
        A property to allow access to the non-signed attachments.

        This is the attachments list as provided by the parent class, before
        any unwrapping of the signed data.
        """
        return super().attachments

    @property
    def signedAttachmentClass(self) -> Type[_T]:
        """
        The attachment class used for signed attachments.
        """
        return self.__sAttCls

    @functools.cached_property
    def signedBody(self) -> Optional[str]:
        """
        The body from the signed message if it exists.
        """
        # Evaluating the attachments is what populates _signedBody.
        self.attachments
        return self._signedBody

    @functools.cached_property
    def signedHtmlBody(self) -> Optional[bytes]:
        """
        The HTML body from the signed message if it exists.
        """
        # Evaluating the attachments is what populates _signedHtmlBody.
        self.attachments
        return self._signedHtmlBody