aerogramme/tests/instrumentation/send-to-imap.py

117 lines
2.7 KiB
Python
Raw Permalink Normal View History

2022-07-07 14:03:57 +00:00
from imaplib import IMAP4_SSL, IMAP4
2022-07-05 09:44:22 +00:00
from os import listdir
from os.path import isfile, join
import sys
2022-07-07 14:03:57 +00:00
# COMMAND USAGE
#
# start the test IMAP servers:
#   docker-compose up
2022-07-07 14:03:57 +00:00
# then call this script, e.g.:
# ./send-to-imap.py all ./emails/dxflrs/
2022-07-07 14:03:57 +00:00
2022-07-05 10:04:04 +00:00
def rebuild_body_res(b):
    """Flatten a FETCH response data list into a single bytes payload.

    `b` is the data part of an `imaplib` FETCH result: an iterable whose
    items are either bytes or tuples of bytes. The parts of each tuple are
    joined with CRLF, all items are then concatenated in order, and
    everything before the first ``(`` is dropped (presumably the message
    sequence number prefix -- confirm against the servers' output).

    Returns the bytes starting at the first ``(``. If the response contains
    no ``(`` at all, the concatenated bytes are returned unchanged.
    """
    chunks = []
    for e in b:
        if isinstance(e, tuple):
            chunks.append(b'\r\n'.join(e))
        else:
            chunks.append(e)
    bb = b''.join(chunks)

    start = bb.find(b'(')
    if start == -1:
        # bytes.find() signals "not found" with -1; slicing with it would
        # silently keep only the last byte of the response.
        return bb
    return bb[start:]
# Command-line arguments:
#   <mode> - name of a single configured server; any other value (e.g. "all")
#            targets every configured server
#   <path> - directory containing the .eml files to upload
if len(sys.argv) < 3:
    # Exit with a usage message instead of an IndexError traceback.
    sys.exit(f"usage: {sys.argv[0]} <server|all> <emails-directory>")
mode = sys.argv[1]
path = sys.argv[2]
# Name of the scratch mailbox that is deleted and re-created on each server.
base_test_mb = "kzUXL7HyS5OjLcU8"

# Connection settings for every server under test, keyed by server name
# (the names accepted as <mode> on the command line):
#   con  - imaplib connection class to instantiate
#   port - port to connect to on localhost
#   user - login name
#   pw   - login password
#   ext  - suffix inserted into the names of the generated output files
#   mb   - mailbox name to use on that server
# Insertion order matters: it is the order in which servers are processed.
parameters = {
    "dovecot": {
        "con": IMAP4_SSL,
        "port": 993,
        "user": "test",
        "pw": "pass",
        "ext": ".dovecot",
        "mb": base_test_mb,
    },
    "maddy": {
        "con": IMAP4_SSL,
        "port": 994,
        "user": "test@example.com",
        "pw": "pass",
        "ext": ".maddy",
        "mb": base_test_mb,
    },
    "cyrus": {
        "con": IMAP4,
        "port": 143,
        "user": "test",
        "pw": "pass",
        "ext": ".cyrus",
        # NOTE(review): only this server prefixes the mailbox with "INBOX.";
        # presumably a namespace requirement -- confirm.
        "mb": "INBOX." + base_test_mb,
    },
    "courier": {
        "con": IMAP4,
        "port": 144,
        "user": "debian",
        "pw": "debian",
        "ext": ".courier",
        "mb": base_test_mb,
    },
    "stalwart": {
        "con": IMAP4_SSL,
        "port": 1993,
        "user": "test@example.com",
        "pw": "pass",
        "ext": ".stalwart.0.2.0",
        "mb": base_test_mb,
    },
}
# Servers to process: just <mode> when it names a configured server,
# otherwise every configured server in declaration order.
queue = [mode] if mode in parameters else list(parameters)

# Full paths of the regular files in <path> named "<something>.eml".
# The name is checked first so isfile() is only called on candidates, and
# the length check keeps out a file named exactly ".eml" (empty stem).
eml_candidates = (
    join(path, name)
    for name in listdir(path)
    if len(name) > 4 and name.endswith(".eml")
)
onlyfiles = [candidate for candidate in eml_candidates if isfile(candidate)]
def _fetch_to_file(conn, seq, item, out_path):
    """Fetch `item` for message set `seq` and save the cleaned response.

    The response is printed, passed through `rebuild_body_res` and written
    to `out_path` (truncating any existing file).

    Raises RuntimeError when the server's status is not 'OK'.
    """
    (r, b) = conn.fetch(seq, item)
    print((r, b))
    if r != 'OK':
        # Explicit check rather than `assert`, which is stripped under -O.
        raise RuntimeError(f"FETCH {item} returned {r}")
    with open(out_path, 'wb') as w:
        w.write(rebuild_body_res(b))


# For every selected server: recreate the test mailbox, upload each .eml
# file, and store the server's BODY and BODYSTRUCTURE answers next to the
# source file as "<name><ext>.body" and "<name><ext>.bodystructure".
for target in queue:
    print(f"--- {target} ---")
    conf = parameters[target]
    test_mb = conf['mb']

    with conf['con'](host="localhost", port=conf['port']) as M:
        print(M.login(conf['user'], conf['pw']))
        # Delete then create so the run starts from an empty mailbox; the
        # sequence numbers used below rely on that.
        print(M.delete(test_mb))
        print(M.create(test_mb))

        print(M.list())
        print(M.select(test_mb))

        failed = 0
        # Number of messages actually stored in the mailbox so far. The
        # newest message's sequence number equals this count, even if a
        # later FETCH or file write for an earlier message failed.
        appended = 0
        for f in onlyfiles:
            f_noext = f[:-4]
            try:
                # Read-only open is enough; close the file before talking
                # to the server.
                with open(f, 'rb') as mail:
                    raw = mail.read()

                res = M.append(test_mb, [], None, raw)
                print(res)
                if res[0] != 'OK':
                    raise RuntimeError(f"APPEND returned {res[0]}")
                appended += 1

                seq = (f"{appended}:{appended}").encode()
                _fetch_to_file(M, seq, "(BODY)", f_noext + conf['ext'] + ".body")
                _fetch_to_file(M, seq, "(BODYSTRUCTURE)", f_noext + conf['ext'] + ".bodystructure")
            except Exception as err:
                # Best effort: report the failure and move on to the next
                # file. Ctrl-C and SystemExit are deliberately not caught.
                failed += 1
                print(f"failed {f}")
                print(f"  reason: {err!r}")

        M.close()
        M.logout()