"""Flask web UI for FediBooks: user accounts, bot setup and bot settings."""

import hashlib
import json
import re

import bcrypt
import MySQLdb
import requests
from flask import Flask, render_template, session, request, redirect, url_for, send_file
from flask_mysqldb import MySQL
from mastodon import Mastodon

import functions

# Load configuration once at import time; the context manager closes the file.
with open("config.json") as config_file:
    cfg = json.load(config_file)

app = Flask(__name__)
app.secret_key = cfg['secret_key']

app.config['MYSQL_HOST'] = cfg['db_host']
app.config['MYSQL_DB'] = cfg['db_name']
app.config['MYSQL_USER'] = cfg['db_user']
app.config['MYSQL_PASSWORD'] = cfg['db_pass']

mysql = MySQL(app)

# OAuth scopes requested when registering the app on the bot's instance.
scopes = ['write:statuses', 'write:accounts', 'read:accounts', 'read:notifications', 'read:statuses', 'push']
scopes_pleroma = ['read', 'write', 'push']

# Paths that may be viewed without being logged in.
PUBLIC_PATHS = ['/', '/about', '/welcome', '/login', '/signup', '/do/login', '/do/signup', '/static/style.css']


def _prehash_password(password):
    """Return the SHA-256 digest of *password* with NUL bytes replaced.

    The digest (not the raw password) is what gets passed to bcrypt; NUL
    bytes are swapped for ``\\1`` before hashing.
    """
    return hashlib.sha256(password.encode('utf-8')).digest().replace(b"\0", b"\1")


def _parse_int(value):
    """Return ``int(value)``, or ``None`` when *value* is not a valid integer."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


@app.before_request
def login_check():
    """Redirect visitors without a session to the front page on protected paths."""
    if request.path not in PUBLIC_PATHS and not request.path.startswith("/push"):
        # page requires authentication
        if 'user_id' not in session:
            return redirect(url_for('home'))


@app.route("/")
def home():
    """Show the bot overview for a logged-in user, or the front page otherwise."""
    if 'user_id' not in session:
        return render_template("front_page.html")

    c = mysql.connection.cursor()
    c.execute("SELECT COUNT(*) FROM `bots` WHERE user_id = %s", (session['user_id'],))
    bot_count = c.fetchone()[0]
    active_count = None
    bots = {}
    bot_users = {}
    next_posts = {}

    if bot_count > 0:
        c.execute("SELECT COUNT(*) FROM `bots` WHERE user_id = %s AND enabled = TRUE", (session['user_id'],))
        active_count = c.fetchone()[0]

        dc = mysql.connection.cursor(MySQLdb.cursors.DictCursor)
        dc.execute("SELECT `handle`, `enabled`, `last_post`, `post_frequency` FROM `bots` WHERE user_id = %s", (session['user_id'],))
        bots = dc.fetchall()
        dc.close()

        for bot in bots:
            # multiple SELECTS is slow, maybe SELECT all at once and filter with python?
            c.execute("SELECT COUNT(*) FROM `bot_learned_accounts` WHERE bot_id = %s", (bot['handle'],))
            bot_users[bot['handle']] = c.fetchone()[0]
            c.execute("SELECT post_frequency - TIMESTAMPDIFF(MINUTE, last_post, CURRENT_TIMESTAMP()) FROM bots WHERE TIMESTAMPDIFF(MINUTE, last_post, CURRENT_TIMESTAMP()) <= post_frequency AND enabled = TRUE AND handle = %s", (bot['handle'],))
            next_post = c.fetchone()
            if next_post is not None:
                # stored as the fetched row, exactly as returned by the cursor
                next_posts[bot['handle']] = next_post

    c.close()
    return render_template("home.html", bot_count=bot_count, active_count=active_count, bots=bots, bot_users=bot_users, next_posts=next_posts)


@app.route("/welcome")
def welcome():
    """Render the welcome page."""
    return render_template("welcome.html")


@app.route("/about")
def about():
    """Render the about page."""
    return render_template("about.html")


@app.route("/login")
def show_login_page():
    """Render the login form, showing any pending error message once."""
    return render_template("login.html", signup=False, error=session.pop('error', None))


@app.route("/signup")
def show_signup_page():
    """Render the signup form, showing any pending error message once."""
    return render_template("login.html", signup=True, error=session.pop('error', None))


@app.route("/settings", methods=['GET', 'POST'])
def settings():
    """Display (GET) or update (POST) the logged-in user's settings."""
    if request.method == 'GET':
        dc = mysql.connection.cursor(MySQLdb.cursors.DictCursor)
        dc.execute("SELECT * FROM `users` WHERE id = %s", (session['user_id'],))
        user = dc.fetchone()
        dc.close()
        return render_template("settings.html", user=user, error=session.pop('error', None), success=session.pop('success', None))

    # update settings
    c = mysql.connection.cursor()

    c.execute("SELECT COUNT(*) FROM users WHERE email = %s AND id != %s", (request.form['email'], session['user_id']))
    if c.fetchone()[0] > 0:
        session['error'] = "Email address already in use."
        return redirect(url_for("settings"), 303)

    for setting in [request.form['fetch-error'], request.form['submit-error'], request.form['reply-error'], request.form['generation-error']]:
        if setting not in ['once', 'always', 'never']:
            session['error'] = 'Invalid option "{}".'.format(setting)
            return redirect(url_for('settings'), 303)

    if request.form['password'] != '':
        # user is updating their password
        if len(request.form['password']) < 8:
            session['error'] = "Password too short."
            return redirect(url_for("settings"), 303)

        pw = bcrypt.hashpw(_prehash_password(request.form['password']), bcrypt.gensalt(12))
        c.execute("UPDATE users SET password = %s WHERE id = %s", (pw, session['user_id']))

    # don't require email verification again if the new email address is the same as the old one
    c.execute("SELECT email_verified FROM users WHERE id = %s", (session['user_id'],))
    if c.fetchone()[0]:
        c.execute("SELECT email FROM users WHERE id = %s", (session['user_id'],))
        previous_email = c.fetchone()[0]
        email_verified = (previous_email == request.form['email'])
    else:
        email_verified = False

    try:
        c.execute("UPDATE users SET email = %s, email_verified = %s, `fetch` = %s, submit = %s, generation = %s, reply = %s WHERE id = %s", (
            request.form['email'],
            email_verified,
            request.form['fetch-error'],
            request.form['submit-error'],
            request.form['generation-error'],
            request.form['reply-error'],
            session['user_id']
        ))
        c.close()
        mysql.connection.commit()
    except Exception:
        session['error'] = "Encountered an error while updating the database."
        return redirect(url_for('settings'), 303)

    session['success'] = True
    return redirect(url_for('settings'), 303)


@app.route("/bot/edit/<id>", methods=['GET', 'POST'])
def bot_edit(id):
    """Display (GET) or save (POST) the settings of the bot with handle *id*.

    Users who do not own the bot are sent back to the home page.
    """
    # only the bot's owner may view or change its settings
    if not bot_check(id):
        return redirect(url_for("home"), 303)

    if request.method == "GET":
        dc = mysql.connection.cursor(MySQLdb.cursors.DictCursor)
        dc.execute("SELECT * FROM bots WHERE handle = %s", (id,))
        bot = dc.fetchone()
        dc.close()
        return render_template("bot_edit.html", bot=bot, error=session.pop('error', None), success=session.pop('success', None))

    # update stored settings
    edit_url = "/bot/edit/{}".format(id)
    replies_enabled = 'replies' in request.form
    learn_from_cw = 'cw-learning' in request.form

    if request.form['fake-mention-style'] not in ['full', 'brief']:
        session['error'] = "Invalid setting for fake mention style."
        return redirect(edit_url, 303)

    if request.form['fake-mentions'] not in ['always', 'middle', 'never']:
        session['error'] = "Invalid setting for fake mentions."
        return redirect(edit_url, 303)

    if request.form['privacy'] not in ['public', 'unlisted', 'private']:
        session['error'] = "Invalid setting for post privacy."
        return redirect(edit_url, 303)

    # non-numeric input is treated as an invalid setting instead of crashing
    length = _parse_int(request.form['length'])
    if length is None or length < 100 or length > 5000:
        session['error'] = "Invalid setting for maximum post length."
        return redirect(edit_url, 303)

    freq = _parse_int(request.form['freq'])
    if freq is None or freq < 15 or freq > 240 or freq % 5:
        session['error'] = "Invalid setting for post frequency."
        return redirect(edit_url, 303)

    if len(request.form['cw']) > 128:
        session['error'] = "Content warning cannot exceed 128 characters."
        return redirect(edit_url, 303)

    c = mysql.connection.cursor()
    try:
        c.execute("UPDATE bots SET replies_enabled = %s, post_frequency = %s, content_warning = %s, length = %s, fake_mentions = %s, fake_mentions_full = %s, post_privacy = %s, learn_from_cw = %s WHERE handle = %s", (
            replies_enabled,
            freq,
            request.form['cw'] if request.form['cw'] != "" else None,
            length,
            request.form['fake-mentions'],
            request.form['fake-mention-style'] == 'full',
            request.form['privacy'],
            learn_from_cw,
            id
        ))
        mysql.connection.commit()
        c.close()
    except Exception:
        session['error'] = "Couldn't save your settings."
        return redirect(edit_url, 303)

    session['success'] = True
    return redirect(edit_url, 303)


@app.route("/bot/delete/<id>", methods=['GET', 'POST'])
def bot_delete(id):
    """Confirm (GET) or perform (POST) deletion of the bot with handle *id*."""
    if not bot_check(id):
        # not the owner (or no such bot): nothing to delete
        return redirect(url_for("home"), 303)

    if request.method == 'GET':
        instance = id.split("@")[2]
        return render_template("bot_delete.html", instance=instance)

    # delete bot by deleting its credentials
    # FK constraint will delete bot
    c = mysql.connection.cursor()
    c.execute("SELECT `credentials_id` FROM `bots` WHERE `handle` = %s", (id,))
    credentials_id = c.fetchone()[0]
    c.execute("DELETE FROM `credentials` WHERE `id` = %s", (credentials_id,))
    c.close()
    mysql.connection.commit()

    return redirect(url_for("home"), 303)


@app.route("/bot/toggle/<id>")
def bot_toggle(id):
    """Flip the enabled flag of the bot with handle *id*, then go home."""
    if bot_check(id):
        c = mysql.connection.cursor()
        c.execute("UPDATE `bots` SET `enabled` = NOT `enabled` WHERE `handle` = %s", (id,))
        mysql.connection.commit()
        c.close()
    return redirect(url_for("home"), 303)


@app.route("/bot/chat/<id>")
def bot_chat(id):
    """Placeholder page for the not-yet-implemented chat feature."""
    return render_template("coming_soon.html")


@app.route("/bot/blacklist/<id>")
def bot_blacklist(id):
    """Placeholder page for the not-yet-implemented blacklist feature."""
    return render_template("coming_soon.html")


@app.route("/bot/accounts/<id>")
def bot_accounts(id):
    """List the accounts the bot with handle *id* learns from.

    Also stores the bot handle in the session so the account add, toggle
    and delete routes know which bot they act on.
    """
    if not bot_check(id):
        return redirect(url_for("home"), 303)

    session['bot'] = id
    c = mysql.connection.cursor()
    c.execute("SELECT COUNT(*) FROM `bot_learned_accounts` WHERE `bot_id` = %s", (id,))
    user_count = c.fetchone()[0]
    users = {}
    post_count = {}

    if user_count > 0:
        dc = mysql.connection.cursor(MySQLdb.cursors.DictCursor)
        dc.execute("SELECT `fedi_id`, `enabled` FROM `bot_learned_accounts` WHERE `bot_id` = %s", (id,))
        users = dc.fetchall()
        dc.close()

        for user in users:
            c.execute("SELECT COUNT(*) FROM `posts` WHERE `fedi_id` = %s", (user['fedi_id'],))
            post_count[user['fedi_id']] = c.fetchone()[0]

    c.close()

    return render_template("bot_accounts.html", users=users, post_count=post_count)


@app.route("/bot/accounts/add", methods=['GET', 'POST'])
def bot_accounts_add():
    """Add an account for the session's current bot to learn from.

    GET starts a new request. POST validates the submitted handle, resolves
    it to an ActivityPub outbox URL and stores it on success.
    """
    if request.method == 'POST':
        if session['step'] == 1:
            if request.form['account'] == session['bot']:
                error = "Bots cannot learn from themselves."
                return render_template("bot_accounts_add.html", error=error)

            # look up user
            handle_list = request.form['account'].split('@')
            if len(handle_list) != 3:
                # not formatted correctly
                error = "Incorrectly formatted handle."
                return render_template("bot_accounts_add.html", error=error)

            username = handle_list[1]
            instance = handle_list[2]

            # gab check
            try:
                r = requests.get("https://{}/api/v1/instance".format(instance), timeout=10)
            except requests.exceptions.ConnectionError:
                error = "Couldn't connect to {}.".format(instance)
                return render_template("bot_accounts_add.html", error=error)
            except Exception:
                error = "An unknown error occurred."
                return render_template("bot_accounts_add.html", error=error)

            if r.status_code == 200:
                j = r.json()
                if 'contact_account' in j and 'is_pro' in j['contact_account']:
                    # gab instance
                    error = "Gab instances are not supported."
                    return render_template("bot_accounts_add.html", error=error)

            # 1. download host-meta to find webfinger URL
            r = requests.get("https://{}/.well-known/host-meta".format(instance), timeout=10)
            if r.status_code != 200:
                error = "Couldn't get host-meta."
                return render_template("bot_accounts_add.html", error=error)

            # 2. use webfinger to find user's info page
            #TODO: use more reliable method
            try:
                uri = re.search(r'template="([^"]+)"', r.text).group(1)
                uri = uri.format(uri="{}@{}".format(username, instance))
            except Exception:
                error = "Couldn't find WebFinger URL."
                return render_template("bot_accounts_add.html", error=error)

            r = requests.get(uri, headers={"Accept": "application/json"}, timeout=10)
            try:
                j = r.json()
            except Exception:
                error = "Invalid WebFinger response."
                return render_template("bot_accounts_add.html", error=error)

            found = False
            for link in j['links']:
                if link['rel'] == 'self':
                    # this is a link formatted like "https://instan.ce/users/username", which is what we need
                    uri = link['href']
                    found = True
                    break
            if not found:
                error = "Couldn't find a valid ActivityPub outbox URL."
                return render_template("bot_accounts_add.html", error=error)

            # 3. format as outbox URL and check to make sure it works
            outbox = "{}/outbox?page=true".format(uri)
            # probe the outbox itself (not the profile URI) so the check
            # matches what will actually be stored and fetched later
            r = requests.get(outbox, headers={"Accept": "application/json"}, timeout=10)
            if r.status_code == 200:
                # success!!
                c = mysql.connection.cursor()
                c.execute("REPLACE INTO `fedi_accounts` (`handle`, `outbox`) VALUES (%s, %s)", (request.form['account'], outbox))
                c.execute("INSERT INTO `bot_learned_accounts` (`bot_id`, `fedi_id`) VALUES (%s, %s)", (session['bot'], request.form['account']))
                c.close()
                mysql.connection.commit()

                return redirect("/bot/accounts/{}".format(session['bot']), 303)
            else:
                error = "Couldn't access ActivityPub outbox. {} may require authenticated fetches, which FediBooks doesn't support yet.".format(instance)
                return render_template("bot_accounts_add.html", error=error)
    else:
        # new account add request
        session['step'] = 1

    return render_template("bot_accounts_add.html", error=session.pop('error', None))


@app.route("/bot/accounts/toggle/<id>")
def bot_accounts_toggle(id):
    """Flip the enabled flag of learned account *id* for the session's bot."""
    c = mysql.connection.cursor()
    c.execute("UPDATE `bot_learned_accounts` SET `enabled` = NOT `enabled` WHERE `fedi_id` = %s AND `bot_id` = %s", (id, session['bot']))
    mysql.connection.commit()
    c.close()
    return redirect("/bot/accounts/{}".format(session['bot']), 303)


@app.route("/bot/accounts/delete/<id>", methods=['GET', 'POST'])
def bot_accounts_delete(id):
    """Confirm (GET) or perform (POST) removal of learned account *id*."""
    if request.method == 'GET':
        instance = id.split("@")[2]
        return render_template("bot_accounts_delete.html", user=id, instance=instance)

    #NOTE: when user credential support is added, we'll need to delete the creds too
    c = mysql.connection.cursor()
    c.execute("DELETE FROM `bot_learned_accounts` WHERE `fedi_id` = %s AND bot_id = %s", (id, session['bot']))
    # check to see if anyone else is learning from this account
    c.execute("SELECT COUNT(*) FROM `bot_learned_accounts` WHERE `fedi_id` = %s", (id,))
    if c.fetchone()[0] == 0:
        # nobody else learns from this account, remove it from the db
        c.execute("DELETE FROM `fedi_accounts` WHERE `handle` = %s", (id,))
    c.close()
    mysql.connection.commit()

    return redirect("/bot/accounts/{}".format(session['bot']), 303)


@app.route("/bot/create/", methods=['GET', 'POST'])
def bot_create():
    """Drive the multi-step bot creation wizard.

    The current step is kept in ``session['step']``:
    1 = choose instance, 2 = instance type confirmed, 3 = start OAuth,
    4 = OAuth code received (handled on GET) and bot stored.
    """
    if request.method == 'POST':
        if session['step'] == 1:
            # strip leading https://, if provided
            session['instance'] = re.match(r"^(?:https?:\/\/)?(.*)", request.form['instance']).group(1)

            # check for mastodon/pleroma
            try:
                r = requests.get("https://{}/api/v1/instance".format(session['instance']), timeout=10)
            except requests.ConnectionError:
                session['error'] = "Couldn't connect to https://{}.".format(session['instance'])
                return render_template("bot_create.html", error=session.pop('error', None))
            except Exception:
                session['error'] = "An unknown error occurred while trying to load https://{}".format(session['instance'])
                return render_template("bot_create.html", error=session.pop('error', None))

            if r.status_code == 200:
                j = r.json()
                if "Pleroma" in j['version']:
                    session['instance_type'] = "Pleroma"
                    session['step'] += 1
                else:
                    if 'contact_account' in j and 'is_pro' in j['contact_account']:
                        # gab instance
                        session['error'] = "Gab instances are not supported."
                    else:
                        session['instance_type'] = "Mastodon"
                        session['step'] += 1
            else:
                # not a masto/pleroma instance
                # misskey is currently unsupported
                # all other instance types are also unsupported
                # return an error message
                #TODO: misskey
                session['error'] = "Unsupported instance type. Misskey support is planned."

        elif session['step'] == 2:
            # nothing needs to be done here, this step just informs the user that their instance type is supported
            session['step'] += 1

        elif session['step'] == 3:
            # authenticate with the given instance and obtain credentials
            if session['instance_type'] in ['Mastodon', 'Pleroma']:
                redirect_uri = '{}/do/authenticate_bot'.format(cfg['base_uri'])

                session['client_id'], session['client_secret'] = Mastodon.create_app(
                    "FediBooks",
                    api_base_url="https://{}".format(session['instance']),
                    scopes=scopes if session['instance_type'] == 'Mastodon' else scopes_pleroma,
                    redirect_uris=[redirect_uri],
                    website=cfg['base_uri']
                )

                client = Mastodon(
                    client_id=session['client_id'],
                    client_secret=session['client_secret'],
                    api_base_url="https://{}".format(session['instance'])
                )

                url = client.auth_request_url(
                    client_id=session['client_id'],
                    redirect_uris=redirect_uri,
                    scopes=scopes if session['instance_type'] == 'Mastodon' else scopes_pleroma
                )
                return redirect(url, code=303)

            elif session['instance_type'] == 'Misskey':
                # todo
                pass

            else:
                # the user clicked next on step 2 while having an unsupported instance type
                # take them back home
                del session['instance']
                del session['instance_type']
                session['step'] = 1
                return redirect(url_for("home"), 303)

    else:
        if 'step' in session and session['step'] == 4:
            try:
                # test authentication
                client = Mastodon(client_id=session['client_id'], client_secret=session['client_secret'], api_base_url=session['instance'])
                session['secret'] = client.log_in(
                    code=session['code'],
                    scopes=scopes if session['instance_type'] == 'Mastodon' else scopes_pleroma,
                    redirect_uri='{}/do/authenticate_bot'.format(cfg['base_uri'])
                )
                username = client.account_verify_credentials()['username']
                handle = "@{}@{}".format(username, session['instance'])
            except Exception:
                # authentication error occurred
                error = "Authentication failed."
                session['step'] = 3
                return render_template("bot_create.html", error=error)

            # authentication success!!
            c = mysql.connection.cursor()
            c.execute("INSERT INTO `credentials` (client_id, client_secret, secret) VALUES (%s, %s, %s)", (session['client_id'], session['client_secret'], session['secret']))
            credentials_id = c.lastrowid
            mysql.connection.commit()

            # get webpush url
            privated, publicd = client.push_subscription_generate_keys()
            private = privated['privkey']
            public = publicd['pubkey']
            secret = privated['auth']
            client.push_subscription_set("{}/push/{}".format(cfg['base_uri'], handle), publicd, mention_events=True)

            c.execute("INSERT INTO `bots` (handle, user_id, credentials_id, push_public_key, push_private_key, push_secret) VALUES (%s, %s, %s, %s, %s, %s)", (handle, session['user_id'], credentials_id, public, private, secret))
            mysql.connection.commit()
            c.close()

            # clean up unneeded variables
            del session['code']
            del session['instance']
            del session['instance_type']
            del session['client_id']
            del session['client_secret']
        else:
            # user is starting a new bot create request
            session['step'] = 1

    return render_template("bot_create.html", error=session.pop('error', None))


@app.route("/bot/create/back")
def bot_create_back():
    """Step the bot creation wizard back by one and redisplay it."""
    session['step'] -= 1
    return redirect(url_for("bot_create"), 303)


@app.route("/do/authenticate_bot")
def do_authenticate_bot():
    """OAuth redirect target: store the returned code and resume the wizard at step 4."""
    session['code'] = request.args.get('code')
    session['step'] = 4
    return redirect(url_for("bot_create"), 303)


@app.route("/push/<id>", methods=['POST'])
def push(id):
    """Handle a web push notification for the bot with handle *id*.

    Decrypts the push payload, looks up the notification and asks
    ``functions.make_post`` to reply, unless replies are disabled or the bot
    already appears 15 or more times among the thread's ancestors.
    """
    c = mysql.connection.cursor()
    c.execute("SELECT client_id, client_secret, secret FROM credentials WHERE id = (SELECT credentials_id FROM bots WHERE handle = %s)", (id,))
    login = c.fetchone()
    if login is None:
        # no stored credentials for this handle; nothing we can do
        c.close()
        return "Unknown bot.", 404

    client = Mastodon(
        client_id=login[0],
        client_secret=login[1],
        access_token=login[2],
        api_base_url="https://{}".format(id.split("@")[2])
    )

    c.execute("SELECT push_private_key, push_secret, replies_enabled FROM bots WHERE handle = %s", (id,))
    bot = c.fetchone()
    c.close()
    if not bot[2]:
        return "Replies disabled."

    params = {
        'privkey': int(bot[0].rstrip("\0")),
        'auth': bot[1]
    }
    push_object = client.push_subscription_decrypt_push(request.data, params, request.headers['Encryption'], request.headers['Crypto-Key'])
    notification = client.notifications(id=push_object['notification_id'])
    me = client.account_verify_credentials()['id']

    # first, check how many times the bot has posted in this thread.
    # if it's over 15, don't reply.
    # this is to stop endless reply chains between two bots.
    try:
        context = client.status_context(notification['status']['id'])
        my_posts = 0
        for post in context['ancestors']:
            if post['account']['id'] == me:
                my_posts += 1
            if my_posts >= 15:
                # don't reply
                return "Didn't reply."
    except Exception:
        # failed to fetch context
        # assume we haven't been participating in this thread
        pass

    functions.make_post([id, notification['status']['id'], notification['status']['visibility'], "@" + notification['account']['acct']])

    return "Success!"


@app.route("/do/signup", methods=['POST'])
def do_signup():
    """Create a new user from the signup form and log them in."""
    # email validation is basically impossible without actually sending an email to the address
    # because fedibooks can't send email yet, we'll just check if the string contains an @ ;)
    if "@" not in request.form['email']:
        session['error'] = "Invalid email address."
        return redirect(url_for("show_signup_page"), 303)

    if len(request.form['password']) < 8:
        session['error'] = "Password too short."
        return redirect(url_for("show_signup_page"), 303)

    c = mysql.connection.cursor()
    c.execute("SELECT COUNT(*) FROM users WHERE email = %s", (request.form['email'],))
    if c.fetchone()[0] > 0:
        c.close()
        session['error'] = "Email address already in use."
        return redirect(url_for("show_signup_page"), 303)

    pw = bcrypt.hashpw(_prehash_password(request.form['password']), bcrypt.gensalt(12))

    # try to sign up
    c.execute("INSERT INTO `users` (email, password) VALUES (%s, %s)", (request.form['email'], pw))
    user_id = c.lastrowid
    mysql.connection.commit()
    c.close()

    # success!
    session['user_id'] = user_id
    return redirect(url_for('home'))


@app.route("/do/signout")
def do_signout():
    """Clear the session and return to the front page."""
    session.clear()
    return redirect(url_for("home"))


@app.route("/do/login", methods=['POST'])
def do_login():
    """Check the submitted credentials and start a session on success."""
    pw_hashed = _prehash_password(request.form['password'])
    c = mysql.connection.cursor(MySQLdb.cursors.DictCursor)
    c.execute("SELECT * FROM users WHERE email = %s", (request.form['email'],))
    data = c.fetchone()
    c.close()
    if data is None:
        session['error'] = "Incorrect login information."
        return redirect(url_for("show_login_page"), 303)

    if bcrypt.checkpw(pw_hashed, data['password']):
        session['user_id'] = data['id']
        return redirect(url_for("home"))
    else:
        session['error'] = "Incorrect login information."
        return redirect(url_for("show_login_page"), 303)


@app.route("/issue/bug")
def report_bug():
    """Render the bug report page."""
    return render_template("report_bug.html")


@app.route("/help/settings")
def help_settings():
    """Render the settings help page."""
    return render_template("help_settings.html")


@app.route("/img/bot_generic.png")
def img_bot_generic():
    """Serve the generic bot image."""
    return send_file("static/bot_generic.png", mimetype="image/png")


@app.route("/favicon.ico")
def favicon():
    """Serve the site favicon."""
    return send_file("static/favicon.ico")


def bot_check(bot):
    """Return True when the bot with handle *bot* belongs to the logged-in user."""
    # check to ensure bot is owned by user
    c = mysql.connection.cursor()
    c.execute("SELECT COUNT(*) FROM `bots` WHERE `handle` = %s AND `user_id` = %s", (bot, session['user_id']))
    owned = c.fetchone()[0] == 1
    c.close()
    return owned