from __future__ import annotations
__all__ = [
'DirectoryEntry',
'OleWriter',
]
import copy
import re
from typing import (
Dict, Iterator, List, Optional, SupportsBytes, Tuple, TYPE_CHECKING,
Union
)
from . import constants
from .constants import MSG_PATH
from .enums import Color, DirectoryEntryType
from .exceptions import TooManySectorsError
from .utils import ceilDiv, dictGetCasedKey, inputToMsgPath
from olefile.olefile import OleDirectoryEntry, OleFileIO
from red_black_dict_mod import RedBlackTree
# Allow for nice type checking.
if TYPE_CHECKING:
from .msg_classes import MSGFile
class DirectoryEntry:
    """
    An internal representation of a stream or storage in the OleWriter.

    Originals should be inaccessible outside of the class.
    """
    # Name of the entry as stored in the directory (1 to 31 characters).
    name: str = ''
    # Links to neighbouring entries. These are filled in while sorting for a
    # write and are cleared again afterwards.
    rightChild: Optional[DirectoryEntry] = None
    leftChild: Optional[DirectoryEntry] = None
    childTreeRoot: Optional[DirectoryEntry] = None
    stateBits: int = 0
    creationTime: int = 0
    modifiedTime: int = 0
    type: DirectoryEntryType = DirectoryEntryType.UNALLOCATED

    # These get set after things have been sorted by the red black tree.
    id: int = -1
    # This is the ID for the left child. The terminology in the docs is really
    # annoying.
    leftSiblingID: int = 0xFFFFFFFF
    rightSiblingID: int = 0xFFFFFFFF
    # This is the ID for the root of the child tree, if any.
    childID: int = 0xFFFFFFFF
    startingSectorLocation: int = 0
    color: Color = Color.BLACK

    clsid: bytes = b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
    data: bytes = b''

    def __bytes__(self) -> bytes:
        return self.toBytes()

    def toBytes(self) -> bytes:
        """
        Converts the entry to bytes to be written to a file.

        :raises ValueError: The name is empty, is longer than 31 characters,
            or contains one of the characters ``/``, ``\\``, ``:`` or ``!``.
        """
        # First validate the name.
        if len(self.name) > 31:
            raise ValueError('Name is too long for directory entry.')
        if len(self.name) < 1:
            raise ValueError('Directory entry must have a name.')
        # A character class is required here: without the brackets the
        # pattern would only match the literal four character sequence
        # "/\:!" and every realistic illegal name would slip through.
        if re.search(r'[/\\:!]', self.name):
            raise ValueError('Directory entry name contains an illegal character.')

        nameBytes = self.name.encode('utf-16-le')

        return constants.st.ST_CF_DIR_ENTRY.pack(
            nameBytes,
            # The stored length counts two extra bytes beyond the encoded
            # name (presumably the UTF-16 null terminator).
            len(nameBytes) + 2,
            self.type,
            self.color,
            self.leftSiblingID,
            self.rightSiblingID,
            self.childID,
            self.clsid,
            self.stateBits,
            self.creationTime,
            self.modifiedTime,
            self.startingSectorLocation,
            # An explicit ``streamSize`` attribute (set on the root entry
            # copy while sorting) overrides the length of ``data``.
            getattr(self, 'streamSize', len(self.data)),
        )
[docs]class OleWriter:
"""
Takes data to write to a compound binary format file, as specified in
[MS-CFB].
"""
def __init__(self, rootClsid: bytes = constants.DEFAULT_CLSID):
    """
    Creates an empty writer containing only the root entry.

    :param rootClsid: The CLSID stored on the root entry.
    """
    rootEntry = DirectoryEntry()
    rootEntry.name = "Root Entry"
    rootEntry.type = DirectoryEntryType.ROOT_STORAGE
    rootEntry.clsid = rootClsid
    self.__rootEntry = rootEntry

    # Nested dicts of entries; a storage is a dict, a stream is an entry.
    self.__dirEntries = {}
    # The root entry will always exist, so this must be at least 1.
    self.__dirEntryCount = 1

    # Bookkeeping that is recalculated before every write.
    self.__largeEntries: List[DirectoryEntry] = []
    self.__largeEntrySectors = 0
    self.__numMinifatSectors = 0

    # In a future version, this will be setable as an optional argument.
    self.__version = 3
def __getContainingStorage(self, path: List[str], entryExists: bool = True, create: bool = False) -> Dict:
    """
    Locates the internal storage ``dict`` in which the entry named by
    :param path: lives (or would be created).

    :param entryExists: If ``True``, the final path element must already be
        present in the storage that is found.
    :param create: If ``True``, storages missing along the way are created
        with default settings.

    :raises OSError: The path is empty, a storage on the path is missing and
        :param create: is ``False``, a stream was found before the end of
        the path, or :param entryExists: is ``True`` and the entry is
        missing.
    :raises ValueError: The path contains an internal name, or both
        :param create: and :param entryExists: are ``True``.

    :returns: The storage ``dict`` that the entry is in.
    """
    if not path:
        raise OSError('Path cannot be empty.')

    # The two options contradict each other, so refuse the combination.
    if create and entryExists:
        raise ValueError(':param create: and :param entryExists: cannot both be True (an entry cannot exist if it is being created).')

    # Internal bookkeeping keys are prefixed with "::" and must never be
    # reachable through a user supplied path.
    for part in path:
        if part.startswith('::'):
            raise ValueError('Found internal name in path.')

    *parents, leaf = path
    current = self.__dirEntries
    for depth, part in enumerate(parents, start = 1):
        # Names are compared without regard to case.
        if part.lower() not in map(str.lower, current):
            if not create:
                raise OSError(f'Entry not found: {part}')
            self.addEntry(path[:depth], storage = True)

        current = current[dictGetCasedKey(current, part)]

        # Only storages (dicts) can have children.
        if not isinstance(current, dict):
            raise OSError('Attempted to access children of a stream.')

    if entryExists and leaf.lower() not in map(str.lower, current):
        raise OSError(f'Entry not found: {leaf}')

    return current
def __getEntry(self, path: List[str]) -> DirectoryEntry:
    """
    Looks up the live ``DirectoryEntry`` instance stored at :param path:.

    :raises OSError: If the entry does not exist.
    :raises ValueError: If access to an internal item is attempted.
    """
    storage = self.__getContainingStorage(path)
    found = storage[dictGetCasedKey(storage, path[-1])]

    # A storage is kept as a dict whose own entry sits under a reserved key.
    return found['::DirectoryEntry'] if isinstance(found, dict) else found
def __modifyEntry(self, entry: DirectoryEntry, **kwargs):
    """
    Applies the given properties to a DirectoryEntry.

    Shared by :meth:`addEntry` and :meth:`editEntry`. Recognised keyword
    arguments are ``data``, ``clsid``, ``creationTime``, ``modifiedTime`` and
    ``stateBits``; a value of ``None`` leaves that property untouched.
    Everything is validated before anything is assigned, so a failure leaves
    the entry unchanged.

    :raises TypeError: Attempted to modify the data of a storage.
    :raises ValueError: Some part of the data given to modify the various
        properties was invalid. See the the listed methods for details.
    """
    newData = kwargs.get('data')
    newClsid = kwargs.get('clsid')
    newCreated = kwargs.get('creationTime')
    newModified = kwargs.get('modifiedTime')
    newState = kwargs.get('stateBits')

    # --- Validation pass: nothing on the entry is touched in here. ---
    if newData is not None:
        if entry.type is not DirectoryEntryType.STREAM:
            raise TypeError('Cannot set the data of a storage object.')
        if not isinstance(newData, bytes):
            try:
                newData = bytes(newData)
            except Exception:
                raise ValueError('Data must be a bytes instance or convertable to bytes if set.')
        # In future versions, this may be a different check which is done
        # when swapping between version 3 and 4 of the format.
        if len(newData) > 0x80000000:
            raise ValueError('Current version of extract_msg does not support streams greater than 2 GB in OLE files.')

    if newClsid is not None:
        if not isinstance(newClsid, bytes):
            raise ValueError('CLSID must be bytes.')
        if len(newClsid) != 16:
            raise ValueError('CLSID must be 16 bytes.')

    # Both timestamps share the same rules, just with different messages.
    timeChecks = (
        (newCreated,
         'Modification of creation time cannot be done on a stream.',
         'Creation time must be a positive 8 byte int.'),
        (newModified,
         'Modification of modified time cannot be done on a stream.',
         'Modified time must be a positive 8 byte int.'),
    )
    for stamp, streamMessage, rangeMessage in timeChecks:
        if stamp is None:
            continue
        if entry.type is DirectoryEntryType.STREAM:
            raise ValueError(streamMessage)
        if not (isinstance(stamp, int) and 0 <= stamp <= 0xFFFFFFFFFFFFFFFF):
            raise ValueError(rangeMessage)

    if newState is not None:
        if not (isinstance(newState, int) and 0 <= newState <= 0xFFFFFFFF):
            raise ValueError('State bits must be a positive 4 byte int.')

    # --- Commit pass: every check passed, so store what was supplied. ---
    updates = (
        ('data', newData),
        ('clsid', newClsid),
        ('creationTime', newCreated),
        ('modifiedTime', newModified),
        ('stateBits', newState),
    )
    for attribute, value in updates:
        if value is not None:
            setattr(entry, attribute, value)
def __recalculateSectors(self) -> None:
    """
    Rebuilds the counters used while saving: the number of directory
    entries, the number of 64 byte mini sectors, and the list and sector
    count of the streams that are too large for the mini stream.
    """
    self.__largeEntries.clear()
    self.__largeEntrySectors = 0
    self.__numMinifatSectors = 0
    self.__dirEntryCount = 0

    for entry in self.__walkEntries():
        self.__dirEntryCount += 1

        # Only streams carry data that needs sectors.
        if entry.type != DirectoryEntryType.STREAM:
            continue

        size = len(entry.data)
        if size < 4096:
            # Below the cutoff the data lives in the mini stream.
            self.__numMinifatSectors += ceilDiv(size, 64)
        else:
            self.__largeEntries.append(entry)
            self.__largeEntrySectors += ceilDiv(size, self.__sectorSize)
def __walkEntries(self) -> Iterator[DirectoryEntry]:
    """
    Generator yielding every DirectoryEntry in the writer.

    The root entry comes first, followed by the contents of each storage
    in breadth-first order.
    """
    pending = [self.__dirEntries]
    yield self.__rootEntry

    # Walk the queue with a cursor; storages found along the way are added
    # to the end so they are visited after everything already queued.
    cursor = 0
    while cursor < len(pending):
        storage = pending[cursor]
        cursor += 1
        for name, item in storage.items():
            # Skip the reserved keys (such as the storage's own entry).
            if name.startswith('::'):
                continue
            if isinstance(item, dict):
                yield item['::DirectoryEntry']
                pending.append(item)
            else:
                yield item
@property
def __dirEntsPerSector(self) -> int:
    """
    How many directory entries fit in one sector (the sector size divided
    by 128).
    """
    sectorBytes = self.__sectorSize
    return sectorBytes // 128
@property
def __linksPerSector(self) -> int:
    """
    How many 4 byte links fit in one FAT/DIFAT sector.
    """
    sectorBytes = self.__sectorSize
    return sectorBytes // 4
@property
def __miniSectorsPerSector(self) -> int:
    """
    How many 64 byte mini sectors fit in one regular sector.
    """
    sectorBytes = self.__sectorSize
    return sectorBytes // 64
@property
def __numberOfSectors(self) -> int:
    """
    The number of sectors needed for everything except the FAT and DIFAT
    sectors themselves.

    Relies on the counters maintained by the sector recalculation step
    being up to date.
    """
    # Line by line the calculation is:
    # 1. How many sectors are needed for the directory entries. The number
    #    of entries per sector comes from the sector size rather than being
    #    fixed at 4, matching how the directory chain is written.
    # 2. How many sectors are needed for the mini stream.
    # 3. How many sectors are needed for the mini FAT. One mini FAT sector
    #    describes 16 sectors worth of mini stream (links per sector
    #    divided by mini sectors per sector), whatever the sector size.
    # 4. The number of sectors needed to store the larger data.
    return ceilDiv(self.__dirEntryCount, self.__dirEntsPerSector) + \
        self.__numMinifat + \
        ceilDiv(self.__numMinifat, 16) + \
        self.__largeEntrySectors
@property
def __numMinifat(self) -> int:
    """
    The number of regular sectors needed to hold the mini stream.
    """
    # Every mini sector occupies 64 bytes of the mini stream.
    miniStreamBytes = 64 * self.__numMinifatSectors
    return ceilDiv(miniStreamBytes, self.__sectorSize)
@property
def __sectorSize(self) -> int:
    """
    The size of each sector, in bytes: 512 for version 3, otherwise 4096.
    """
    if self.__version == 3:
        return 512
    return 4096
def _cleanupEntries(self) -> None:
    """
    Undoes the bookkeeping added while writing: empties the list of large
    entries and resets the id and the tree links of every entry.
    """
    self.__largeEntries.clear()

    unset = 0xFFFFFFFF
    for entry in self.__walkEntries():
        entry.id = -1
        # Object links set while sorting.
        entry.leftChild = entry.rightChild = entry.childTreeRoot = None
        # Numeric IDs derived from those links.
        entry.leftSiblingID = entry.rightSiblingID = entry.childID = unset
def _getFatSectors(self) -> Tuple[int, int, int]:
    """
    Works out how many FAT and DIFAT sectors the saved file needs.

    :returns: A tuple of the number of FAT sectors, the number of DIFAT
        sectors, and the total number of sectors the saved file will have.
    """
    contentSectors = self.__numberOfSectors
    # One slot of every FAT/DIFAT sector is set aside in this calculation,
    # so each sector is treated as covering one link fewer than it holds.
    usableLinks = self.__linksPerSector - 1

    numDifat = 0
    # All divisions are ceiling divisions.
    numFat = ceilDiv(contentSectors or 1, usableLinks)
    candidate = 1

    # Adding DIFAT sectors can require more FAT sectors and the other way
    # around, so repeat until the FAT count stops changing.
    while numFat != candidate:
        numFat = candidate
        # The first 109 FAT sector locations need no DIFAT sector.
        numDifat = ceilDiv(max(numFat - 109, 0), usableLinks)
        candidate = ceilDiv(contentSectors + numDifat, usableLinks)

    return (numFat, numDifat, contentSectors + numDifat + numFat)
def _treeSort(self, startingSector: int) -> List[DirectoryEntry]:
    """
    Uses red-black trees to sort the internal data in preparation for
    writing the file, returning a list, in order, of the entries to write.

    :param startingSector: The sector number used as the base when
        computing where the mini stream starts (the caller passes the
        value returned from writing the beginning of the file).

    :returns: The entries in the order they are to be written. The first
        item is a *copy* of the root entry; every other item is the live
        entry, whose id, sibling/child links, color and (for empty or
        small streams) starting sector are updated in place.
    """
    # First, create the root entry. A copy is used so the values set below
    # do not persist on the writer's own root entry.
    root = copy.copy(self.__rootEntry)

    # Add the location of the start of the mini stream: the starting sector
    # plus the directory sectors plus the mini FAT sectors. If there is no
    # mini stream, the end of chain marker is used instead.
    root.startingSectorLocation = (startingSector + ceilDiv(self.__dirEntryCount, 4) + ceilDiv(self.__numMinifatSectors, self.__linksPerSector)) if self.__numMinifat > 0 else 0xFFFFFFFE
    # The root's stream size is the size of the mini stream (64 bytes per
    # mini sector).
    root.streamSize = self.__numMinifatSectors * 64
    root.childTreeRoot = None
    root.childID = 0xFFFFFFFF

    entries = [root]

    # Pairs of (entry owning the storage, dict holding its children).
    toProcess = [(root, self.__dirEntries)]

    # Continue looping while there is more to process.
    while toProcess:
        entry, currentItem = toProcess.pop()
        # An empty dict has nothing to sort. Only the top level dict can be
        # empty, as every storage dict holds its own '::DirectoryEntry'.
        if not currentItem:
            continue

        # NOTE(review): a storage whose dict *only* has its own
        # '::DirectoryEntry' is not skipped by the check above, so it
        # reaches the code below with an empty tree. Confirm that
        # RedBlackTree handles that case as intended.

        # Create a tree and add all the items to it. We add it with a key
        # that is a tuple of the length (as shorter is *always* less than
        # longer) and the uppercase name, and the value is the actual entry.
        tree = RedBlackTree()
        for name in currentItem:
            if not name.startswith('::'):
                val = currentItem[name]
                # If we find a storage, then we need to add it to the
                # processing list and use its own entry as the value.
                if isinstance(val, dict):
                    toProcess.append((val['::DirectoryEntry'], val))
                    val = val['::DirectoryEntry']

                entries.append(val)

                # Add the data to the tree.
                tree.add((len(name), name.upper()), val)

        # Now that everything is added, we need to take the tree's root and
        # add it as the child of the current entry.
        entry.childTreeRoot = tree.value

        # Now we need to go through each node and set its data based on
        # its sort position.
        for node in tree.in_order():
            item = node.value
            # Set the color immediately.
            item.color = Color.BLACK if node.is_black else Color.RED
            if node.left:
                item.leftChild = node.left.value
            else:
                item.leftChild = None
            if node.right:
                item.rightChild = node.right.value
            else:
                item.rightChild = None

    # Now that everything is connected, we loop over the entries list a few
    # times and set the data values. IDs are simply positions in the list.
    for _id, entry in enumerate(entries):
        entry.id = _id

    # Convert the object links into IDs, using 0xFFFFFFFF for "no link".
    for entry in entries:
        entry.leftSiblingID = entry.leftChild.id if entry.leftChild else 0xFFFFFFFF
        entry.childID = entry.childTreeRoot.id if entry.childTreeRoot else 0xFFFFFFFF
        entry.rightSiblingID = entry.rightChild.id if entry.rightChild else 0xFFFFFFFF

    # Finally, let's figure out the sector IDs to be used for the mini data.
    # We only need to do this for streams with a size less than 4096; larger
    # streams are left untouched here.

    # Use this to track where the next thing goes in the mini FAT.
    miniFATLocation = 0

    for entry in entries:
        if len(entry.data) == 0 and entry != entries[0]:
            # If there is no data, just set the starting location to none.
            # The root (first item) is excluded as its location was set
            # above.
            entry.startingSectorLocation = 0xFFFFFFFE
        elif entry.type == DirectoryEntryType.STREAM and len(entry.data) < 4096:
            entry.startingSectorLocation = miniFATLocation
            miniFATLocation += ceilDiv(len(entry.data), 64)

    return entries
def _writeBeginning(self, f) -> int:
    """
    Writes the beginning to the file :param f:.

    This includes the header, DIFAT, and FAT blocks. The layout described
    by the FAT is: DIFAT sectors, FAT sectors, directory sectors, mini FAT
    sectors, mini stream sectors, then the sectors of each large stream.
    As a side effect, the starting sector of every large stream is set.

    :returns: The current sector number after all the data is written.

    :raises TooManySectorsError: The number of sectors required for the file
        is too large.
    """
    # Recalculate some things needed for saving.
    self.__recalculateSectors()

    # Since we are going to need these multiple times, get them now.
    numFat, numDifat, totalSectors = self._getFatSectors()

    # Check to make sure there isn't too much data to write.
    if totalSectors > 0xFFFFFFFB:
        raise TooManySectorsError('Data in OleWriter requires too many sectors to write to a version 3 file.')

    # The ministream *cannot* be greater than 2 GB, so check that before
    # writing anything. A minifat sector is 64 bytes, so the maximum amount
    # of them is 0x2000000.
    if self.__numMinifatSectors > 0x2000000:
        raise TooManySectorsError('Data is OleWriter requires too many MiniFAT sectors.')

    packUI32 = constants.st.ST_LE_UI32.pack
    FREE = b'\xFF\xFF\xFF\xFF'
    END_OF_CHAIN = b'\xFE\xFF\xFF\xFF'

    def writeChain(firstSector: int, count: int) -> None:
        # Writes the FAT links for `count` consecutive sectors starting at
        # `firstSector`: each sector points at the next one and the last is
        # marked as the end of the chain.
        for x in range(firstSector + 1, firstSector + count):
            f.write(packUI32(x))
        f.write(END_OF_CHAIN)

    ### Header.
    # Header signature.
    f.write(b'\xD0\xCF\x11\xE0\xA1\xB1\x1A\xE1')
    # Header CLSID.
    f.write(b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
    # Minor version.
    f.write(b'\x3E\x00')
    # Major version. For now, we only support version 3, but support for
    # version 4 is planned.
    f.write(b'\x03\x00' if self.__version == 3 else b'\x04\x00')
    # Byte order. Specifies that it is little endian.
    f.write(b'\xFE\xFF')
    # Sector shift.
    f.write(b'\x09\x00' if self.__version == 3 else b'\x0C\x00')
    # Mini sector shift.
    f.write(b'\x06\x00')
    # Reserved.
    f.write(b'\x00\x00\x00\x00\x00\x00')
    # Number of directory sectors. Version 3 says this *must* be 0.
    f.write(packUI32(0))
    # Number of FAT sectors.
    f.write(packUI32(numFat))
    # First directory sector location (Sector for the directory stream).
    # We place that right after the DIFAT and FAT.
    f.write(packUI32(numFat + numDifat))
    # Transaction signature number.
    f.write(b'\x00\x00\x00\x00')
    # Mini stream cutoff size.
    f.write(b'\x00\x10\x00\x00')
    # First mini FAT sector location.
    f.write(packUI32((numFat + numDifat + ceilDiv(self.__dirEntryCount, 4)) if self.__numMinifat > 0 else 0xFFFFFFFE))
    # Number of mini FAT sectors.
    f.write(packUI32(ceilDiv(self.__numMinifatSectors, self.__linksPerSector)))
    # First DIFAT sector location. If there are none, set to 0xFFFFFFFE (End
    # of chain).
    f.write(packUI32(0 if numDifat else 0xFFFFFFFE))
    # Number of DIFAT sectors.
    f.write(packUI32(numDifat))

    ### DIFAT.
    # To make life easier, the file starts with the DIFAT sectors followed
    # by the FAT sectors, as they can all be written at once before the
    # actual contents of the file. The first 109 FAT sector locations go in
    # the header itself.
    fatLinksPerDifat = self.__linksPerSector - 1
    for x in range(numFat):
        # Quickly check if we have hit 109. If we have, and we are writing
        # a version 4 file, we need to pad a bunch of null bytes.
        # NOTE(review): this padding is only reached when there are more
        # than 109 FAT sectors; confirm the version 4 header is padded as
        # intended otherwise once version 4 becomes selectable.
        if x == 109 and self.__version == 4:
            f.write(b'\x00' * 3584)
        if x > 109 and (x - 109) % fatLinksPerDifat == 0:
            # If we are at the end of a DIFAT sector, write the jump to the
            # next DIFAT sector.
            f.write(packUI32((x - 109) // fatLinksPerDifat))
        # Write the next FAT sector location.
        f.write(packUI32(x + numDifat))

    # Finally, fill out the rest of the DIFAT with free entries.
    if numFat > 109:
        usedLinks = (numFat - 109) % fatLinksPerDifat
        # Only pad when the last DIFAT sector is partially filled. When it
        # is exactly full, usedLinks is 0 and padding here would emit a
        # whole extra run of free entries, pushing every following sector
        # out of place.
        if usedLinks:
            f.write(FREE * (fatLinksPerDifat - usedLinks))
        # The last link of the last DIFAT sector ends the DIFAT chain.
        f.write(END_OF_CHAIN)
    else:
        f.write(FREE * (109 - numFat))

    ### FAT.
    # First, if we had any DIFAT sectors, write that the previous sectors
    # were all a part of it.
    f.write(b'\xFC\xFF\xFF\xFF' * numDifat)
    # Second write that the next x sectors are all FAT sectors.
    f.write(b'\xFD\xFF\xFF\xFF' * numFat)

    offset = numDifat + numFat

    # Fill in the values for the directory stream.
    dirSectors = ceilDiv(self.__dirEntryCount, self.__dirEntsPerSector)
    writeChain(offset, dirSectors)
    offset += dirSectors

    # Check if we have minifat *at all* first.
    if self.__numMinifatSectors > 0:
        # Mini FAT chain.
        miniFatSectors = ceilDiv(self.__numMinifat, 16)
        writeChain(offset, miniFatSectors)
        offset += miniFatSectors

        # The mini stream sectors.
        writeChain(offset, self.__numMinifat)
        offset += self.__numMinifat

    # Regular stream chains. These come from the list of entries that was
    # built during recalculation for streams of 4096 bytes or more. The
    # order in the list is the order the data is later written in.
    for entry in self.__largeEntries:
        size = ceilDiv(len(entry.data), self.__sectorSize)
        entry.startingSectorLocation = offset
        writeChain(offset, size)
        offset += size

    # Finally, fill the last FAT sector with markers to specify no block
    # exists.
    freeSectors = totalSectors & (self.__linksPerSector - 1)
    if freeSectors:
        f.write(FREE * (self.__linksPerSector - freeSectors))

    # Finally, return the current sector index for use in other places.
    return numDifat + numFat
def _writeDirectoryEntries(self, f, startingSector: int) -> List[DirectoryEntry]:
    """
    Sorts the entries and writes each of them to :param f:, padding with
    blank entries up to a multiple of four.

    :returns: The sorted list of entries that was written.
    """
    entries = self._treeSort(startingSector)
    for entry in entries:
        self._writeDirectoryEntry(f, entry)

    remainder = len(entries) % 4
    if remainder:
        # A blank 128 byte entry: 68 null bytes, 12 bytes of 0xFF, then 48
        # more null bytes.
        blankEntry = (b'\x00\x00' * 34) + (b'\xFF\xFF' * 6) + (b'\x00\x00' * 24)
        f.write(blankEntry * (4 - remainder))

    return entries
def _writeDirectoryEntry(self, f, entry: DirectoryEntry) -> None:
    """
    Converts :param entry: to bytes and writes the result to the file f.
    """
    serialized = bytes(entry)
    f.write(serialized)
def _writeFinal(self, f) -> None:
    """
    Writes the final sectors of the file: the data of every stream too
    large for the mini FAT, each padded with null bytes to a whole number
    of sectors.
    """
    for stream in self.__largeEntries:
        f.write(stream.data)
        # Bytes used in the last, partially filled sector (if any).
        overflow = len(stream.data) & (self.__sectorSize - 1)
        if overflow:
            f.write(b'\x00' * (self.__sectorSize - overflow))
def _writeMini(self, f, entries: List[DirectoryEntry]) -> None:
    """
    Writes the mini FAT followed by the full mini stream.

    :param entries: The entries in the order they were written to the
        directory; streams smaller than 4096 bytes are stored here in that
        same order.
    """
    packLink = constants.st.ST_LE_UI32.pack

    # Mini FAT: one chain per stream that is less than 4096 bytes.
    nextMiniSector = 0
    for entry in entries:
        if entry.type == DirectoryEntryType.STREAM and len(entry.data) < 4096:
            sectorCount = ceilDiv(len(entry.data), 64)
            for link in range(nextMiniSector + 1, nextMiniSector + sectorCount):
                f.write(packLink(link))
            # Empty streams take up no mini sectors and get no chain.
            if sectorCount > 0:
                f.write(b'\xFE\xFF\xFF\xFF')
            nextMiniSector += sectorCount

    # Finally, write the remaining slots of the last mini FAT sector.
    linksPerSector = self.__linksPerSector
    usedLinks = nextMiniSector & (linksPerSector - 1)
    if usedLinks:
        f.write(b'\xFF\xFF\xFF\xFF' * (linksPerSector - usedLinks))

    # Write the mini stream, padding each item to a multiple of 64 bytes.
    for entry in entries:
        length = len(entry.data)
        if 0 < length < 4096:
            f.write(entry.data)
            tail = length & 63
            if tail:
                f.write(b'\x00' * (64 - tail))

    # Pad the final mini stream block out to a full sector.
    miniPerSector = self.__miniSectorsPerSector
    leftover = self.__numMinifatSectors & (miniPerSector - 1)
    if leftover:
        f.write((b'\x00' * 64) * (miniPerSector - leftover))
def addEntry(self, path: MSG_PATH, data: Optional[Union[bytes, SupportsBytes]] = None, storage: bool = False, **kwargs) -> None:
    """
    Adds an entry to the OleWriter instance at the path specified, adding
    storages with default settings where necessary. If the entry is not a
    storage, :param data: *must* be set.

    :param path: The path to add the entry at. Must not contain a path part
        that is an already added stream.
    :param data: The bytes for a stream or an object with a ``__bytes__``
        method.
    :param storage: If ``True``, the entry to add is a storage. Otherwise,
        the entry is a stream.
    :param clsid: The CLSID for the stream/storage. Must be a bytes instance
        that is 16 bytes long.
    :param creationTime: An 8 byte filetime int. Sets the creation time of
        the entry. Not applicable to streams.
    :param modifiedTime: An 8 byte filetime int. Sets the modification time
        of the entry. Not applicable to streams.
    :param stateBits: A 4 byte int. Sets the state bits, user-defined flags,
        of the entry. For a stream, this *SHOULD* be unset.

    :raises OSError: A stream was found on the path before the end or an
        entry with the same name already exists.
    :raises ValueError: Attempts to access an internal item.
    :raises ValueError: The data provided is too large.
    """
    path = inputToMsgPath(path)
    leaf = path[-1]

    # Find (creating where needed) the storage that will hold the entry.
    parent = self.__getContainingStorage(path, False, True)

    # Names are unique within a storage, ignoring case.
    if leaf.lower() in map(str.lower, parent.keys()):
        raise OSError('Cannot add an entry that already exists.')

    # Build the entry and validate the supplied properties before it is
    # inserted anywhere.
    entry = DirectoryEntry()
    if storage:
        entry.type = DirectoryEntryType.STORAGE
    else:
        entry.type = DirectoryEntryType.STREAM
    entry.name = leaf
    self.__modifyEntry(entry, data = data, **kwargs)

    # A storage is a dict holding its own entry under a reserved key.
    parent[leaf] = {'::DirectoryEntry': entry} if storage else entry
def addOleEntry(self, path: MSG_PATH, entry: OleDirectoryEntry, data: Optional[Union[bytes, SupportsBytes]] = None) -> None:
    """
    Uses the entry provided to add the data to the writer.

    :param path: Where to add the entry. The containing storage must
        already exist.
    :param entry: The olefile directory entry whose name, CLSID, user flags
        and (for storages) timestamps are copied.
    :param data: The data for a stream. Ignored for storages.

    :raises OSError: Tried to add an entry to a path that has not yet
        been added, tried to add as a child of a stream, or tried to add an
        entry where one already exists under the same name.
    :raises ValueError: The data provided is too large.
    """
    path = inputToMsgPath(path)
    leaf = path[-1]

    # Unlike addEntry, missing parent storages are *not* created.
    parent = self.__getContainingStorage(path, False)

    if leaf.lower() in map(str.lower, parent.keys()):
        raise OSError('Cannot add an entry that already exists.')

    isStorage = entry.entry_type == DirectoryEntryType.STORAGE

    # Values shared by storages and streams.
    newEntry = DirectoryEntry()
    newEntry.name = entry.name
    newEntry.type = DirectoryEntryType.STORAGE if isStorage else DirectoryEntryType.STREAM
    newEntry.clsid = _unClsid(entry.clsid)
    newEntry.stateBits = entry.dwUserFlags

    if isStorage:
        # Timestamps are only copied for storages.
        newEntry.creationTime = entry.createTime
        newEntry.modifiedTime = entry.modifyTime

        # Finally add the dict to our tree of items.
        parent[leaf] = {'::DirectoryEntry': newEntry}
    else:
        # Missing (or empty) data becomes an empty stream.
        newEntry.data = bytes(data or b'')
        if len(newEntry.data) > 0x80000000:
            raise ValueError('Current version of extract_msg does not support streams greater than 2 GB in OLE files.')

        # Finally add the entry to our dict of entries.
        parent[leaf] = newEntry

    self.__dirEntryCount += 1
def deleteEntry(self, path) -> None:
    """
    Deletes the entry specified by :param path:, including all children.

    :raises OSError: If the entry does not exist or a part of the path that
        is not the last was a stream.
    :raises ValueError: Attempted to delete an internal data stream.
    """
    path = inputToMsgPath(path)

    # Get the containing storage for the entry (which must exist).
    parent = self.__getContainingStorage(path)

    # Lookups ignore case but the stored key keeps the case it was added
    # with, so find the real key first. Removing it drops all children too.
    storedName = dictGetCasedKey(parent, path[-1])
    del parent[storedName]
def editEntry(self, path: MSG_PATH, **kwargs) -> None:
    """
    Used to edit values of an entry by setting the specific kwargs. Set a
    value to something other than None to set it.

    :param data: The data of a stream. Will error if used for something
        other than a stream. Must be bytes or convertable to bytes.
    :param clsid: The CLSID for the stream/storage. Must be a bytes instance
        that is 16 bytes long.
    :param creationTime: An 8 byte filetime int. Sets the creation time of
        the entry. Not applicable to streams.
    :param modifiedTime: An 8 byte filetime int. Sets the modification time
        of the entry. Not applicable to streams.
    :param stateBits: A 4 byte int. Sets the state bits, user-defined flags,
        of the entry. For a stream, this *SHOULD* be unset.

    To convert a 32 character hexadecimal CLSID into the bytes for this
    function, the _unClsid function in the ole_writer submodule can be used.

    :raises OSError: The entry does not exist in the file.
    :raises TypeError: Attempted to modify the bytes of a storage.
    :raises ValueError: The type of a parameter was wrong, or the data of a
        parameter was invalid.
    """
    # Locate the live entry, then hand it off with the arguments given.
    msgPath = inputToMsgPath(path)
    target = self.__getEntry(msgPath)
    self.__modifyEntry(target, **kwargs)
def fromMsg(self, msg: MSGFile) -> None:
    """
    Copies the streams and stream information necessary from the MSG file.

    For an embedded MSG file (one with a prefix), the properties stream is
    adjusted and the named properties storage is copied in as well.
    """
    # Get the root OLE entry's CLSID.
    self.__rootEntry.clsid = _unClsid(msg._getOleEntry('/').clsid)

    # List both storages and directories, but sort them by shortest length
    # first to prevent errors.
    entries = msg.listDir(True, True, False)
    entries.sort(key = len)

    for itemPath in entries:
        oleEntry = msg._getOleEntry(itemPath)
        if oleEntry.entry_type == DirectoryEntryType.STREAM:
            data = msg.getStream(itemPath)
        else:
            data = None

        # The properties stream on embedded messages needs a "reserved"
        # section of 8 null bytes inserted after the first 24 bytes.
        if itemPath[0] == '__properties_version1.0' and msg.prefixLen > 0:
            data = data[:24] + b'\x00\x00\x00\x00\x00\x00\x00\x00' + data[24:]

        self.addOleEntry(itemPath, oleEntry, data)

    # Now check if it is an embedded file. If so, we need to copy the named
    # properties streams (the metadata, not the values).
    if msg.prefixLen > 0:
        # Add the named properties storage itself first.
        nameidEntry = msg._getOleEntry('__nameid_version1.0', False)
        self.addOleEntry('__nameid_version1.0', nameidEntry, None)

        # Then copy everything found inside of it.
        for itemPath in msg._oleListDir():
            if len(itemPath) > 1 and itemPath[0] == '__nameid_version1.0':
                self.addOleEntry(itemPath, msg._getOleEntry(itemPath, prefix = False), msg.getStream(itemPath, prefix = False))
def fromOleFile(self, ole: OleFileIO, rootPath: MSG_PATH = []) -> None:
    """
    Copies all the streams from the provided OLE file into this writer.

    NOTE: This method does *not* handle any special rule that may be
    required by a format that uses the compound binary file format as a base
    when extracting an embedded directory. For example, MSG files require
    modification of an embedded properties stream when extracting an
    embedded MSG file.

    :param rootPath: A path (accepted by ``olefile.OleFileIO``) to the
        directory to use as the root of the file. If not provided, the file
        root will be used. Only entries located underneath this directory
        are copied.

    :raises OSError: If :param rootPath: does not exist in the file.
    """
    rootPath = inputToMsgPath(rootPath)

    # Check if the root path is simply the top of the file.
    if rootPath == []:
        # Copy the clsid of the root entry.
        self.__rootEntry.clsid = _unClsid(ole.direntries[0].clsid)
        paths = {tuple(x): (x, ole.direntries[ole._find(x)]) for x in ole.listdir(True, True)}
    else:
        # If it is not the top of the file, we need to do some filtering.
        # First get the CLSID from the entry the path points to.
        try:
            entry = ole.direntries[ole._find(rootPath)]
            self.__rootEntry.clsid = _unClsid(entry.clsid)
        except OSError as e:
            if str(e) == 'file not found':
                # Get the cause/context for the original exception and use
                # it for the new exception. This hides the exception from
                # OleFileIO.
                context = e.__cause__ or e.__context__
                raise OSError('Root path was not found in the OLE file.') from context
            else:
                raise

        # Keep only the entries that are actually *inside* the root path,
        # keyed by their path relative to it. Checking the length alone is
        # not enough: it would also pull in entries from sibling storages
        # with their leading parts chopped off, which can collide with (or
        # be mistaken for) the real children. Names are compared without
        # regard to case, as OLE lookups are.
        rootLen = len(rootPath)
        rootKey = [part.lower() for part in rootPath]
        paths = {}
        for x in ole.listdir(True, True):
            if len(x) > rootLen and [part.lower() for part in x[:rootLen]] == rootKey:
                paths[tuple(x[rootLen:])] = (x, ole.direntries[ole._find(x)])

    # Copy all of the other entries. Ensure that directories come before
    # their streams by sorting the paths.
    for x in sorted(paths.keys()):
        fullPath, entry = paths[x]
        if entry.entry_type == DirectoryEntryType.STREAM:
            with ole.openstream(fullPath) as f:
                data = f.read()
        else:
            data = None

        self.addOleEntry(x, entry, data)
def getEntry(self, path: MSG_PATH) -> DirectoryEntry:
    """
    Finds an existing `DirectoryEntry` instance in the writer and returns a
    shallow copy of it. Use this method to check the internal status of an
    entry without being able to modify the original.

    :raises OSError: If the entry does not exist.
    :raises ValueError: If access to an internal item is attempted.
    """
    original = self.__getEntry(inputToMsgPath(path))
    return copy.copy(original)
def listItems(self, streams: bool = True, storages: bool = False) -> List[List[str]]:
    """
    Returns a sorted list of the paths of the specified items currently in
    the writer.

    :param streams: If ``True``, includes the path for each stream in the
        list.
    :param storages: If ``True``, includes the path for each storage in the
        list.
    """
    # Nothing was asked for, so there is no need to walk anything.
    if not (streams or storages):
        return []

    # The walk method reports, per directory, the names of the storages and
    # streams inside of it. Pick the wanted kinds and turn each name into a
    # full path.
    found = []
    for currentDir, storageNames, streamNames in self.walk():
        if storages:
            found.extend(currentDir + [name] for name in storageNames)
        if streams:
            found.extend(currentDir + [name] for name in streamNames)

    # Sorting (keeping case in mind) gives an output similar, if not
    # identical, to OleFileIO.listdir.
    return sorted(found)
def renameEntry(self, path: MSG_PATH, newName: str) -> None:
    """
    Changes the name of an entry, leaving it in its current position.

    :param path: The path of the entry to rename.
    :param newName: The new name for the entry (not a path).

    :raises OSError: If the entry does not exist or an entry with the new
        name already exists.
    :raises ValueError: If access to an internal item is attempted or the
        new name provided is invalid.
    """
    # First, validate the new name.
    if not isinstance(newName, str):
        raise ValueError('New name must be a string.')
    if constants.re.INVALID_OLE_PATH.search(newName):
        raise ValueError('Invalid character(s) in new name. Must not contain the following characters: \\//!:')
    if len(newName) > 31:
        raise ValueError('New name must be less than 32 characters.')

    # Convert the path once and use the converted form everywhere below.
    # Indexing the raw argument is wrong for anything but a list: for a
    # string path, ``path[-1]`` is just its last character.
    path = inputToMsgPath(path)

    # Get the storage for our entry. Entry *must* exist.
    _dir = self.__getContainingStorage(path)

    # See if an item in the storage already has that new name.
    if newName.lower() in map(str.lower, _dir.keys()):
        raise OSError('An entry with the new name already exists.')

    # Get the original name, with the case it was stored under.
    originalName = dictGetCasedKey(_dir, path[-1])

    # Get the item to move. For a storage this is the dict of its contents,
    # with the entry itself stored under a reserved key.
    item = _dir[originalName]
    entry = item['::DirectoryEntry'] if isinstance(item, dict) else item

    # Change the name on the entry first.
    entry.name = newName

    # Now, we need to remove the item from the current storage and add it
    # back with the new name.
    del _dir[originalName]
    _dir[newName] = item
def walk(self) -> Iterator[Tuple[List[str], List[str], List[str]]]:
    """
    Functional equivalent to ``os.walk``, but for going over the file
    structure of the OLE file to be written. Unlike ``os.walk``, it takes
    no arguments.

    :returns: A tuple of three lists. The first is the path, as a list of
        strings, for the directory (or an empty list for the root), the
        second is a list of the storages in the current directory, and the
        last is a list of the streams. Streams and storages are sorted
        caselessly.
    """
    stack = [([], self.__dirEntries)]

    # Always take the most recently added item, to mimic the behavior of
    # os.walk.
    while stack:
        here, contents = stack.pop()

        # Drop the reserved keys, then order the rest without regard to
        # case.
        visible = sorted((key for key in contents.keys() if not key.startswith('::')), key = str.lower)

        # Storages are kept as dicts; everything else is a stream.
        storages = [name for name in visible if isinstance(contents[name], dict)]
        streams = [name for name in visible if not isinstance(contents[name], dict)]

        # Queue the storages so they get visited later.
        stack.extend((here + [name], contents[name]) for name in storages)

        yield (here, storages, streams)
def write(self, path) -> None:
    """
    Writes the data to the path specified.

    If :param path: has a ``write`` method, the object will be used
    directly and is left open afterwards. Otherwise it is opened as a file
    in binary write mode and closed when done.

    If a failure occurs, the file or IO device may have been modified.

    :raises TooManySectorsError: The number of sectors required for a part
        of writing is too large.
    """
    # Anything without a callable ``write`` is treated as a file path that
    # we need to open (and therefore close) ourselves.
    opened = not (hasattr(path, 'write') and hasattr(path.write, '__call__'))
    f = open(path, 'wb') if opened else path

    # Make sure we clean up after everything, especially if there is an
    # error.
    try:
        # Write each section, transferring data between functions where
        # necessary.
        firstDirSector = self._writeBeginning(f)
        written = self._writeDirectoryEntries(f, firstDirSector)
        self._writeMini(f, written)
        self._writeFinal(f)
    finally:
        self._cleanupEntries()
        if opened:
            f.close()
def _unClsid(clsid: str) -> bytes:
    """
    Converts the clsid from ``olefile.olefile._clsid`` back to bytes.

    :param clsid: The hexadecimal CLSID string, with or without dashes.

    :returns: The 16 bytes of the CLSID, or empty bytes if :param clsid: is
        empty.
    """
    if not clsid:
        return b''

    digits = clsid.replace('-', '')

    # Offsets of each output byte within the hex string. The first three
    # groups (4, 2 and 2 bytes) are read back to front; the remaining 8
    # bytes are taken in order.
    offsets = (
        6, 4, 2, 0,
        10, 8,
        14, 12,
        16, 18, 20, 22, 24, 26, 28, 30,
    )
    return bytes(int(digits[start:start + 2], 16) for start in offsets)