2022-07-07 14:03:57 +00:00
|
|
|
from imaplib import IMAP4_SSL, IMAP4
|
2022-07-05 09:44:22 +00:00
|
|
|
from os import listdir
|
|
|
|
from os.path import isfile, join
|
|
|
|
import sys
|
|
|
|
|
2022-07-07 14:03:57 +00:00
|
|
|
# COMMAND USAGE
|
|
|
|
#
|
2022-07-08 09:47:59 +00:00
|
|
|
# start a test IMAP servers:
|
|
|
|
# docker-compose.up
|
2022-07-07 14:03:57 +00:00
|
|
|
# then call this script. eg:
|
2022-07-08 09:47:59 +00:00
|
|
|
# ./send-to-imap.py all ./emails/dxflrs/
|
2022-07-07 14:03:57 +00:00
|
|
|
|
|
|
|
|
2022-07-05 10:04:04 +00:00
|
|
|
def rebuild_body_res(b):
|
|
|
|
bb = b''
|
|
|
|
for e in b:
|
|
|
|
if type(e) is tuple:
|
|
|
|
bb += b'\r\n'.join([p for p in e])
|
|
|
|
else:
|
|
|
|
bb += e
|
|
|
|
|
|
|
|
f = bb[bb.find(b'('):]
|
|
|
|
return f
|
|
|
|
|
2022-07-08 09:47:59 +00:00
|
|
|
mode = sys.argv[1]
|
2022-07-07 14:03:57 +00:00
|
|
|
path = sys.argv[2]
|
|
|
|
|
2022-07-08 09:47:59 +00:00
|
|
|
base_test_mb = "kzUXL7HyS5OjLcU8"
|
2022-07-07 14:03:57 +00:00
|
|
|
parameters = {
|
|
|
|
"dovecot": {
|
|
|
|
"con": IMAP4_SSL,
|
2022-07-08 09:47:59 +00:00
|
|
|
"port": 993,
|
2022-07-07 14:03:57 +00:00
|
|
|
"user": "test",
|
|
|
|
"pw": "pass",
|
2022-07-07 16:19:37 +00:00
|
|
|
"ext": ".dovecot",
|
2022-07-08 09:47:59 +00:00
|
|
|
"mb": base_test_mb,
|
2022-07-07 14:03:57 +00:00
|
|
|
},
|
|
|
|
"maddy": {
|
|
|
|
"con": IMAP4_SSL,
|
2022-07-08 09:47:59 +00:00
|
|
|
"port": 994,
|
2022-07-07 14:03:57 +00:00
|
|
|
"user": "test@example.com",
|
|
|
|
"pw": "pass",
|
|
|
|
"ext": ".maddy",
|
2022-07-08 09:47:59 +00:00
|
|
|
"mb": base_test_mb,
|
2022-07-07 14:03:57 +00:00
|
|
|
},
|
2022-07-07 16:19:37 +00:00
|
|
|
"cyrus": {
|
|
|
|
"con": IMAP4,
|
2022-07-08 09:47:59 +00:00
|
|
|
"port": 143,
|
2022-07-07 16:19:37 +00:00
|
|
|
"user": "test",
|
|
|
|
"pw": "pass",
|
|
|
|
"ext": ".cyrus",
|
2022-07-08 09:47:59 +00:00
|
|
|
"mb": "INBOX."+base_test_mb,
|
2022-07-07 16:19:37 +00:00
|
|
|
}
|
2022-07-07 14:03:57 +00:00
|
|
|
}
|
2022-07-08 09:47:59 +00:00
|
|
|
|
|
|
|
queue = list(parameters.keys())
|
|
|
|
if mode in parameters:
|
|
|
|
queue = [ mode ]
|
2022-07-07 14:03:57 +00:00
|
|
|
|
2022-07-05 09:44:22 +00:00
|
|
|
onlyfiles = [join(path, f) for f in listdir(path) if isfile(join(path, f)) and len(f) > 4 and f[-4:] == ".eml"]
|
|
|
|
|
2022-07-08 09:47:59 +00:00
|
|
|
for target in queue:
|
|
|
|
print(f"--- {target} ---")
|
|
|
|
conf = parameters[target]
|
|
|
|
test_mb = conf['mb']
|
|
|
|
|
|
|
|
with conf['con'](host="localhost", port=conf['port']) as M:
|
|
|
|
print(M.login(conf['user'], conf['pw']))
|
|
|
|
print(M.delete(test_mb))
|
|
|
|
print(M.create(test_mb))
|
2022-07-07 14:03:57 +00:00
|
|
|
|
|
|
|
|
2022-07-08 09:47:59 +00:00
|
|
|
print(M.list())
|
|
|
|
print(M.select(test_mb))
|
|
|
|
failed = 0
|
|
|
|
for (idx, f) in enumerate(onlyfiles):
|
|
|
|
f_noext = f[:-4]
|
|
|
|
try:
|
|
|
|
with open(f, 'r+b') as mail:
|
|
|
|
print(M.append(test_mb, [], None, mail.read()))
|
|
|
|
seq = (f"{idx+1-failed}:{idx+1-failed}").encode()
|
|
|
|
(r, b) = M.fetch(seq, "(BODY)")
|
|
|
|
print((r, b))
|
|
|
|
assert r == 'OK'
|
2022-07-05 10:04:04 +00:00
|
|
|
|
|
|
|
|
2022-07-08 09:47:59 +00:00
|
|
|
with open(f_noext + conf['ext'] + ".body", 'w+b') as w:
|
|
|
|
w.write(rebuild_body_res(b))
|
2022-07-05 09:44:22 +00:00
|
|
|
|
2022-07-08 09:47:59 +00:00
|
|
|
(r, b) = M.fetch(seq, "(BODYSTRUCTURE)")
|
|
|
|
print((r, b))
|
|
|
|
assert r == 'OK'
|
|
|
|
with open(f_noext + conf['ext'] + ".bodystructure", 'w+b') as w:
|
|
|
|
w.write(rebuild_body_res(b))
|
|
|
|
except:
|
|
|
|
failed += 1
|
|
|
|
print(f"failed {f}")
|
2022-07-05 09:44:22 +00:00
|
|
|
|
2022-07-08 09:47:59 +00:00
|
|
|
M.close()
|
|
|
|
M.logout()
|