FediBooks/app/webui.py

383 lines
13 KiB
Python
Raw Normal View History

2020-03-18 05:02:55 +00:00
from flask import Flask, render_template, session, request, redirect, url_for, send_file, jsonify
2019-09-01 05:19:30 +00:00
from flask_mysqldb import MySQL
2019-09-02 03:07:46 +00:00
from mastodon import Mastodon
2019-09-01 14:22:26 +00:00
import requests
2019-09-01 07:30:00 +00:00
import MySQLdb
2019-09-01 07:09:08 +00:00
import bcrypt
2019-09-01 15:07:50 +00:00
import json, hashlib, re
2019-09-11 04:37:13 +00:00
import functions
from pages.home import home
from pages.settings import settings
2019-09-18 11:18:01 +00:00
from pages.bot.edit import bot_edit
2019-09-18 11:27:36 +00:00
from pages.bot.accounts_add import bot_accounts_add
from pages.bot.create import bot_create
2019-08-31 03:26:20 +00:00
cfg = json.load(open("config.json"))
2019-08-27 10:36:54 +00:00
app = Flask(__name__)
2019-08-31 03:26:20 +00:00
app.secret_key = cfg['secret_key']
2019-08-27 10:36:54 +00:00
app.config['MYSQL_HOST'] = cfg['db_host']
app.config['MYSQL_DB'] = cfg['db_name']
app.config['MYSQL_USER'] = cfg['db_user']
app.config['MYSQL_PASSWORD'] = cfg['db_pass']
2019-09-01 05:19:30 +00:00
mysql = MySQL(app)
2019-09-01 05:19:30 +00:00
2019-09-03 01:47:59 +00:00
scopes = ['write:statuses', 'write:accounts', 'read:accounts', 'read:notifications', 'read:statuses', 'push']
2019-09-12 11:54:55 +00:00
scopes_pleroma = ['read', 'write', 'push']
2019-09-02 03:07:46 +00:00
2019-09-09 09:56:19 +00:00
@app.before_request
def login_check():
if request.path not in ['/', '/about', '/welcome', '/login', '/signup', '/do/login', '/do/signup'] and not request.path.startswith("/push") and not request.path.startswith('/static'):
2019-09-09 09:56:19 +00:00
# page requires authentication
if 'user_id' not in session:
2019-09-19 09:54:28 +00:00
return redirect(url_for('render_home'))
2019-09-09 09:56:19 +00:00
2019-08-27 10:36:54 +00:00
@app.route("/")
def render_home():
return home(mysql)
2019-08-29 01:15:47 +00:00
@app.route("/welcome")
def welcome():
return render_template("welcome.html")
2019-08-30 12:30:59 +00:00
@app.route("/about")
def about():
return render_template("about.html")
@app.route("/login")
def show_login_page():
2019-09-09 12:15:10 +00:00
return render_template("login.html", signup = False, error = session.pop('error', None))
2019-08-29 05:08:11 +00:00
@app.route("/signup")
def show_signup_page():
2019-09-09 12:15:10 +00:00
return render_template("login.html", signup = True, error = session.pop('error', None))
2019-08-29 13:51:31 +00:00
2019-09-09 12:08:43 +00:00
@app.route("/settings", methods=['GET', 'POST'])
def render_settings():
return settings(mysql)
2019-08-30 03:56:28 +00:00
@app.route("/delete", methods=['GET', 'POST'])
2019-12-19 09:00:54 +00:00
def render_delete():
if request.method == 'GET':
return render_template("close_account.html", error = session.pop('error', None))
else:
# deletion logic
pw_hashed = hashlib.sha256(request.form['password'].encode('utf-8')).digest().replace(b"\0", b"\1")
c = mysql.connection.cursor(MySQLdb.cursors.DictCursor)
c.execute("SELECT * FROM users WHERE id = %s", (session['user_id'],))
data = c.fetchone()
c.close()
if data == None:
# should never happen ;)
session['error'] = "An unknown error occurred."
return redirect(url_for("render_delete"), 303)
2020-01-20 02:12:53 +00:00
if bcrypt.checkpw(pw_hashed, data['password']):
# passwords match, delete the account
session['error'] = "succ ess"
c = mysql.connection.cursor()
c.execute("SELECT credentials_id FROM bots WHERE user_id = %s", (session['user_id'],))
credentials_list = c.fetchall()
for credentials_id in credentials_list:
c.execute("SELECT client_id, client_secret, secret FROM credentials WHERE id = %s", (credentials_id,))
# TODO: maybe schedule the push deletions on a cron job or something, if the user has a lot of accounts (or they're on slow instances) this could take a while or even time out
credentials = c.fetchone()
try:
client = Mastodon(
credentials[0],
credentials[1],
credentials[2],
"https://{}".format(id.split("@")[2])
)
client.push_subscription_delete()
except:
# if it fails, don't prevent the user from deleting their account
# TODO: maybe notify that some accounts failed to unregister push
pass
c.execute("DELETE FROM `credentials` WHERE `id` = %s", (credentials_id,))
# the big boy step
c.execute("DELETE FROM users WHERE id = %s", (session['user_id'],))
c.close()
mysql.connection.commit()
2020-01-20 02:12:53 +00:00
# TODO: show a "deletion successful" message or something
return redirect(url_for("do_signout"), 303)
else:
session['error'] = "Password incorrect."
return redirect(url_for("render_delete"), 303)
2019-12-19 09:00:54 +00:00
@app.route("/bot/edit/<id>", methods = ['GET', 'POST'])
2019-09-18 11:18:01 +00:00
def render_bot_edit(id):
return bot_edit(id, mysql)
2019-09-02 08:02:26 +00:00
@app.route("/bot/delete/<id>", methods=['GET', 'POST'])
2019-08-30 11:28:34 +00:00
def bot_delete(id):
if bot_check(id):
2019-09-02 08:02:26 +00:00
if request.method == 'GET':
instance = id.split("@")[2]
c = mysql.connection.cursor()
c.execute("SELECT icon FROM bots WHERE handle = %s", (id,))
icon = c.fetchone()[0]
return render_template("bot/delete.html", instance = instance, icon = icon)
2019-09-02 08:02:26 +00:00
else:
# delete bot by deleting its credentials
# FK constraint will delete bot
c = mysql.connection.cursor()
c.execute("SELECT credentials_id FROM bots WHERE handle = %s", (id,))
2019-09-02 08:02:26 +00:00
credentials_id = c.fetchone()[0]
c.execute("SELECT client_id, client_secret, secret FROM credentials WHERE id = %s", (credentials_id,))
credentials = c.fetchone()
client = Mastodon(
credentials[0],
credentials[1],
credentials[2],
"https://{}".format(id.split("@")[2])
)
client.push_subscription_delete()
2019-09-02 08:02:26 +00:00
c.execute("DELETE FROM `credentials` WHERE `id` = %s", (credentials_id,))
c.close()
mysql.connection.commit()
return redirect(url_for("render_home"), 303)
2019-08-30 11:28:34 +00:00
2019-09-02 07:38:29 +00:00
@app.route("/bot/toggle/<id>")
def bot_toggle(id):
if bot_check(id):
c = mysql.connection.cursor()
c.execute("UPDATE `bots` SET `enabled` = NOT `enabled` WHERE `handle` = %s", (id,))
mysql.connection.commit()
c.close()
2019-09-21 05:42:37 +00:00
return redirect(url_for("render_home"), 303)
2019-09-02 07:38:29 +00:00
@app.route("/bot/chat/<id>")
def bot_chat(id):
2020-01-20 02:43:25 +00:00
# return render_template("coming_soon.html")
2020-01-20 03:01:56 +00:00
if bot_check(id):
2020-01-20 03:46:30 +00:00
c = mysql.connection.cursor()
c.execute("SELECT icon FROM `bots` WHERE handle = %s", (id,))
icon = c.fetchone()[0]
if icon is None:
icon = "/img/bot_generic.png"
return render_template("/bot/chat.html", bot = id, icon = icon)
2020-01-20 03:01:56 +00:00
@app.route("/bot/chat/<id>/message")
def bot_chat_message(id):
if bot_check(id):
_, message = functions.generate_output(id)
return message
@app.route("/bot/blacklist/<id>")
def bot_blacklist(id):
return render_template("coming_soon.html")
2019-09-01 04:02:42 +00:00
@app.route("/bot/accounts/<id>")
def bot_accounts(id):
2019-09-02 10:07:16 +00:00
if bot_check(id):
session['bot'] = id
2019-09-02 10:26:45 +00:00
c = mysql.connection.cursor()
c.execute("SELECT COUNT(*) FROM `bot_learned_accounts` WHERE `bot_id` = %s", (id,))
user_count = c.fetchone()[0]
users = {}
2019-09-06 02:47:39 +00:00
post_count = {}
2019-09-02 10:26:45 +00:00
if user_count > 0:
dc = mysql.connection.cursor(MySQLdb.cursors.DictCursor)
dc.execute("SELECT `fedi_id`, `enabled` FROM `bot_learned_accounts` WHERE `bot_id` = %s", (id,))
users = dc.fetchall()
dc.close()
2019-09-06 02:47:39 +00:00
post_count = {}
for user in users:
c.execute("SELECT COUNT(*) FROM `posts` WHERE `fedi_id` = %s", (user['fedi_id'],))
post_count[user['fedi_id']] = c.fetchone()[0]
2019-09-02 10:26:45 +00:00
c.close()
return render_template("bot/accounts.html", users = users, post_count = post_count)
2019-09-01 04:02:42 +00:00
2019-09-02 10:07:16 +00:00
@app.route("/bot/accounts/add", methods = ['GET', 'POST'])
def render_bot_accounts_add():
return bot_accounts_add(mysql, cfg)
2019-09-01 04:41:33 +00:00
@app.route("/bot/accounts/toggle/<id>")
def bot_accounts_toggle(id):
c = mysql.connection.cursor()
c.execute("UPDATE `bot_learned_accounts` SET `enabled` = NOT `enabled` WHERE `fedi_id` = %s AND `bot_id` = %s", (id, session['bot']))
mysql.connection.commit()
c.close()
return redirect("/bot/accounts/{}".format(session['bot']), 303)
@app.route("/bot/accounts/delete/<id>", methods=['GET', 'POST'])
def bot_accounts_delete(id):
if request.method == 'GET':
instance = id.split("@")[2]
return render_template("bot/accounts_delete.html", user = id, instance = instance)
else:
#NOTE: when user credential support is added, we'll need to delete the creds too
c = mysql.connection.cursor()
c.execute("DELETE FROM `bot_learned_accounts` WHERE `fedi_id` = %s AND bot_id = %s", (id, session['bot']))
# check to see if anyone else is learning from this account
c.execute("SELECT COUNT(*) FROM `bot_learned_accounts` WHERE `fedi_id` = %s", (id,))
if c.fetchone()[0] == 0:
# nobody else learns from this account, remove it from the db
c.execute("DELETE FROM `fedi_accounts` WHERE `handle` = %s", (id,))
c.close()
mysql.connection.commit()
2019-09-06 02:38:50 +00:00
return redirect("/bot/accounts/{}".format(session['bot']), 303)
2019-09-01 15:07:50 +00:00
@app.route("/bot/create/", methods=['GET', 'POST'])
def render_bot_create():
return bot_create(mysql, cfg, scopes, scopes_pleroma)
2019-09-01 07:09:08 +00:00
2019-09-02 05:35:02 +00:00
@app.route("/bot/create/back")
def bot_create_back():
session['step'] -= 1
2019-09-19 05:51:46 +00:00
return redirect(url_for("render_bot_create"), 303)
2019-09-02 05:35:02 +00:00
2019-09-02 03:07:46 +00:00
@app.route("/do/authenticate_bot")
def do_authenticate_bot():
session['code'] = request.args.get('code')
session['step'] = 4
2019-09-19 05:51:46 +00:00
return redirect(url_for("render_bot_create"), 303)
2019-09-02 03:07:46 +00:00
2019-09-11 04:15:26 +00:00
@app.route("/push/<id>", methods = ['POST'])
2019-09-10 12:53:15 +00:00
def push(id):
2019-09-11 04:15:26 +00:00
c = mysql.connection.cursor()
c.execute("SELECT client_id, client_secret, secret FROM credentials WHERE id = (SELECT credentials_id FROM bots WHERE handle = %s)", (id,))
login = c.fetchone()
client = Mastodon(
client_id = login[0],
client_secret = login[1],
access_token = login[2],
api_base_url = "https://{}".format(id.split("@")[2])
)
2019-09-11 04:37:13 +00:00
c.execute("SELECT push_private_key, push_secret, replies_enabled FROM bots WHERE handle = %s", (id,))
bot = c.fetchone()
if not bot[2]:
return "Replies disabled."
2019-09-11 04:15:26 +00:00
params = {
2019-09-11 04:37:13 +00:00
'privkey': int(bot[0].rstrip("\0")),
'auth': bot[1]
2019-09-11 04:15:26 +00:00
}
2020-01-20 02:12:53 +00:00
try:
push_object = client.push_subscription_decrypt_push(request.data, params, request.headers['Encryption'], request.headers['Crypto-Key'])
notification = client.notifications(id = push_object['notification_id'])
me = client.account_verify_credentials()['id']
except:
return "Push failed - do we still have access to {}?".format(id)
2019-09-11 04:37:13 +00:00
# first, check how many times the bot has posted in this thread.
# if it's over 15, don't reply.
# this is to stop endless reply chains between two bots.
try:
context = client.status_context(notification['status']['id'])
my_posts = 0
for post in context['ancestors']:
if post['account']['id'] == me:
my_posts += 1
if my_posts >= 15:
# don't reply
return "Didn't reply."
except:
# failed to fetch context
# assume we haven't been participating in this thread
pass
functions.make_post([id, notification['status']['id'], notification['status']['visibility'], "@" + notification['account']['acct']])
2019-09-11 04:37:13 +00:00
return "Success!"
2019-09-10 12:53:15 +00:00
2019-09-01 07:09:08 +00:00
@app.route("/do/signup", methods=['POST'])
def do_signup():
# email validation is basically impossible without actually sending an email to the address
# because fedibooks can't send email yet, we'll just check if the string contains an @ ;)
if "@" not in request.form['email']:
session['error'] = "Invalid email address."
return redirect(url_for("show_signup_page"), 303)
2019-09-01 07:09:08 +00:00
if len(request.form['password']) < 8:
session['error'] = "Password too short."
return redirect(url_for("show_signup_page"), 303)
2019-09-01 07:09:08 +00:00
2019-09-09 12:08:43 +00:00
c = mysql.connection.cursor()
c.execute("SELECT COUNT(*) FROM users WHERE email = %s", (request.form['email'],))
if c.fetchone()[0] > 0:
session['error'] = "Email address already in use."
return redirect(url_for("show_signup_page"), 303)
2019-09-14 04:37:25 +00:00
pw_hashed = hashlib.sha256(request.form['password'].encode('utf-8')).digest().replace(b"\0", b"\1")
2019-09-01 07:17:07 +00:00
pw = bcrypt.hashpw(pw_hashed, bcrypt.gensalt(12))
2019-09-01 07:09:08 +00:00
# try to sign up
c.execute("INSERT INTO `users` (email, password) VALUES (%s, %s)", (request.form['email'], pw))
2019-09-02 03:07:46 +00:00
user_id = c.lastrowid
2019-09-01 07:09:08 +00:00
mysql.connection.commit()
c.close()
2019-09-01 07:16:52 +00:00
# success!
2019-09-02 03:07:46 +00:00
session['user_id'] = user_id
2019-09-19 05:51:46 +00:00
return redirect(url_for('render_home'))
2019-09-01 07:19:17 +00:00
@app.route("/do/signout")
def do_signout():
session.clear()
2019-09-19 05:51:46 +00:00
return redirect(url_for("render_home"))
2019-09-01 07:30:00 +00:00
@app.route("/do/login", methods=['POST'])
def do_login():
2019-09-14 04:37:25 +00:00
pw_hashed = hashlib.sha256(request.form['password'].encode('utf-8')).digest().replace(b"\0", b"\1")
2019-09-01 07:30:00 +00:00
c = mysql.connection.cursor(MySQLdb.cursors.DictCursor)
c.execute("SELECT * FROM users WHERE email = %s", (request.form['email'],))
data = c.fetchone()
c.close()
if data == None:
session['error'] = "Incorrect login information."
return redirect(url_for("show_login_page"), 303)
2020-01-20 02:12:53 +00:00
2019-09-01 07:30:00 +00:00
if bcrypt.checkpw(pw_hashed, data['password']):
2019-09-02 03:07:46 +00:00
session['user_id'] = data['id']
2019-09-19 05:51:46 +00:00
return redirect(url_for("render_home"))
2019-09-01 07:30:00 +00:00
else:
session['error'] = "Incorrect login information."
return redirect(url_for("show_login_page"), 303)
2019-09-06 10:54:29 +00:00
@app.route("/issue/bug")
def report_bug():
# return render_template("report_bug.html")
return render_template("coming_soon.html")
2019-09-06 10:54:29 +00:00
2019-09-14 04:51:07 +00:00
@app.route("/help/settings")
def help_settings():
return render_template("help/settings.html")
2019-09-14 04:51:07 +00:00
2019-09-02 07:38:29 +00:00
@app.route("/img/bot_generic.png")
def img_bot_generic():
return send_file("static/bot_generic.png", mimetype="image/png")
2019-09-10 07:00:14 +00:00
@app.route("/favicon.ico")
def favicon():
return send_file("static/favicon.ico")
2020-03-18 05:02:55 +00:00
@app.route("/.well-known/webfinger")
def webfinger():
return render_template("webfinger.json", base_uri = cfg['base_uri']), 200, {'Content-type':'application/json'}
def bot_check(bot):
2019-09-03 03:29:50 +00:00
# check to ensure bot is owned by user
c = mysql.connection.cursor()
c.execute("SELECT COUNT(*) FROM `bots` WHERE `handle` = %s AND `user_id` = %s", (bot, session['user_id']))
return c.fetchone()[0] == 1