FediBooks/webui.py

275 lines
8.6 KiB
Python
Raw Normal View History

2019-09-02 07:38:29 +00:00
from flask import Flask, render_template, session, request, redirect, url_for, send_file
2019-09-01 05:19:30 +00:00
from flask_mysqldb import MySQL
2019-09-02 03:07:46 +00:00
from mastodon import Mastodon
2019-09-01 14:22:26 +00:00
import requests
2019-09-01 07:30:00 +00:00
import MySQLdb
2019-09-01 07:09:08 +00:00
import bcrypt
2019-09-01 15:07:50 +00:00
import json, hashlib, re
2019-08-31 03:26:20 +00:00
# Load site configuration once at import time. A context manager is used so
# the file handle is closed deterministically instead of being leaked.
with open("config.json", encoding="utf-8") as config_file:
    cfg = json.load(config_file)

app = Flask(__name__)
# Key used by Flask to sign the session cookie.
app.secret_key = cfg['secret_key']

# Connection settings consumed by flask_mysqldb.
app.config['MYSQL_HOST'] = cfg['db_host']
app.config['MYSQL_DB'] = cfg['db_name']
app.config['MYSQL_USER'] = cfg['db_user']
app.config['MYSQL_PASSWORD'] = cfg['db_pass']

mysql = MySQL(app)

# OAuth scopes requested when registering with, and logging in to, an instance.
scopes = ['write:statuses', 'write:accounts', 'read:accounts', 'read:notifications', 'read:statuses']
2019-08-27 10:36:54 +00:00
@app.route("/")
def home():
    """Render the bot overview for a signed-in user, or the front page otherwise.

    For a signed-in user the template receives the total number of bots, the
    number of enabled bots, the bot rows (`handle`, `enabled`) and a mapping
    of bot handle -> number of rows in `bot_learned_accounts`.
    """
    if 'user_id' not in session:
        return render_template("front_page.html")

    # Loading the overview always resets the bot creation wizard to step 1.
    session['step'] = 1
    owner = (session['user_id'],)

    cursor = mysql.connection.cursor()
    cursor.execute("SELECT COUNT(*) FROM `bots` WHERE user_id = %s", owner)
    bot_count = cursor.fetchone()[0]

    active_count = None
    bots = {}
    bot_users = None

    if bot_count > 0:
        cursor.execute("SELECT COUNT(*) FROM `bots` WHERE user_id = %s AND enabled = TRUE", owner)
        active_count = cursor.fetchone()[0]

        dict_cursor = mysql.connection.cursor(MySQLdb.cursors.DictCursor)
        dict_cursor.execute("SELECT `handle`, `enabled` FROM `bots` WHERE user_id = %s", owner)
        bots = dict_cursor.fetchall()
        dict_cursor.close()

        bot_users = {}
        for bot in bots:
            handle = bot['handle']
            # multiple SELECTS is slow, maybe SELECT all at once and filter with python?
            cursor.execute("SELECT COUNT(*) FROM `bot_learned_accounts` WHERE bot_id = %s", (handle,))
            bot_users[handle] = cursor.fetchone()[0]

    cursor.close()
    context = {
        'bot_count': bot_count,
        'active_count': active_count,
        'bots': bots,
        'bot_users': bot_users,
    }
    return render_template("home.html", **context)
2019-08-29 01:15:47 +00:00
@app.route("/welcome")
def welcome():
    """Render the `welcome.html` template."""
    page = render_template("welcome.html")
    return page
2019-08-30 12:30:59 +00:00
@app.route("/about")
def about():
    """Render the `about.html` template."""
    page = render_template("about.html")
    return page
@app.route("/login")
def show_login_page():
    """Render the shared login/signup template in login mode (`signup` is False)."""
    context = {'signup': False}
    return render_template("login.html", **context)
@app.route("/signup")
def show_signup_page(error = None):
    """Render the shared login/signup template in signup mode.

    `error` is None when the page is reached through the route; `do_signup`
    calls this function directly with a message when validation fails. The
    message is handed to the template as `error` instead of being discarded.
    """
    #TODO: make login.html display `error` if it does not already
    return render_template("login.html", signup = True, error = error)
2019-08-29 13:51:31 +00:00
2019-08-30 03:56:28 +00:00
@app.route("/settings")
def settings():
    """Render the settings page for the signed-in user.

    Visitors without a `user_id` in their session are redirected to the login
    page instead of triggering a KeyError (HTTP 500).
    """
    if 'user_id' not in session:
        return redirect(url_for("show_login_page"), 303)

    dc = mysql.connection.cursor(MySQLdb.cursors.DictCursor)
    try:
        dc.execute("SELECT * FROM `users` WHERE id = %s", (session['user_id'],))
        user = dc.fetchone()
    finally:
        # release the cursor even if the query fails
        dc.close()
    return render_template("settings.html", user = user)
2019-08-30 03:56:28 +00:00
2019-08-29 13:51:31 +00:00
@app.route("/bot/edit/<id>")
def bot_edit(id):
    """Render the `bot_edit.html` template; `id` is not used yet."""
    page = render_template("bot_edit.html")
    return page
2019-08-30 08:52:13 +00:00
2019-08-30 11:28:34 +00:00
@app.route("/bot/delete/<id>")
def bot_delete(id):
    """Show the delete confirmation page for the bot with handle `id`.

    The page is rendered only when the signed-in user owns the bot. Any other
    request is redirected to the home page, so the view always returns a
    response and never reads `session['user_id']` when nobody is logged in.
    """
    if 'user_id' not in session or not bot_check(id):
        return redirect(url_for("home"), 303)
    # the instance is whatever follows the last "@" of the handle
    instance = id.rsplit("@", 1)[-1]
    return render_template("bot_delete.html", instance = instance)
2019-08-30 11:28:34 +00:00
2019-09-02 07:38:29 +00:00
@app.route("/bot/toggle/<id>")
def bot_toggle(id):
    """Flip the `enabled` flag of the bot with handle `id`, then go back home.

    The update runs only when a user is signed in and owns the bot. Every path
    ends in a redirect to the home page, so the view never returns None.
    """
    if 'user_id' in session and bot_check(id):
        c = mysql.connection.cursor()
        try:
            c.execute("UPDATE `bots` SET `enabled` = NOT `enabled` WHERE `handle` = %s", (id,))
            mysql.connection.commit()
        finally:
            # release the cursor even if the update fails
            c.close()
    return redirect(url_for("home"), 303)
@app.route("/bot/chat/<id>")
def bot_chat(id):
    """Placeholder view: renders `coming_soon.html`; `id` is not used yet."""
    page = render_template("coming_soon.html")
    return page
@app.route("/bot/blacklist/<id>")
def bot_blacklist(id):
    """Placeholder view: renders `coming_soon.html`; `id` is not used yet."""
    page = render_template("coming_soon.html")
    return page
2019-09-01 04:02:42 +00:00
@app.route("/bot/accounts/<id>")
def bot_accounts(id):
    """Render the `bot_accounts.html` template; `id` is not used yet."""
    page = render_template("bot_accounts.html")
    return page
2019-09-01 04:41:33 +00:00
@app.route("/bot/accounts/add")
def bot_accounts_add():
    """Render the `bot_accounts_add.html` template."""
    page = render_template("bot_accounts_add.html")
    return page
2019-09-01 15:07:50 +00:00
@app.route("/bot/create/", methods=['GET', 'POST'])
def bot_create():
    """Drive the multi-step bot creation wizard.

    The current step is kept in `session['step']`:

    1. (POST) read the instance domain from the form and detect its software.
    2. (POST) informational step, just advances.
    3. (POST) register an OAuth app on the instance and redirect the user to
       the instance's authorisation page.
    4. (GET, reached via `do_authenticate_bot`) exchange the authorisation
       code for an access token and store the new bot.

    Returns a redirect where the flow requires one, the OAuth error page when
    step 4 fails, and otherwise the rendered `bot_create.html` template.
    """
    if 'user_id' not in session:
        # a bot row needs an owner; don't let anonymous visitors run the wizard
        return redirect(url_for("show_login_page"), 303)

    # tolerate sessions that reach this page without the step being initialised
    step = session.setdefault('step', 1)

    if request.method == 'POST':
        if step == 1:
            # strip leading http(s)://, if provided
            session['instance'] = re.match(r"^(?:https?:\/\/)?(.*)", request.form['instance']).group(1)
            instance_type, error = _detect_instance_type(session['instance'])
            if error is None:
                session['instance_type'] = instance_type
                session['step'] += 1
            else:
                session['error'] = error

        elif step == 2:
            # nothing needs to be done here, this step just informs the user that their instance type is supported
            session['step'] += 1

        elif step == 3:
            # authenticate with the given instance and obtain credentials
            instance_type = session.get('instance_type')
            if instance_type in ('Mastodon', 'Pleroma'):
                base_url = "https://{}".format(session['instance'])
                redirect_uri = '{}/do/authenticate_bot'.format(cfg['base_uri'])
                session['client_id'], session['client_secret'] = Mastodon.create_app(
                    "FediBooks",
                    api_base_url=base_url,
                    scopes=scopes,
                    redirect_uris=[redirect_uri],
                    website=cfg['base_uri']
                )
                client = Mastodon(
                    client_id=session['client_id'],
                    client_secret=session['client_secret'],
                    api_base_url=base_url
                )
                url = client.auth_request_url(client_id=session['client_id'], redirect_uris=redirect_uri, scopes=scopes)
                return redirect(url, code=303)
            elif instance_type == 'Misskey':
                # todo
                pass
            else:
                # the user clicked next on step 2 while having an unsupported instance type
                # take them back to step 1
                session.pop('instance', None)
                session.pop('instance_type', None)
                session['step'] = 1
                # Redirect rather than re-entering this view: the current POST
                # carries no `instance` field for step 1 to read.
                return redirect(url_for("bot_create"), 303)

    elif step == 4:
        failure = _finish_bot_creation()
        if failure is not None:
            return failure

    return render_template("bot_create.html")


def _detect_instance_type(instance):
    """Probe `instance` and return `(instance_type, error)`; exactly one is None."""
    try:
        # a timeout keeps an unresponsive host from hanging the request
        r = requests.get("https://{}/api/v1/instance".format(instance), timeout=10)
    except requests.exceptions.RequestException:
        return None, "Couldn't connect to that instance."

    if r.status_code != 200:
        # not a masto/pleroma instance
        # misskey is currently unsupported
        # all other instance types are also unsupported
        #TODO: misskey
        return None, "Unsupported instance type."

    try:
        j = r.json()
    except ValueError:
        j = None
    if not isinstance(j, dict):
        return None, "Unsupported instance type."

    if "Pleroma" in str(j.get('version') or ''):
        return "Pleroma", None
    # `contact_account` may be absent or null in the response
    if 'is_pro' in (j.get('contact_account') or {}):
        # gab instance
        return None, "Eat shit and die, fascist scum."
    return "Mastodon", None


def _finish_bot_creation():
    """Complete step 4; return an error response on failure, None on success."""
    try:
        # test authentication
        client = Mastodon(
            client_id=session['client_id'],
            client_secret=session['client_secret'],
            api_base_url="https://{}".format(session['instance'])
        )
        secret = client.log_in(code = session['code'], scopes=scopes, redirect_uri='{}/do/authenticate_bot'.format(cfg['base_uri']))
        username = client.account_verify_credentials()['username']
        handle = "@{}@{}".format(username, session['instance'])
    except Exception:
        # Deliberately broad: any failure here (missing session keys, network
        # or API errors) means authentication did not succeed.
        return render_template("bot_oauth_error.html")

    # authentication success!!
    c = mysql.connection.cursor()
    try:
        # Store the access token returned by log_in(); the authorisation code
        # in session['code'] is single-use and cannot authenticate the bot later.
        c.execute("INSERT INTO `credentials` (client_id, client_secret, secret) VALUES (%s, %s, %s)", (session['client_id'], session['client_secret'], secret))
        credentials_id = c.lastrowid
        c.execute("INSERT INTO `bots` (handle, user_id, credentials_id) VALUES (%s, %s, %s)", (handle, session['user_id'], credentials_id))
        # one commit for both rows so a failed bot insert leaves no orphaned credentials
        mysql.connection.commit()
    finally:
        c.close()

    # clean up unneeded variables
    for key in ('code', 'instance', 'instance_type', 'client_id', 'client_secret'):
        session.pop(key, None)
    return None
2019-09-01 07:09:08 +00:00
2019-09-02 05:35:02 +00:00
@app.route("/bot/create/back")
def bot_create_back():
    """Move the bot creation wizard back one step and redisplay it.

    The step is clamped at 1 and defaults to 1 when unset, so repeated or
    direct visits cannot produce a step below 1 or a KeyError.
    """
    session['step'] = max(1, session.get('step', 1) - 1)
    return redirect(url_for("bot_create"), 303)
2019-09-02 03:07:46 +00:00
@app.route("/do/authenticate_bot")
def do_authenticate_bot():
    """OAuth redirect target: keep the `code` query argument and jump to step 4.

    The code (None when the argument is absent) is stored in the session, the
    wizard step is set to 4, and the user is redirected to `bot_create`.
    """
    auth_code = request.args.get('code')
    session['code'] = auth_code
    session['step'] = 4
    wizard_url = url_for("bot_create")
    return redirect(wizard_url, 303)
2019-09-01 07:09:08 +00:00
@app.route("/do/signup", methods=['POST'])
def do_signup():
    """Create a user account from the signup form and sign the user in.

    Reads `email` and `password` from the posted form. On a validation
    failure, or when the database rejects the row with an integrity error,
    the signup page is shown again with a message. On success the new user's
    id is stored in the session and the user is redirected to the home page.
    """
    # email validation is basically impossible without actually sending an email to the address
    # because fedibooks can't send email yet, we'll just check if the string contains an @ ;)
    email = request.form['email']
    if "@" not in email:
        return show_signup_page("Invalid email address.")
    password = request.form['password']
    if len(password) < 8:
        return show_signup_page("Password too short.")

    # Pre-hash with SHA-256 so bcrypt always receives a fixed 32-byte input.
    # NOTE(review): a raw digest can contain NUL bytes, which some bcrypt
    # versions reject or truncate at; base64-encoding the digest avoids that,
    # but it must change together with do_login and would invalidate stored
    # hashes, so it is left as-is here.
    pw_hashed = hashlib.sha256(password.encode('utf-8')).digest()
    pw = bcrypt.hashpw(pw_hashed, bcrypt.gensalt(12))

    # try to sign up
    c = mysql.connection.cursor()
    try:
        c.execute("INSERT INTO `users` (email, password) VALUES (%s, %s)", (email, pw))
        user_id = c.lastrowid
        mysql.connection.commit()
    except MySQLdb.IntegrityError:
        # presumably a uniqueness constraint on `email` -- verify against the schema
        mysql.connection.rollback()
        return show_signup_page("An account with that email address already exists.")
    finally:
        c.close()

    # success!
    session['user_id'] = user_id
    return redirect(url_for('home'))
2019-09-01 07:19:17 +00:00
@app.route("/do/signout")
def do_signout():
    """Empty the session and redirect to the home page."""
    session.clear()
    home_url = url_for("home")
    return redirect(home_url)
2019-09-01 07:30:00 +00:00
@app.route("/do/login", methods=['POST'])
def do_login():
    """Check the posted `email`/`password` and sign the user in.

    The password is SHA-256 hashed and compared with bcrypt against the
    stored hash, mirroring `do_signup`. On success the user's id is stored in
    the session and the user is redirected to the home page. An unknown email
    or a wrong password both return the plain text "invalid login".
    """
    pw_hashed = hashlib.sha256(request.form['password'].encode('utf-8')).digest()
    c = mysql.connection.cursor(MySQLdb.cursors.DictCursor)
    try:
        c.execute("SELECT * FROM users WHERE email = %s", (request.form['email'],))
        data = c.fetchone()
    finally:
        c.close()

    # fetchone() returns None when no user has this email; treat that the same
    # as a wrong password instead of failing on data['password']
    if data is not None and bcrypt.checkpw(pw_hashed, data['password']):
        session['user_id'] = data['id']
        return redirect(url_for("home"))
    return "invalid login"
2019-09-02 07:38:29 +00:00
@app.route("/img/bot_generic.png")
def img_bot_generic():
    """Send `static/bot_generic.png` with an `image/png` MIME type."""
    image_path = "static/bot_generic.png"
    return send_file(image_path, mimetype="image/png")
def bot_check(bot):
    """Return True when the signed-in user owns the bot with handle `bot`.

    Counts the rows in `bots` whose `handle` equals `bot` and whose `user_id`
    equals `session['user_id']`; the result is True only for a count of
    exactly 1. Raises KeyError when the session has no `user_id`.
    """
    c = mysql.connection.cursor()
    try:
        c.execute("SELECT COUNT(*) FROM `bots` WHERE `handle` = %s AND `user_id` = %s", (bot, session['user_id']))
        return c.fetchone()[0] == 1
    finally:
        # the cursor was previously never closed
        c.close()