Skip to content

Email connector

EmailConnector

Log in to the email server and download either the emails that match a keyword or a specific number of the latest emails.

Source code in Docs2KG/parser/email/utils/email_connector.py
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
class EmailConnector:
    """
    Log in to an IMAP server and save emails from the inbox as ``.eml`` files.

    When ``search_keyword`` is set, the emails whose body matches the keyword are
    downloaded; otherwise the latest ``num_emails`` emails are downloaded.
    """

    def __init__(
        self,
        email_address,
        password,
        output_dir=None,
        search_keyword: str = None,
        num_emails: int = 50,
        imap_server: str = "imap.gmail.com",
        imap_port: int = 993,
    ):
        """
        Initialize the EmailConnector, log in, and prepare the output directory.

        Args:
            email_address (str): Email address to log in.
            password (str): Password for the email address.
            output_dir (str | Path, optional): Directory the emails are saved to.
                Defaults to ``DATA_INPUT_DIR / "email" / email_address``.
            search_keyword (str, optional): Keyword to search emails. Defaults to None.
            num_emails (int, optional): Number of latest emails to download. Defaults to 50.
            imap_server (str, optional): IMAP host name. Defaults to "imap.gmail.com".
            imap_port (int, optional): IMAP SSL port. Defaults to 993.
        """
        # Local import so this block does not depend on the file's import header.
        from pathlib import Path

        self.email_address = email_address
        self.password = password
        self.search_keyword = search_keyword
        self.num_emails = num_emails
        self.imap_server = imap_server
        self.imap_port = imap_port
        self.imap = None
        self.login_imap()
        if output_dir is None:
            # DATA_INPUT_DIR is defined elsewhere; presumably a pathlib.Path.
            output_dir = DATA_INPUT_DIR / "email" / email_address
        # Accept plain strings as well as Path objects: the rest of the class
        # relies on ``/`` joins and ``.mkdir``.
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def login_imap(self):
        """
        Open an SSL connection to the IMAP server and log in.

        Raises:
            Exception: If the connection is not in the "AUTH" state after login.
        """
        self.imap = imaplib.IMAP4_SSL(self.imap_server, self.imap_port)
        self.imap.login(self.email_address, self.password)
        # log whether login was successful
        if self.imap.state == "AUTH":
            logger.info("Login successful")
        else:
            logger.error("Login failed")
            raise Exception("Login failed")

    def pull(self):
        """
        Pull the emails from the email server and save each one to disk.

        With a search keyword, every matching email is downloaded; without one,
        only the latest ``num_emails`` emails are downloaded.
        """
        logger.info("Pulling emails")
        logger.info(self.search_keyword)
        if self.search_keyword:
            email_ids = self.search_emails()
        else:
            # download_email() needs IDs, not the parsed messages that
            # download_latest_emails() returns.
            email_ids = self._latest_email_ids()

        for email_id in email_ids:
            logger.info(f"Downloading email: {email_id}")
            self.download_email(email_id)

    def _latest_email_ids(self):
        """Return the IDs of the latest ``num_emails`` emails (empty if <= 0)."""
        email_ids = self.search_emails()
        if self.num_emails <= 0:
            # ``ids[-0:]`` would be the whole list, so handle this explicitly.
            return []
        return email_ids[-self.num_emails :]

    def search_emails(self):
        """
        Select the inbox and search it.

        Searches for emails whose body contains the search keyword, or for all
        emails when no keyword is set.

        Returns:
            list: List of email IDs (bytes) that match the search criteria.
        """
        self.imap.select("inbox")
        if self.search_keyword:
            # Escape the IMAP quoted-string specials so a keyword containing
            # a quote or backslash cannot break the search command.
            keyword = self.search_keyword.replace("\\", "\\\\").replace('"', '\\"')
            result, data = self.imap.search(None, f'(BODY "{keyword}")')
            email_ids = data[0].split()
            logger.info(f"Number of emails found: {len(email_ids)}")
            logger.info(f"Email IDs: {email_ids}")
        else:
            result, data = self.imap.search(None, "ALL")
            email_ids = data[0].split()
        return email_ids

    def fetch_emails(self, email_ids):
        """
        Fetch and parse the emails with the given IDs.

        Args:
            email_ids (list): List of email IDs to fetch.

        Returns:
            list: List of parsed email messages, in the order of ``email_ids``.
        """
        emails = []
        for email_id in email_ids:
            result, data = self.imap.fetch(email_id, "(RFC822)")
            # data[0] is a (header, raw message bytes) pair
            raw_email = data[0][1]
            emails.append(email.message_from_bytes(raw_email))
        return emails

    def download_latest_emails(self):
        """
        Fetch the latest emails up to the specified number.

        Returns:
            list: List of the latest email messages (parsed, not saved to disk).
        """
        return self.fetch_emails(self._latest_email_ids())

    def download_email(self, email_id):
        """
        Download a specific email and save it as an ``.eml`` file.

        The file is written to
        ``<output_dir>/<email_id>/<email_address>.<email_id>.eml``.

        Args:
            email_id (bytes): Email ID to download, as returned by
                ``search_emails``.

        Returns:
            None
        """
        # fetch the email for the content and all the attachments
        result, data = self.imap.fetch(email_id, "(RFC822)")
        raw_email = data[0][1]

        # save the email to folder with email_id as filename
        email_id_text = email_id.decode("utf-8")
        email_output_dir = self.output_dir / email_id_text
        email_output_dir.mkdir(parents=True, exist_ok=True)
        email_filepath = (
            email_output_dir / f"{self.email_address}.{email_id_text}.eml"
        )
        with open(email_filepath, "wb") as f:
            f.write(raw_email)
        logger.info(f"Saved email to: {email_filepath}")

    def logout(self):
        """
        Log out from the email server if a connection was opened.
        """
        if self.imap:
            self.imap.logout()

__init__(email_address, password, output_dir=None, search_keyword=None, num_emails=50, imap_server='imap.gmail.com', imap_port=993)

Initialize the EmailConnector with login credentials and search parameters.

Parameters:

Name Type Description Default
email_address str

Email address to log in.

required
password str

Password for the email address.

required
search_keyword str

Keyword to search emails. Defaults to None.

None
num_emails int

Number of latest emails to download. Defaults to 50.

50
Source code in Docs2KG/parser/email/utils/email_connector.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def __init__(
    self,
    email_address,
    password,
    output_dir=None,
    search_keyword: str = None,
    num_emails: int = 50,
    imap_server: str = "imap.gmail.com",
    imap_port: int = 993,
):
    """
    Initialize the EmailConnector, log in, and prepare the output directory.

    Args:
        email_address (str): Email address to log in.
        password (str): Password for the email address.
        output_dir (str | Path, optional): Directory the emails are saved to.
            Defaults to ``DATA_INPUT_DIR / "email" / email_address``.
        search_keyword (str, optional): Keyword to search emails. Defaults to None.
        num_emails (int, optional): Number of latest emails to download. Defaults to 50.
        imap_server (str, optional): IMAP host name. Defaults to "imap.gmail.com".
        imap_port (int, optional): IMAP SSL port. Defaults to 993.
    """
    # Local import so this block does not depend on the file's import header.
    from pathlib import Path

    self.email_address = email_address
    self.password = password
    self.search_keyword = search_keyword
    self.num_emails = num_emails
    self.imap_server = imap_server
    self.imap_port = imap_port
    self.imap = None
    self.login_imap()
    if output_dir is None:
        # DATA_INPUT_DIR is defined elsewhere; presumably a pathlib.Path.
        output_dir = DATA_INPUT_DIR / "email" / email_address
    # Accept plain strings as well as Path objects: ``.mkdir`` below needs a Path.
    self.output_dir = Path(output_dir)
    self.output_dir.mkdir(parents=True, exist_ok=True)

download_email(email_id)

Download a specific email based on the email ID.

Parameters:

Name Type Description Default
email_id str

Email ID to download.

required

Returns:

Type Description

None: the raw email is written to an .eml file in the output directory; no email.message.Message object is returned.

Source code in Docs2KG/parser/email/utils/email_connector.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
def download_email(self, email_id):
    """
    Fetch one email from the server and write it to disk as an ``.eml`` file.

    The file is stored at
    ``<output_dir>/<email_id>/<email_address>.<email_id>.eml``.

    Args:
        email_id (bytes): Email ID to download; it is decoded as UTF-8 to
            build the directory and file names.

    Returns:
        None
    """
    # The full RFC822 payload carries the content and all the attachments.
    _status, response = self.imap.fetch(email_id, "(RFC822)")
    message_bytes = response[0][1]

    # One sub-directory per email, named after its ID.
    id_text = email_id.decode("utf-8")
    target_dir = self.output_dir / id_text
    target_dir.mkdir(parents=True, exist_ok=True)
    target_file = target_dir / f"{self.email_address}.{id_text}.eml"

    with open(target_file, "wb") as handle:
        handle.write(message_bytes)
    logger.info(f"Saved email to: {target_file}")

download_latest_emails()

Download the latest emails up to the specified number.

Returns:

Name Type Description
list

List of the latest email messages.

Source code in Docs2KG/parser/email/utils/email_connector.py
111
112
113
114
115
116
117
118
119
120
121
def download_latest_emails(self):
    """
    Fetch the latest emails up to the specified number.

    All email IDs are obtained from ``search_emails`` and the last
    ``num_emails`` of them are fetched. A ``num_emails`` of zero or less
    yields no emails.

    Returns:
        list: List of the latest email messages.
    """
    email_ids = self.search_emails()
    if self.num_emails > 0:
        latest_email_ids = email_ids[-self.num_emails :]
    else:
        # ``email_ids[-0:]`` would select every ID, so handle this explicitly.
        latest_email_ids = []
    return self.fetch_emails(latest_email_ids)

fetch_emails(email_ids)

Fetch the emails based on email IDs.

Parameters:

Name Type Description Default
email_ids list

List of email IDs to fetch.

required

Returns:

Name Type Description
list

List of email messages.

Source code in Docs2KG/parser/email/utils/email_connector.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
def fetch_emails(self, email_ids):
    """
    Retrieve and parse each email named in ``email_ids``.

    Args:
        email_ids (list): List of email IDs to fetch.

    Returns:
        list: Parsed email messages, in the same order as ``email_ids``.
    """

    def _load(message_id):
        # The raw message bytes are the second item of the first response part.
        _status, response = self.imap.fetch(message_id, "(RFC822)")
        return email.message_from_bytes(response[0][1])

    return [_load(message_id) for message_id in email_ids]

login_imap()

Login to the IMAP server.

Source code in Docs2KG/parser/email/utils/email_connector.py
47
48
49
50
51
52
53
54
55
56
57
58
59
def login_imap(self):
    """
    Connect to the IMAP server over SSL and authenticate.

    The connection is stored on ``self.imap`` before the login attempt.

    Raises:
        Exception: If the connection state is not "AUTH" after logging in.
    """
    connection = imaplib.IMAP4_SSL(self.imap_server, self.imap_port)
    self.imap = connection
    connection.login(self.email_address, self.password)

    # Guard clause: anything other than the authenticated state is a failure.
    if connection.state != "AUTH":
        logger.error("Login failed")
        raise Exception("Login failed")
    logger.info("Login successful")

logout()

Logout from the email servers.

Source code in Docs2KG/parser/email/utils/email_connector.py
147
148
149
150
151
152
def logout(self):
    """
    Log out from the email server; do nothing when no connection is set.
    """
    connection = self.imap
    if not connection:
        return
    connection.logout()

pull()

Pull the emails from the email server.

Source code in Docs2KG/parser/email/utils/email_connector.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
def pull(self):
    """
    Pull the emails from the email server and save each one to disk.

    With a search keyword, every email returned by ``search_emails`` is
    downloaded. Without one, only the latest ``num_emails`` IDs are
    downloaded (none when ``num_emails`` is zero or less).
    """
    logger.info("Pulling emails")
    logger.info(self.search_keyword)
    # search_emails() returns the keyword matches, or all IDs without a keyword.
    email_ids = self.search_emails()
    if not self.search_keyword:
        # download_email() needs IDs, so slice the ID list here rather than
        # iterating over the parsed messages from download_latest_emails().
        if self.num_emails > 0:
            email_ids = email_ids[-self.num_emails :]
        else:
            email_ids = []

    for email_id in email_ids:
        logger.info(f"Downloading email: {email_id}")
        self.download_email(email_id)

search_emails()

Search for emails based on the search keyword.

Returns:

Name Type Description
list

List of email IDs that match the search criteria.

Source code in Docs2KG/parser/email/utils/email_connector.py
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
def search_emails(self):
    """
    Select the inbox and search it.

    Searches for emails whose body contains the search keyword, or for all
    emails when no keyword is set.

    Returns:
        list: List of email IDs that match the search criteria.
    """
    self.imap.select("inbox")
    if self.search_keyword:
        # Escape the IMAP quoted-string specials so a keyword containing
        # a quote or backslash cannot break the search command.
        keyword = self.search_keyword.replace("\\", "\\\\").replace('"', '\\"')
        result, data = self.imap.search(None, f'(BODY "{keyword}")')
        email_ids = data[0].split()
        logger.info(f"Number of emails found: {len(email_ids)}")
        logger.info(f"Email IDs: {email_ids}")
    else:
        result, data = self.imap.search(None, "ALL")
        email_ids = data[0].split()
    return email_ids