Skip to content

Email compose

EmailDecompose

Bases: EmailParseBase

Source code in Docs2KG/parser/email/email_compose.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
class EmailDecompose(EmailParseBase):
    """
    Decompose an email file into attachments, images, body content, and metadata.

    ``self.output_dir`` and ``self.email_file`` are not set here; they are
    expected to be provided by ``EmailParseBase``.
    """

    def __init__(self, *args, **kwargs):
        """
        Initialise the base parser and create the ``images`` and
        ``attachments`` sub-directories under ``self.output_dir``.
        """
        super().__init__(*args, **kwargs)
        self.image_output_dir = self.output_dir / "images"
        self.image_output_dir.mkdir(parents=True, exist_ok=True)
        self.attachments_output_dir = self.output_dir / "attachments"
        self.attachments_output_dir.mkdir(parents=True, exist_ok=True)

    def decompose_email(self):
        """
        Decompose the email file into images, attachments, and metadata.

        Returns:
            email.message.Message: The message read from ``self.email_file``.
        """
        msg = self.read_email_file()
        self.download_email_attachment(msg)
        return msg

    def read_email_file(self):
        """
        Read ``self.email_file`` as bytes and parse it.

        Returns:
            email.message.Message: The parsed message.
        """
        with open(self.email_file, "rb") as f:
            return email.message_from_bytes(f.read())

    def download_email_attachment(self, msg):
        """
        Save the parts of an email message to the output directory.

        Every MIME part of the message is visited and written as follows:

        - parts with an ``attachment`` disposition and a filename go to the
          attachments directory;
        - ``image/*`` parts with a filename go to the images directory;
        - ``text/html`` and ``text/plain`` parts are written to ``email.html``
          and ``email.txt`` (a later part of the same type overwrites an
          earlier one);
        - the subject/from/to/date headers are written to ``metadata.json``;
        - the saved images and attachments are indexed in ``images.csv`` and
          ``attachments.csv``.

        Parts for which ``get_payload(decode=True)`` returns ``None`` (it does
        so for multipart parts) are skipped instead of raising.

        Args:
            msg (email.message.Message): Parsed email message.

        Returns:
            email.message.Message: The same message that was passed in.
        """
        images = []
        attachments = []

        for part in msg.walk():
            original_filename = part.get_filename()
            payload = part.get_payload(decode=True)
            if payload is None:
                # Multipart parts (containers, attached messages) carry no
                # bytes of their own; writing None would raise TypeError.
                if original_filename:
                    logger.warning(
                        f"Skipping part without decodable payload: {original_filename}"
                    )
                continue

            content_type = part.get_content_type()

            if part.get_content_disposition() == "attachment" and original_filename:
                filename = self.clean_filename(original_filename, part)
                filepath = self.attachments_output_dir / filename
                with open(filepath, "wb") as f:
                    f.write(payload)
                attachments.append(
                    {
                        "name": filename,
                        "path": filepath,
                        "original_filename": original_filename,
                    }
                )

            # An image part is saved here even if it was also saved as an
            # attachment above.
            if content_type.startswith("image/") and original_filename:
                img_name = self.clean_filename(original_filename, part)
                img_path = self.image_output_dir / img_name
                with open(img_path, "wb") as f:
                    f.write(payload)
                logger.info(f"Saved image to: {img_path}")
                images.append(
                    {
                        "name": img_name,
                        "path": img_path,
                        "cid": part.get("Content-ID", ""),
                    }
                )

            if content_type == "text/html":
                html_output = self.output_dir / "email.html"
                with open(html_output, "wb") as f:
                    f.write(payload)
                logger.info(f"Saved html to: {html_output}")
            if content_type == "text/plain":
                text_output = self.output_dir / "email.txt"
                with open(text_output, "wb") as f:
                    f.write(payload)
                logger.info(f"Saved text to: {text_output}")

        # Header values may be email.header.Header objects (e.g. when the raw
        # header holds undecodable 8-bit bytes); json cannot serialise those,
        # so every present header is stored as its string form.
        email_metadata = {}
        for key in ("subject", "from", "to", "date"):
            value = msg[key]
            email_metadata[key] = value if value is None else str(value)
        metadata_output = self.output_dir / "metadata.json"
        with open(metadata_output, "w") as f:
            json.dump(email_metadata, f)

        images_output = self.image_output_dir / "images.csv"
        pd.DataFrame(images).to_csv(images_output, index=False)

        attachments_output = self.attachments_output_dir / "attachments.csv"
        pd.DataFrame(attachments).to_csv(attachments_output, index=False)

        return msg

    @staticmethod
    def clean_filename(filename: str, part) -> str:
        """
        Turn a filename taken from an email part into a safe, single path component.

        Args:
            filename (str): Filename to clean.
            part (email.message.Message): Email part the filename belongs to.

        Returns:
            str: Cleaned filename. Text from the last ``?=`` onwards is
            dropped, the part's ``Content-ID`` (when present) is prefixed, and
            path separators are replaced with ``_``.
        """
        # Presumably a leftover from an undecoded RFC 2047 encoded-word:
        # keep only the text before the last "?=".
        if "?=" in filename:
            filename = filename.rsplit("?=", 1)[0]
        content_id = part.get("Content-ID")
        if content_id:
            filename = f"{content_id}_{filename}"
        # The name comes from the email itself, so it is untrusted: separators
        # would let it point outside the output directory.
        for separator in ("/", "\\", "\x00"):
            filename = filename.replace(separator, "_")
        # "", "." and ".." would resolve to a directory rather than a file.
        if filename in ("", ".", ".."):
            filename = f"_{filename}"
        return filename

clean_filename(filename, part) staticmethod

Clean the filename: drop everything from the last "?=" onwards and prefix the part's Content-ID when one is present.

Parameters:

Name Type Description Default
filename str

Filename to clean.

required
part Message

Email part.

required

Returns:

Name Type Description
str

Cleaned filename.

Source code in Docs2KG/parser/email/email_compose.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
@staticmethod
def clean_filename(filename: str, part) -> str:
    """
    Turn a filename taken from an email part into a safe, single path component.

    Args:
        filename (str): Filename to clean.
        part (email.message.Message): Email part the filename belongs to.

    Returns:
        str: Cleaned filename. Text from the last ``?=`` onwards is dropped,
        the part's ``Content-ID`` (when present) is prefixed, and path
        separators are replaced with ``_``.
    """
    # Presumably a leftover from an undecoded RFC 2047 encoded-word:
    # keep only the text before the last "?=".
    if "?=" in filename:
        filename = filename.rsplit("?=", 1)[0]
    content_id = part.get("Content-ID")
    if content_id:
        filename = f"{content_id}_{filename}"
    # The name comes from the email itself, so it is untrusted: separators
    # would let it point outside the output directory.
    for separator in ("/", "\\", "\x00"):
        filename = filename.replace(separator, "_")
    # "", "." and ".." would resolve to a directory rather than a file.
    if filename in ("", ".", ".."):
        filename = f"_{filename}"
    return filename

decompose_email()

Decompose the email file into images, attachments, and metadata.

Source code in Docs2KG/parser/email/email_compose.py
20
21
22
23
24
25
26
def decompose_email(self):
    """
    Read the email file, then write out its images, attachments, and metadata.

    Returns the message object obtained from ``read_email_file``.
    """
    message = self.read_email_file()
    self.download_email_attachment(message)
    return message

download_email_attachment(msg)

Download the email's attachments, images, HTML/plain-text body, and header metadata and save them to the output directory. Args: msg: the parsed email message.

Returns:

Source code in Docs2KG/parser/email/email_compose.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
def download_email_attachment(self, msg):
    """
    Save the parts of an email message to the output directory.

    Every MIME part of the message is visited and written as follows:

    - parts with an ``attachment`` disposition and a filename go to the
      attachments directory;
    - ``image/*`` parts with a filename go to the images directory;
    - ``text/html`` and ``text/plain`` parts are written to ``email.html``
      and ``email.txt`` (a later part of the same type overwrites an
      earlier one);
    - the subject/from/to/date headers are written to ``metadata.json``;
    - the saved images and attachments are indexed in ``images.csv`` and
      ``attachments.csv``.

    Parts for which ``get_payload(decode=True)`` returns ``None`` (it does so
    for multipart parts) are skipped instead of raising.

    Args:
        msg (email.message.Message): Parsed email message.

    Returns:
        email.message.Message: The same message that was passed in.
    """
    images = []
    attachments = []

    for part in msg.walk():
        original_filename = part.get_filename()
        payload = part.get_payload(decode=True)
        if payload is None:
            # Multipart parts (containers, attached messages) carry no bytes
            # of their own; writing None would raise TypeError.
            if original_filename:
                logger.warning(
                    f"Skipping part without decodable payload: {original_filename}"
                )
            continue

        content_type = part.get_content_type()

        if part.get_content_disposition() == "attachment" and original_filename:
            filename = self.clean_filename(original_filename, part)
            filepath = self.attachments_output_dir / filename
            with open(filepath, "wb") as f:
                f.write(payload)
            attachments.append(
                {
                    "name": filename,
                    "path": filepath,
                    "original_filename": original_filename,
                }
            )

        # An image part is saved here even if it was also saved as an
        # attachment above.
        if content_type.startswith("image/") and original_filename:
            img_name = self.clean_filename(original_filename, part)
            img_path = self.image_output_dir / img_name
            with open(img_path, "wb") as f:
                f.write(payload)
            logger.info(f"Saved image to: {img_path}")
            images.append(
                {
                    "name": img_name,
                    "path": img_path,
                    "cid": part.get("Content-ID", ""),
                }
            )

        if content_type == "text/html":
            html_output = self.output_dir / "email.html"
            with open(html_output, "wb") as f:
                f.write(payload)
            logger.info(f"Saved html to: {html_output}")
        if content_type == "text/plain":
            text_output = self.output_dir / "email.txt"
            with open(text_output, "wb") as f:
                f.write(payload)
            logger.info(f"Saved text to: {text_output}")

    # Header values may be email.header.Header objects (e.g. when the raw
    # header holds undecodable 8-bit bytes); json cannot serialise those, so
    # every present header is stored as its string form.
    email_metadata = {}
    for key in ("subject", "from", "to", "date"):
        value = msg[key]
        email_metadata[key] = value if value is None else str(value)
    metadata_output = self.output_dir / "metadata.json"
    with open(metadata_output, "w") as f:
        json.dump(email_metadata, f)

    images_output = self.image_output_dir / "images.csv"
    pd.DataFrame(images).to_csv(images_output, index=False)

    attachments_output = self.attachments_output_dir / "attachments.csv"
    pd.DataFrame(attachments).to_csv(attachments_output, index=False)

    return msg