Last active
February 20, 2023 15:54
-
-
Save reagle/1fe5e496ed7c772b68aa60c4d827abc9 to your computer and use it in GitHub Desktop.
Pretty-print the messages in a mailbox that arrived since a previous date, as a simple HTML file
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
"""Pretty print a mailbox, since some previous date, as a simple HTML file""" | |
import argparse  # http://docs.python.org/dev/library/argparse.html

# http://docs.python.org/lib/module-email.Utils.html
# from email.utils import parsedate
import email.parser
import html
import logging
import mailbox
import os
import re
import subprocess
import sys
import textwrap
import urllib
import urllib.parse  # `import urllib` alone does not load the submodule
from datetime import datetime  # https://docs.python.org/3/library/datetime
from pathlib import Path  # https://docs.python.org/3/library/pathlib.html

from dateutil import relativedelta as rd
from dateutil.parser import parse
from dateutil.tz import tzlocal  # https://dateutil.readthedocs.io/en/stable/

import markup  # https://tylerbakke.github.io/MarkupPy/
HOME = str(Path("~").expanduser())

# Short module-level aliases for the logging functions used throughout.
debug = logging.debug
info = logging.info
warn = logging.warning  # logging.warn is deprecated (removed in Python 3.13)
error = logging.error
critical = logging.critical
exception = logging.exception
# Email stuff ############################# | |
def msgfactory(mbox_fp):
    """Parse one message from the open mailbox file *mbox_fp*.

    Returns an email.message.Message, or "" on a parse error:
    returning None would stop the mailbox iterator, "" does not.
    """
    try:
        return email.message_from_file(mbox_fp)
    # Python 3 spells this email.errors (email.Errors was the Python 2
    # name and would itself raise AttributeError here).
    except email.errors.MessageParseError:
        # Don't return None since that will stop the mailbox iterator.
        return ""
def get_headers(msg):
    """Return (subject, sender, msg_date) extracted from *msg*.

    subject and sender are HTML-escaped; sender keeps only the part of
    the From header before the "@" plus a closing ">".  msg_date is the
    datetime parsed from the Date header by dateutil.

    A missing Subject or From header yields an empty string instead of
    crashing (html.escape(None) raises TypeError); a missing Date still
    raises, since callers compare msg_date against a cutoff datetime.
    """
    subject = html.escape(msg.get("subject") or "")
    # Keep "Name <localpart" and restore the closing ">" stripped with "@...".
    sender = html.escape((msg.get("from") or "").split("@")[0] + ">")
    msg_date = parse(msg.get("date"))
    return subject, sender, msg_date
# Date stuff ############################## | |
# Map weekday index (Monday=0 ... Sunday=6) to dateutil weekday constants.
INT2DAY = dict(
    enumerate((rd.MO, rd.TU, rd.WE, rd.TH, rd.FR, rd.SA, rd.SU))
)
# DAY2INT = {v: k for k, v in INT2DAY.items()}
# Integer aliases for the weekday indexes (iterating a dict yields its keys).
MO, TU, WE, TH, FR, SA, SU = INT2DAY
def get_previous_class(classes, today):
    """Return the dateutil weekday constant of the latest class day that
    precedes *today*, given the scheduled class days of the week.

    >>> get_previous_class((TU, FR), MO)
    FR
    >>> get_previous_class((TU, FR), WE)
    TU
    >>> get_previous_class((TU, FR), FR)
    TU
    >>> get_previous_class((TU, FR), SA)
    FR
    """
    info(f" classes = '{str(classes)}' today = '{today}'")
    # When today is on or before the week's earliest class day, pretend it
    # is Saturday (i.e. act a week ahead) -- simpler than shifting the
    # class days back a week.
    if today <= min(classes):
        today = SA
        info(f" today adjusted ={today}")
    latest = max(day for day in classes if day < today)
    info(f" previous_class = {latest}")
    return INT2DAY[latest]
def get_previous_class_date():
    """Return 6 PM local time on the day of the most recent class.

    Class days are hard-coded as Tuesday and Friday; the result is a
    timezone-aware datetime used as the cutoff for "recent" messages.
    """
    now = datetime.now(tzlocal())  # could use (pytz.timezone('US/Eastern'))
    now_day = now.weekday()  # Monday=0 ... Sunday=6
    info(f"{now_day=}")
    prev_class = get_previous_class((TU, FR), now_day)
    info(f"{prev_class=}")
    # weekday=prev_class(-1) rolls back to the previous occurrence of that
    # weekday; hour/minute/second pin the moment to exactly 18:00:00.
    prev_class_date = now + rd.relativedelta(  # since 6PM day of last class
        hour=18, minute=0, second=0, weekday=prev_class(-1)
    )
    info(f"{prev_class_date=}")
    return prev_class_date
# Pandoc wrapper ########################## | |
def mkd2html(mkd):
    """Convert markdown string *mkd* to an HTML fragment via pandoc.

    Returns pandoc's stdout decoded as UTF-8; undecodable bytes are
    replaced rather than raising.
    """
    mkd_utf8 = mkd.encode("utf-8", "replace").strip()
    # subprocess.run is the recommended high-level replacement for
    # Popen + communicate; stderr is left attached to the terminal.
    result = subprocess.run(
        [
            "pandoc",
            (
                "--from=markdown+autolink_bare_uris"
                "-blank_before_header-space_in_atx_header"
            ),
            "--to=html",
        ],
        input=mkd_utf8,
        stdout=subprocess.PIPE,
    )
    return result.stdout.decode("utf-8", "replace")
def html2mkd(html_src):
    """Convert HTML string *html_src* to strict markdown via pandoc.

    Raw HTML, native divs, and native spans are stripped so the result
    is clean markdown.  Returns pandoc's stdout decoded as UTF-8 with
    undecodable bytes replaced.
    """
    html_utf8 = html_src.encode("utf-8", "replace").strip()
    # subprocess.run is the recommended high-level replacement for
    # Popen + communicate; stderr is left attached to the terminal.
    result = subprocess.run(
        [
            "pandoc",
            "--from=html-raw_html-native_divs-native_spans",
            "--to=markdown_strict",
        ],
        input=html_utf8,
        stdout=subprocess.PIPE,
    )
    return result.stdout.decode("utf-8", "replace")
# Textual tools ########################### | |
def unsafe_links(content):
    """Remove annoying outlook link protection.

    Rewrites every Microsoft "safelinks" wrapper found in *content*
    back to its original, percent-decoded target URL.

    >>> unsafe_links('<https://na01.safelinks.protection.outlook.com/?url=https%3A%2F%2Frp.liu233w.com%3A443%2Fhttps%2Fmoneyish.com%2Fish%2Fmillennials-are-killing-bar-soap%2F&data=02%7C01%7Cj.reagle%40northeastern.edu%7C2ed70276476f460557e908d615d1d419%7Ca8eec281aaa34daeac9b9a398b9215e7%7C0%7C0%7C636720388297012848&sdata=twrmHnYyqd0pf%2FH3wFIva4Vnwnmmxq1ajkOUtAUQtZ0%3D&reserved=0>')
    '<https://moneyish.com/ish/millennials-are-killing-bar-soap/>'
    """  # noqa: E501
    RE_SAFELINK = re.compile(
        r"""
        (https://na.*?\d\d\.safelinks.+\?url=) # MS link
        (.+)                               # encoded URL
        (&data[^"'>\)\s]+)                 # closing cruft
        """,
        re.VERBOSE,
    )
    # Lazy %-style args: the message is only formatted if the level is on.
    logging.info("content = '%s'", content)
    for match in RE_SAFELINK.finditer(content):
        logging.info("match = '%s'", match)
        safe_url = match.group(0)  # the whole wrapper, cruft included
        logging.info("safe_url = '%s'", safe_url)
        encoded_url = match.group(2)  # percent-encoded target URL
        logging.info("encoded_url = '%s'", encoded_url)
        decoded_url = urllib.parse.unquote(encoded_url)
        logging.info("decoded_url = '%s'", decoded_url)
        content = content.replace(safe_url, decoded_url)
    logging.info("new_content = '%s'", content)
    return content
def rewrap_text(content):
    """Wrap lines longer than 80 characters and fence the whole text.

    Each line is prefixed with a newline; the result is wrapped in
    ``~~~`` markers so pandoc treats it as a fenced code block.
    """
    processed = [
        textwrap.fill(line) if len(line) > 80 else line
        for line in content.split("\n")
    ]
    new_content = "".join("\n" + line for line in processed)
    return "\n~~~\n" + new_content + "\n~~~\n"  # in fenced code block?
def dedent_content(content):
    """Collapse each newline-plus-whitespace run into a blank line.

    Because ``\\s`` also matches newlines, this both strips leading
    indentation from lines and normalizes multiple blank lines into one
    paragraph break.
    """
    return re.sub(r"\n\s+", r"\n\n", content)
def convert(filename, args):
    """Render recent messages of mbox *filename* as one HTML page.

    Messages dated after the previous class (or all of them with
    --all) are sorted by student last name (or by date with
    --date-sort) and written to ``<output_dir>/<mailbox>-responses.html``.
    Word/docx attachments are converted through external tools
    (antiword / docx2txt.sh).
    """
    info(f"{filename=}")
    if filename.endswith("/"):
        filename = filename[0:-1]
    base_name = os.path.basename(filename)
    # Drop a leading dot so hidden mailbox folders yield a visible filename.
    base_name = base_name if not base_name.startswith(".") else base_name[1:]
    info(f"{base_name=}")
    html_fn = os.path.join(args.output_dir, base_name + "-responses.html")
    info(f"{args.output_dir=}; {html_fn=}")
    section_number = os.path.basename(filename)
    # mbox = mailbox.Maildir(filename, factory=mailbox.MaildirMessage)
    mbox = mailbox.mbox(filename, factory=mailbox.MaildirMessage)
    prev_class_date = get_previous_class_date()
    page = markup.page()
    page.init(
        title="Student Responses %s" % section_number,
        css="https://reagle.org/joseph/2005/01/responses.css",
        charset="utf-8",
    )
    page.h1(section_number)
    # Collect (date, last_name, msg) triples so either field can sort below.
    relevant_msgs = []
    for msg in mbox:
        subject, sender, msg_date = get_headers(msg)
        if args.all or msg_date > prev_class_date:
            info("sender = '%s'" % sender)
            name = sender.rsplit(" ", 1)[0]  # remove email address
            if len(name.split(" ")) > 1:
                last_name = name.rsplit(" ", 1)[1]
                if "bin" in name.lower():
                    last_name = "bin " + last_name
            else:
                last_name = name
            info("last_name = '%s'" % last_name)
            # added msg_date to sort on
            relevant_msgs.append((msg_date, last_name, msg))
    if args.date_sort:
        relevant_msgs = sorted(relevant_msgs)
    else:  # sort on name
        relevant_msgs = sorted(relevant_msgs, key=lambda msg: msg[1].lower())
    for _, last_name, msg in relevant_msgs:
        critical("\n")
        critical("============================")
        critical(f"{last_name=}")
        subject, sender, msg_date = get_headers(msg)
        # html.escape turned "<" into "&lt;", so the first ";" separates the
        # display name from the address part.  Guard against a From header
        # with no "<", which would otherwise raise IndexError.
        sender_email = sender.rsplit(";")[1][0:-3] if ";" in sender else sender
        page.div.open()
        page.hr()
        page.h1.open()
        # page.a(e.p(sender, class_="sender"), href=f"#{sender_email}")
        page.a("↪", class_="link", href=f"#{sender_email}")
        page.span(sender, class_="sender", id=f"{sender_email}")
        page.h1.close()
        parts = {}  # rendered content keyed by MIME subtype ("plain", "html", ...)
        for part in msg.walk():
            debug("--------")
            debug(f"{part=}")
            msg_content_type = part.get_content_subtype()
            processed_as_type = part.get_content_subtype()
            # Fall back to a common legacy charset when none is declared.
            charset = "WINDOWS-1252"
            if part.get_content_charset():
                charset = part.get_content_charset()
            debug(f"{charset=}")
            if msg_content_type == "plain":
                debug(f"part IS plain: {msg_content_type}")
                processed_as_type += "+" + charset
                content = part.get_payload(decode=True).decode(charset, "replace")
                debug(f"{type(content)=}")
                content = unsafe_links(content)
                if content.startswith("<html>"):
                    content = html2mkd(content)
                    processed_as_type += "+html2mkd()"
                if args.text:
                    content = rewrap_text(content)
                    processed_as_type += "+rewrap()"
                # content = dedent_content(content)
                debug(f"content = {content[0:250]}")
                content = mkd2html(content)
                # page.div(content, id=sender_email)
                parts[msg_content_type] = content
                # TODO 220119: continue and skip html if found markdown?
            elif msg_content_type == "html":
                debug("part is HTML: %s" % msg_content_type)
                processed_as_type += "+html2mkd()"
                content = part.get_payload(decode=True).decode(charset, "replace")
                # convert to markdown to strip out junk
                markdown = html2mkd(content)
                debug(f"{markdown=}")
                # then convert back to simple HTML
                html_result = mkd2html(markdown)
                debug(f"{html_result=}")
                # page.div(html_result, id=sender_email)
                # break  # found HTML, which is okay, so move on
                parts[msg_content_type] = content
            else:
                debug("part NOT plain: %s" % msg_content_type)
                if msg_content_type == "msword" or msg_content_type == "octet-stream":
                    processed_as_type += "+doc"
                    debug(f"DOC {processed_as_type}")
                    command = "antiword"
                elif (
                    part.get_content_subtype()
                    == "vnd.openxmlformats-officedocument.wordprocessingml.document"
                ):
                    command = "docx2txt.sh"
                    processed_as_type += "+docx"
                    debug(f"DOCX {processed_as_type}")
                else:
                    debug("don't know type, try next part")
                    continue  # don't know what it is, try next part
                tmpf = "/tmp/mail-part-msw"
                tmpft = "/tmp/mail-part-msw.txt"
                os.system(f"/bin/rm {tmpf} {tmpft}")
                content = part.get_payload(decode=True).decode(charset, "replace")
                # Context managers guarantee the temp file is flushed and
                # closed before the external converter reads it.
                with open(tmpf, "w") as tmpmbox_fp:
                    tmpmbox_fp.write(content)  # must be string not bytes
                os.system(f"{command} {tmpf} > {tmpft}")
                with open(tmpft) as tmpftpt:
                    content = " ".join(tmpftpt.readlines())
                content = html.escape(content)
                if args.text:
                    content = rewrap_text(content)
                parts[msg_content_type] = content
                # page.pre(content)
        # page.p(('[processed as %s]' % processed_as_type))
        info(f"{parts.keys()=}")  # first preference
        if "plain" in parts:
            page.div(parts["plain"])
        elif "html" in parts:  # second preference
            page.div(parts["html"])
        elif parts:
            # BUG FIX: `parts[0]` always raised KeyError -- the dict is keyed
            # by subtype strings, not indexes.  Show the first part found.
            page.pre(next(iter(parts.values())))
        # else: no renderable part was found; leave this message's div empty.
        page.div.close()
    # Open the output only when there is something to write, and close it
    # deterministically even if the write fails.
    with open(html_fn, "w") as html_fp:
        html_fp.write(str(page))
def main(argv):
    """Process arguments"""
    arg_parser = argparse.ArgumentParser(description="print HTML from mbox")
    # positional arguments
    arg_parser.add_argument("files", nargs="*", metavar="FILE")
    # optional boolean flags, declared data-style to avoid repetition
    for short_opt, long_opt, help_text in (
        ("-a", "--all", "print all messages irrespective of date"),
        ("-t", "--text", "text (unformatted) rather than markdown"),
        ("-d", "--date-sort", "sort by date"),
    ):
        arg_parser.add_argument(
            short_opt,
            long_opt,
            action="store_true",
            default=False,
            help=help_text,
        )
    arg_parser.add_argument(
        "-o",
        "--output_dir",
        metavar="DIRECTORY",
        # default processed in main arg processing
        help="output directory",
    )
    arg_parser.add_argument(
        "-L",
        "--log-to-file",
        action="store_true",
        default=False,
        help="log to file %(prog)s.log",
    )
    arg_parser.add_argument(
        "-V",
        "--verbose",
        action="count",
        default=0,
        help="Increase verbosity (specify multiple times for more)",
    )
    arg_parser.add_argument("--version", action="version", version="TBD")
    args = arg_parser.parse_args(argv)
    # Map the repeat count to a logging level; 100 silences everything,
    # and three or more -V flags mean full debug output.
    log_level = {0: 100, 1: logging.ERROR, 2: logging.INFO}.get(
        args.verbose, logging.DEBUG
    )
    LOG_FORMAT = "%(levelno)s %(funcName).5s: %(message)s"
    log_kwargs = {"level": log_level, "format": LOG_FORMAT}
    if args.log_to_file:
        log_kwargs.update(filename="mbx-pp.log", filemode="w")
    logging.basicConfig(**log_kwargs)
    return args
if __name__ == "__main__":
    args = main(sys.argv[1:])
    if not args.files:
        # Default mailboxes when none are given on the command line.
        args.files = [
            f"{HOME}/data/tbird-reagle/Mail/Local/classes.sbd/neu-cda",
            # f"{HOME}/data/tbird-reagle/Mail/Local/classes.sbd/neu-oc",
            f"{HOME}/data/tbird-reagle/Mail/Local/classes.sbd/neu-pc",
        ]
    if not args.output_dir:
        args.output_dir = HOME + f"/joseph/{datetime.now().year}/"
    for filename in args.files:
        # Fixed: the original f-string had no placeholder, so the filename
        # being processed was never actually logged.
        info(f"STARTING {filename=}")
        convert(filename, args)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment