#!/usr/bin/python3 # Inkbunny Maildir Fetch 0.1.0 # Copyright 2025 JustLurking # This program is free software: you can redistribute it and/or modify it # under the terms of the GNU General Public License as published by the # Free Software Foundation, either version 3 of the License, or (at your # option) any later version. # # This program is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License # for more details. # # You should have received a copy of the GNU General Public License along # with this program. If not, see . # About # This program will connect to Inkbunny using the session cookie in the # PHPSESSID environment variable then download the messages in the user's # inbox and sent items, storing them as emails in maildir format in the # current working directory. # # As the program treats the current working directory as a maildir mailbox # it is important to change to the correct directory before running it. # # Only new items will be downloaded and saved. # # The program does its best to convert the HTML of a message back to BBCode, # but this is an imperfect conversion and the original HTML is saved also # so that either maybe used. # # Similarly, it is not possible to convert timestamps such as `a moment ago` # or `1 hrs, 20 mins ago` to the correct dates and times with full accuracy # the program does its best in those cases but some messages may be saved # with dates different to those shown on the site. # Changelog # 2025-01-03 JustLurking: Initial Release. import argparse import bs4 import dataclasses import email import logging import mailbox import os import re import requests import time import urllib.parse # Variables used throughout the program. # Used for identifying the program. program_file = "ib-maildir-fetch" program_name = "Inkbunny Maildir Fetch" version = "0.1.0" bug_reports = "https://Inkbunny.net/JustLurking/" homepage = "https://inkbunny.net/submissionsviewall.php?mode=pool&pool_id=98445" # Used to download pages. base_url = "https://inkbunny.net/privatemessages.php" cookie = None cookies = requests.cookies.RequestsCookieJar() # Used for rate limiting. request_pause = 0.25 last_request = 0 # Used to hold site details. user = None downloaded = set() maildir = None inbox = None sent = None latest_sent = 0 latest_inbox = 0 early_quit = True # Used for logging. log = logging.getLogger(__name__) # Variables used when converting HTML to BBCode. # These replacements are all simple substitutions. simple_replacements = { "b": (("strong",), {}), "center": (("div",), {"class_": "align_center"}), "i": (("em",), {}), "left": (("div",), {"class_": "align_left"}), "right": (("div",), {"class_": "align_right"}), "s": (("span",), {"class_": "strikethrough"}), "t": (("span",), {"class_": "font_title"}), "u": (("span",), {"class_": "underline"}) } # Map titles back to tag names. offsite_mapping = { "deviantART": "da", "Fur Affinity": "fa", "SoFurry": "sf", "Weasyl": "w" } # Regular expressions. re_offsite_title = re.compile(" on (deviantART|Fur Affinity|SoFurry|Weasyl)$") re_usericon = re.compile("^https://inkbunny.net/usericons/") re_color = re.compile("^color: [^;]*;$") re_size = re.compile("/(small|medium|large|huge)/") re_pool_url = re.compile("^/poolview_process.php\\?pool_id=") re_bare = re.compile("\\[(b|center|i|left|right|s|t|u|q|smallpool|mediumpool|smallthumb|mediumthumb|largethumb|hugethumb|color|icon|iconname|name|da|fa|sf|w|url)(=[^]]*)?]|da!|fa!|sf!|w!") re_date = re.compile("^((\\d+) hrs?,? )?((\\d+) mins?,? )?((\\d+) secs? )?ago$") # Classes and functions. @dataclasses.dataclass class MessageMetadata: """ Class that holds the metadata identifying a specific Inkbunny message. """ msg_id: int sender: str receiver: str subject: str date: str in_reply_to: str def download_messages_in_thread(self): """ Fetch the thread for this message if it has not been downloaded yet and extracts all messages from it, saving them to the correct folder if they haven't been downloaded already. """ global user, downloaded if int(self.msg_id) in downloaded: log.debug("Ignoring thread for message %d, message already saved.", self.msg_id) return log.debug("Fetching thread for message %d.", self.msg_id) soup = download( "https://inkbunny.net/privatemessageview.php", private_message_id=str(self.msg_id) ) get_logged_in_user(soup) other_user = self.sender if self.sender != user else self.receiver for msg in read_thread(soup, other_user): msg_id = int(msg["Message-Id"]) if msg_id in downloaded: log.debug("Ignoring message %d in thread %d, message already saved.", msg_id, self.msg_id) continue if msg["To"] == user: log.debug("Saving message %d in thread %d to inbox.", msg_id, self.msg_id) inbox.add(msg) else: log.debug("Saving message %d in thread %d to sent.", msg_id, self.msg_id) sent.add(msg) downloaded.add(msg_id) def create_message(self): """ Create a new multi-part email.message.EmailMessage with the headers set from this metadata object. Does not add a body, that is up to the caller. """ msg = email.message.EmailMessage() msg["Message-Id"] = str(self.msg_id) msg["From"] = self.sender msg["To"] = self.receiver msg["Date"] = self.date msg["Subject"] = self.subject if self.in_reply_to is not None: msg["In-Reply-To"] = str(self.in_reply_to) msg.make_alternative() return msg def download(url, **kwargs): """ Utility wrapper for boiler plate around downloading and parsing pages. Also enforces the rate limit. """ global cookies, last_request, request_pause now = time.time() if now - last_request < request_pause: time.sleep(request_pause - now + last_request) resp = requests.get(url, cookies=cookies, params=kwargs) for (i, step) in enumerate(resp.history): log.debug("[%d] Downloaded: %s", i, step.url) log.debug("[Final] Downloaded: %s", resp.url) resp.raise_for_status() last_request = time.time() return bs4.BeautifulSoup(resp.content, features="lxml") def parse_date(text): """ Convert a human-readable time to something that can be stored in an email header. """ global re_date, last_request to_subtract = None if text == "a moment ago": to_subtract = 0 m = re_date.match(text) if m is not None: to_subtract = 0 if m.group(2) is not None: to_subtract += int(m.group(2)) * 60 * 60 if m.group(4) is not None: to_subtract += int(m.group(4)) * 60 if m.group(6) is not None: to_subtract += int(m.group(6)) if to_subtract is not None: result = time.strftime("%a, %d %b %Y %H:%M:%S +0000", time.gmtime(last_request - to_subtract)) return result return text def get_logged_in_user(tag): """ Sets the logged-in user name if it has not been set yet using the supplied HTML. """ global user if user is not None: return nav = tag.find(class_="userdetailsnavigation") if nav is None: log.critical("Unable to find user details on page.") exit(1) widget = nav.find(class_="widget_userNameSmall") if widget is None: log.critical("Unable to find user details on page.") exit(1) user = widget.get_text().strip() log.info("Logged in as %s.", user) def get_next_page(tag): """ Returns the URL of the first next-page link in the supplied HTML. """ next_page_link = tag.find("a", title="next page") if next_page_link is None: return None return next_page_link["href"] def read_box_page(tag, in_inbox): """ A generator which extracts rows from the index view of a box. If is_inbox is True operates over the inbox, otherwise it operates over the sent items. """ global user get_logged_in_user(tag) # Set the variables for the box we will be reading. min_columns = 5 user_column = 1 subject_column = 3 date_column = 4 if in_inbox: min_columns = 6 user_column = 2 subject_column = 4 date_column = 5 # Read the rows from this page of the index. for row in tag.find_all(id=re.compile("^m_\\d+$")): columns = [*row.find_all("td", recursive=False)] if len(columns) < min_columns: # Malformed row? Skip it. continue other_user = columns[user_column].get_text().strip() subject = columns[subject_column].get_text().strip() date = parse_date(columns[date_column].get_text().strip()) msg_id = int(row["id"][2:]) if in_inbox: yield MessageMetadata(msg_id, other_user, user, subject, date, None) else: yield MessageMetadata(msg_id, user, other_user, subject, date, None) def read_box(is_inbox): """ A generator which extracts rows from a box. If is_inbox is True operates over the inbox, otherwise it operates over the sent items. """ global latest_inbox, latest_sent, early_quit # Set the variables for the box we will be reading. latest = latest_inbox if is_inbox else latest_sent end_of_new_messages = False next_page = "https://inkbunny.net/privatemessages_process.php?mode=" if is_inbox: log.info("Reading Inbox.") next_page += "inbox" else: log.info("Reading Sent.") next_page += "sent" # Download and parse the next page of the index. while next_page is not None: next_page = urllib.parse.urljoin(base_url, next_page) soup = download(next_page) # Read the rows in the index and yield them to the caller. for row in read_box_page(soup, is_inbox): if row.msg_id <= latest: end_of_new_messages = True yield row # Exit the loop early if we've encountered a message on this page # older than one we've already downloaded. if early_quit and end_of_new_messages: log.info("End of new messages in this box.") break next_page = get_next_page(soup) def unparse_html_to_bbcode(tag): """ Given some HTML generated by Inkbunny's BBCode renderer attempt to return the BBCode which might have generated it. Since the transformation is non-injective it's impossible to reverse with 100% accuracy. White-space will likely not be preserved either. """ global simple_replacements, offsite_mapping, re_offsite_title, re_usericon, re_color, re_size, re_pool_url, re_bare # [code] for string in tag.find_all(string=re_bare): string.replace_with(re_bare.sub(lambda m: "[code]"+m.group(0)+"[/code]", string.string)) # [q] and [q=someone] for quote in tag.find_all(class_="bbcode_quote"): author = quote.find(class_="bbcode_quote_author") body = quote.find(class_="bbcode_quote_quote").extract() argument = "" if author is not None: argument = "=" + author.get_text().strip()[:-7] quote.insert_before(bs4.NavigableString("[q"+argument+"]")) quote.insert_before(body) children = (*body.contents,) if len(children) == 1: if isinstance(children[0], bs4.NavigableString): children[0].replace_with(children[0].string.strip()) elif len(children) > 1: if isinstance(children[0], bs4.NavigableString): children[0].replace_with(children[0].string.lstrip()) if isinstance(children[-1], bs4.NavigableString): children[-1].replace_with(children[-1].string.rstrip()) body.unwrap() quote.insert_after(bs4.NavigableString("[/q]")) quote.extract() # [smallpool], [mediumpool] for pool in tag.find_all(class_="widget_imageFromSubmission"): table = pool.find_parent("table") if table is None: continue table = table.find_parent("table") if table is None or table.parent is None: # There are three divs with widget_imageFromSubmissions as their # class per pool table, so it is possible we've already processed # this pool. # This also filters out thumbnails which get processed after. continue size = re_size.search(pool.find("img")["src"]).group(1) pool_id = table.find("a", href=re_pool_url)["href"][31:] table.replace_with(bs4.NavigableString("["+size+"pool]"+pool_id+"[/"+size+"pool]")) # [smallthumb], [mediumthumb], [largethumb], [hugethumb] for thumb in tag.find_all(class_="widget_imageFromSubmission"): table = thumb.find_parent("table") size = re_size.search(thumb.find("img")["src"]).group(1) submission_id = thumb.find("a")["href"][3:] table.replace_with(bs4.NavigableString("["+size+"thumb]"+submission_id+"[/"+size+"thumb]")) # [color] for color in tag.find_all("span", style=re_color): color.insert_before(bs4.NavigableString("[color="+color["style"][7:-1]+"]")) color.insert_after(bs4.NavigableString("[/color]")) color.unwrap() # Newlines for br in tag.find_all("br"): br.replace_with(bs4.NavigableString("\n")) # [icon], [iconname] for icon in tag.find_all("img", src=re_usericon): table = icon.find_parent("table") link = icon.find_parent("a") name = link["href"][21:] if len((*table.find_all("a"),)) > 1: table.replace_with("[iconname]"+name+"[/iconname]") else: table.replace_with("[icon]"+name+"[/icon]") # [name] for namelink in tag.find_all("span", class_="widget_userNameSmall"): name = namelink.find("a")["href"][1:] namelink.replace_with("[name]"+name+"[/name]") # [da], [fa], [sf], [w] for link in tag.find_all("a", title=re_offsite_title): # Each off-site link generates two sibling elements in the output, we # can replace either, but we should remove the one we don't replace. if link.find("img") is None: # Replace the textual link. m = re_offsite_title.search(link["title"]) site = offsite_mapping[m.group(1)] link.replace_with("["+site+"]"+link.get_text().strip()+"[/"+site+"]") else: # Remove the image link. link.extract() # [url] for a in tag.find_all("a"): a.insert_before(bs4.NavigableString("[url="+a["href"]+"]")) a.insert_after(bs4.NavigableString("[/url]")) a.unwrap() # simple replacements for (name, args) in simple_replacements.items(): for target in tag.find_all(*args[0], **args[1]): target.insert_before(bs4.NavigableString("["+name+"]")) target.insert_after(bs4.NavigableString("[/"+name+"]")) target.unwrap() # combine all text nodes and return the string representation of the child # nodes. tag.smooth() return "".join(str(n) for n in tag.contents) def read_thread(tag, other_user): """ A generator which extracts messages from thread view. """ global user, messages previous_subject = None previous_msg_id = None got_result = False for elem in tag.find_all(id=re.compile("^irt_message_\\d+$")): # Extract the next message's data from the page. msg_id = int(elem["id"][12:]) children = [e for e in elem.children if not isinstance(e, bs4.NavigableString)] if len(children) < 5: # Malformed continue date_field = children[2].find(True) body_span = children[2].find("span", style="word-wrap: break-word;") if date_field is None or body_span is None: # Malformed continue subject_div = children[2].find(style="margin-bottom: 5px;") left_link = children[0].find(class_="widget_userNameSmall") right_link = children[4].find(class_="widget_userNameSmall") date = parse_date(date_field.get_text().strip()) subject = subject_div.get_text().strip() if subject_div is not None else previous_subject in_reply_to = previous_msg_id sender = None if left_link is not None: sender = left_link.get_text().strip() elif right_link is not None: sender = right_link.get_text().strip() else: continue receiver = other_user if sender == user else user # Create, populate and yield an EmailMessage for this message. content = bs4.BeautifulSoup("<body/>", features="lxml") content.title.string = subject content.body.append(body_span) body_span.unwrap() metadata = MessageMetadata(msg_id, sender, receiver, subject, date, in_reply_to) msg = metadata.create_message() msg.add_alternative("<!DOCTYPE html>" + str(content), "html") msg.add_alternative(str(unparse_html_to_bbcode(content.body))) yield msg got_result = True # Update information on the previous message (the one we just yielded) # prior to extracting next message. if subject is not None: previous_subject = subject previous_msg_id = msg_id # This should only be reached if a page other than the one we were # expecting is returned, i.e. an error page or similar. # Since something has clearly gone wrong just inform the user and abort. if not got_result: log.critical("Failed to parse at least one message in thread. Got: %s", str(tag)) exit(1) # Main Program starts here. # Configure logging. log.addHandler(logging.StreamHandler()) log.setLevel(logging.INFO) # Handle arguments. arg_parser = argparse.ArgumentParser( prog = program_file, description = "".join(( "Downloads messages from the Inkbunny.net website and stores them", " as emails in maildir format." )), epilog = "".join(( "If no DIRECTORY is specified then the current directory is", " assumed to be the maildir, care should be taken to ensure that", " the program is run in the correct directory or it may read or", " write to files the user does not want it to.\n", "\n", "The SESSION may be passed using the PHPSESSID environment variable." " this may be useful if you wish to pass the same value to multiple" " programs, or if you wish to set the value in your shell's profile.\n" "\n", "Report bugs to: "+bug_reports+"\n", program_name + " home page: <"+homepage+"\n" )), formatter_class = argparse.RawDescriptionHelpFormatter ) arg_parser.add_argument( "-s", "--session", nargs = 1, help = "the session id to send with requests" ) arg_parser.add_argument( "-C", "--directory", nargs = 1, help = "change directory when program starts" ) arg_parser.add_argument( "-q", "--quiet", action = "count", default = 2, help = "decrease verbosity" ) arg_parser.add_argument( "-v", "--verbose", action = "count", default = 0, help = "increase verbosity" ) arg_parser.add_argument( "-f", "--full-scan", action = "store_true", help = "read the full message index even if it seems unnecessary" ) arg_parser.add_argument( "-V", "--version", action = "store_true" ) args = arg_parser.parse_args() if args.version: print( " ".join((program_name, version)), "Copyright (C) 2025 JustLurking<https://inkbunny.net/JustLurking>", "License GPLv3+: GNU GPL version 3 or later "+ "<https://gnu.org/licenses/gpl.html>", "", "This is free software: you are free to change and redistribute it.", "There is NO WARRANTY, to the extent permitted by law.", sep="\n" ) exit(0) log.setLevel(max(1, min(5, args.quiet-args.verbose))*10) if args.directory is not None: log.info("Changing to directory %d.") os.chdir(args.directory) if args.full_scan: early_quit = False if args.session is not None: cookie = args.session # Open or create Maildir. maildir = mailbox.Maildir(".") inbox = maildir.add_folder("inbox") sent = maildir.add_folder("sent") # Set the Session Cookie from the environment. if cookie is None: cookie = os.getenv("PHPSESSID") if cookie is None: log.error("Please use the -s argument or set the PHPSESSID environment variable to set the session id.") exit(1) cookies.set("PHPSESSID", cookie, domain="inkbunny.net", path="/") # Make a list of all already downloaded messages. for msg in inbox: msg_id = int(msg["Message-Id"]) downloaded.add(msg_id) latest_inbox = max(latest_inbox, msg_id) for msg in sent: msg_id = int(msg["Message-Id"]) downloaded.add(msg_id) latest_sent = max(latest_sent, msg_id) # Fetch message indices for inbox and sent and for each message found fetch the # appropriate thread view and download all new messages in that thread. for row in read_box(True): row.download_messages_in_thread() for row in read_box(False): row.download_messages_in_thread()