224 lines
11 KiB
Python
Executable file
224 lines
11 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
#Curious Greg - Curious Cat to Mastodon crossposter
|
|
# This Source Code Form is subject to the terms of the Mozilla Public
|
|
# License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
|
|
|
#TODO: ADD RETROSPRING SUPPORT!
|
|
|
|
import requests, json, hashlib, urllib, time, re, random
|
|
from mastodon import Mastodon
|
|
from flask import Flask, render_template, request, session, redirect, url_for
|
|
import mysql.connector
|
|
import bcrypt
|
|
|
|
# Site configuration. The visible code reads the keys 'dbuser', 'dbpass',
# 'dbname', 'name', 'flask_key' and 'website' from it.
# The context manager closes meta.json as soon as it has been parsed instead
# of leaving the handle open for the lifetime of the process.
with open("meta.json") as cfg_file:
    cfg = json.load(cfg_file)

# OAuth scopes requested from the user's Mastodon instance
scopes = ["read:accounts", "write:statuses"]

# NOTE(review): presumably the default per-user settings (there is a `settings`
# column in the table) -- nothing in the visible code reads this dict, confirm.
settings = {
    "cw": False,
    # "disabled": False,
}

# One connection shared by every request handler, with a tuple-returning
# cursor (c) and a dict-returning cursor (dc) on top of it.
db = mysql.connector.connect(user=cfg['dbuser'], password=cfg['dbpass'], database=cfg['dbname'])
c = db.cursor()
dc = db.cursor(dictionary=True)
|
|
# MariaDB [curiousgreg]> DESCRIBE data;
# +---------------------+--------------+------+-----+------------------------------------------------+-----------------------------+
# | Field               | Type         | Null | Key | Default                                        | Extra                       |
# +---------------------+--------------+------+-----+------------------------------------------------+-----------------------------+
# | username            | varchar(64)  | NO   | PRI | NULL                                           |                             |
# | instance            | varchar(128) | NO   | PRI | NULL                                           |                             |
# | password            | tinytext     | NO   |     | NULL                                           |                             |
# | avi                 | text         | NO   |     | NULL                                           |                             |
# | secret              | tinytext     | NO   |     | NULL                                           |                             |
# | client_id           | varchar(128) | NO   |     | NULL                                           |                             |
# | client_secret       | tinytext     | NO   |     | NULL                                           |                             |
# | cc                  | tinytext     | YES  |     | NULL                                           |                             |
# | ccavi               | varchar(128) | YES  |     | https://lynnesbian.space/res/ceres/cc-smol.png |                             |
# | latest_post         | tinytext     | YES  |     | NULL                                           |                             |
# | latest_timestamp    | timestamp    | NO   |     | CURRENT_TIMESTAMP                              | on update CURRENT_TIMESTAMP |
# | time_between_checks | int(11)      | YES  |     | NULL                                           |                             |
# | settings            | longtext     | YES  |     | NULL                                           |                             |
# +---------------------+--------------+------+-----+------------------------------------------------+-----------------------------+
|
|
# Create the per-user `data` table at import time; IF NOT EXISTS makes this a
# no-op once the table is there. Rows are keyed on (username, instance).
c.execute("CREATE TABLE IF NOT EXISTS `data` (username VARCHAR(64) NOT NULL, instance VARCHAR(128) NOT NULL, password TINYTEXT NOT NULL, avi TEXT NOT NULL, secret TINYTEXT NOT NULL, client_id VARCHAR(128) NOT NULL, client_secret TINYTEXT NOT NULL, cc TINYTEXT, ccavi VARCHAR(128) DEFAULT 'https://lynnesbian.space/res/ceres/cc-smol.png', latest_post TINYTEXT, latest_timestamp TIMESTAMP, time_between_checks INT, settings LONGTEXT, PRIMARY KEY(username, instance))")
|
|
|
|
# The Flask application object. Its name and the key used to sign the
# session cookie are both taken from the loaded configuration.
app = Flask(cfg['name'])
app.secret_key = cfg['flask_key']
|
|
|
|
@app.route('/')
def main():
    """Show the landing page to visitors without an 'acct' in their session;
    everyone else is redirected to the home view."""
    if 'acct' in session:
        return redirect(url_for('home'))
    return render_template("landing_page.html")
|
|
|
|
@app.route('/home')
def home():
    """Render the dashboard for the logged-in user.

    Visitors without an 'acct' in their session are redirected to the
    landing route. Otherwise the view:

    1. loads the linked Curious Cat name/avatar from the database when the
       session does not have them yet,
    2. refreshes the Mastodon (and, if linked, Curious Cat) avatar at most
       once every 24 hours and stores the result in the database,
    3. renders home.html with both avatars as CSS background images.
    """
    if 'acct' not in session:
        return redirect(url_for('main'))

    if 'cc' not in session:
        session['cc'] = "None"
    if session['cc'] == "None" or 'ccavi' not in session:
        #every time home is rendered without cc being set
        # '=' rather than LIKE: '_' and '%' in the stored values must not act as wildcards
        c.execute("SELECT cc, ccavi FROM `data` WHERE client_id = %s AND instance = %s", (session['client_id'], session['instance']))
        row = c.fetchone()
        if row is None:
            #OAuth succeeded but no account row exists yet, so the user
            #still has to pick a password
            return redirect(url_for('create_password'))
        if row[0] != '':
            session['cc'] = row[0]
        #always take the avatar from the row so the template below has a value to render
        session['ccavi'] = row[1]

    if 'last_avi_update' not in session or session['last_avi_update'] + (24 * 60 * 60) < time.time():
        #avatars haven't been updated for over 24 hours, update them now
        client = Mastodon(client_id=session['client_id'], client_secret=session['client_secret'], access_token=session['secret'], api_base_url=session['instance'])
        session['avi'] = client.account_verify_credentials()['avatar']

        cc_avatar = None
        if session['cc'] != "None" and session['cc'] is not None:
            #update cc avi too; params= lets requests escape the username
            r = requests.get("https://curiouscat.me/api/v2/profile", params={"username": session['cc']})
            j = r.json()
            if isinstance(j, dict) and 'userData' in j:
                cc_avatar = j['userData']['avatar']

        if cc_avatar is not None:
            session['ccavi'] = cc_avatar
            c.execute("UPDATE data SET avi = %s, ccavi = %s WHERE client_id = %s AND instance = %s", (session['avi'], session['ccavi'], session['client_id'], session['instance']))
        else:
            #no linked profile (or the lookup returned no userData): keep the old cc avatar
            c.execute("UPDATE data SET avi = %s WHERE client_id = %s AND instance = %s", (session['avi'], session['client_id'], session['instance']))
        #without an explicit commit the UPDATE is lost when the connection closes
        db.commit()
        #remember when we refreshed, otherwise the 24 hour check above can never succeed
        session['last_avi_update'] = time.time()

    return render_template("home.html", mabg="background-image:url('{}')".format(session['avi']), ccbg="background-image:url('{}')".format(session['ccavi']))
|
|
|
|
|
|
@app.route('/debug') #TODO: remove this before making the site live ;p
def print_debug_info():
    """Return the contents of the caller's session serialised as JSON.

    Debug aid only: the session holds the OAuth client secret and access token.
    """
    current_session = session._get_current_object()
    return json.dumps(current_session)
|
|
|
|
@app.route('/logout')
def reset_session():
    """Log the user out by emptying the session, then redirect to the landing route."""
    session.clear()
    landing_url = url_for('main')
    return redirect(landing_url)
|
|
|
|
@app.route('/login')
def log_in():
    """Render the login form, unless the session already carries an 'acct'."""
    has_account_in_session = 'acct' in session
    if not has_account_in_session:
        return render_template("login.html")
    #user is probably already logged in. if they aren't, home() will handle things and redirect them back here
    return redirect(url_for('home'))
|
|
|
|
# return(json.dumps(client_info))
|
|
|
|
#internal stuff
|
|
|
|
@app.route('/internal/auth_a')
def internal_auth_a(): #TODO: prevent these endpoints from being spammed somehow
    """Step one of the Mastodon login: register an app and start OAuth.

    Reads the ``instance`` query parameter (default ``mastodon.social``),
    reduces it to ``https://<host>``, registers an OAuth application on that
    instance, stores the instance and client credentials in the session, and
    redirects the browser (307) to the instance's authorisation page.
    """
    from urllib.parse import urlsplit #local import: only this view needs it

    requested = request.args.get('instance', default='mastodon.social', type=str).strip()
    if "://" not in requested:
        requested = "https://{}".format(requested)
    try:
        host = urlsplit(requested).netloc
    except ValueError:
        #e.g. an unbalanced '[' in the host part
        host = ''
    #Keep only the host so that "http://x", "x/" and "x/about" all end up as
    #"https://x". do_login rebuilds the instance as "https://<host>", so any
    #other stored form could never be matched at login. An empty or
    #unparseable value falls back to the same default as a missing parameter.
    session['instance'] = "https://{}".format(host or 'mastodon.social')

    session['client_id'], session['client_secret'] = Mastodon.create_app(cfg['name'],
        api_base_url=session['instance'],
        scopes=scopes,
        website=cfg['website'],
        redirect_uris=['https://cg.lynnesbian.space/internal/auth_b', 'http://localhost:5000/internal/auth_b']
    )

    client = Mastodon(client_id=session['client_id'], client_secret=session['client_secret'], api_base_url=session['instance'])
    #NOTE(review): the redirect URI is hard-coded to the localhost entry of the
    #list registered above; a deployed site presumably needs the other one -- confirm
    url = client.auth_request_url(client_id=session['client_id'], redirect_uris='http://localhost:5000/internal/auth_b', scopes=scopes)

    return redirect(url, code=307)
|
|
|
|
@app.route('/internal/auth_b')
def internal_auth_b():
    """Step two of the Mastodon login: exchange the OAuth code for a token.

    Stores the access token, username, avatar and ``@user@host`` handle in
    the session. If the user already has a row in ``data`` its credentials
    and avatar are refreshed and the user is sent to the login route;
    otherwise they are sent on to choose a password.

    Requests that arrive without the session values written by
    ``internal_auth_a``, or without a ``code`` query parameter, are redirected
    to the login route instead of failing with a KeyError.
    """
    code = request.args.get('code')
    has_app_credentials = all(key in session for key in ('client_id', 'client_secret', 'instance'))
    if code is None or not has_app_credentials:
        return redirect(url_for('log_in'))

    #write details to DB
    client = Mastodon(client_id=session['client_id'], client_secret=session['client_secret'], api_base_url=session['instance'])
    session['secret'] = client.log_in(code=code, scopes=scopes, redirect_uri='http://localhost:5000/internal/auth_b')
    acct_info = client.account_verify_credentials()
    session['username'] = acct_info['username']
    session['avi'] = acct_info['avatar']
    session['acct'] = "@{}@{}".format(session['username'], session['instance'].replace("https://", ""))

    # '=' rather than LIKE: '_' is legal in usernames and is a LIKE wildcard
    c.execute("SELECT COUNT(*) FROM data WHERE username = %s AND instance = %s", (session['username'], session['instance']))
    if c.fetchone()[0] > 0:
        #user already has an account with CG
        #update the user's info to use the new info we just got, then redirect them to the login page
        #(mysql.connector only understands %s placeholders; the previous mix of '?' and '%s' made this statement fail)
        c.execute("UPDATE data SET client_id = %s, client_secret = %s, secret = %s, avi = %s WHERE username = %s AND instance = %s", (session['client_id'], session['client_secret'], session['secret'], session['avi'], session['username'], session['instance']))
        db.commit()
        return redirect(url_for('log_in'))
    else:
        return redirect(url_for('create_password'))
|
|
|
|
@app.route('/internal/do_login', methods = ['POST'])
def do_login():
    """Check a submitted handle/password pair and log the user in.

    Expects the form fields ``acct`` (``@user@host``; the leading ``@`` is
    optional) and ``pw``. On success the user's row is copied into the
    session and the browser is redirected to /home. A malformed handle, an
    unknown account or a wrong password all redirect to /login?invalid.

    Nothing is written to the session until the password has been verified,
    so a failed attempt cannot change the identity stored in it.
    """
    pw_in = request.form['pw']
    pw_hashed = hashlib.sha256(pw_in.encode('utf-8')).digest()
    acct = request.form['acct']

    handle = re.fullmatch(r"@?([^@\s]+)@([^@\s]+)", acct.strip())
    if handle is None:
        return redirect('/login?invalid')
    username = handle.group(1)
    instance = "https://{}".format(handle.group(2))

    # '=' rather than LIKE: this is user input and must not be treated as a pattern
    dc.execute("SELECT * FROM data WHERE username = %s AND instance = %s", (username, instance))
    data = dc.fetchone()
    if data is None or not bcrypt.checkpw(pw_hashed, data['password'].encode('utf-8')):
        return redirect('/login?invalid')

    #password is correct, log the user in
    for item in ['username', 'instance', 'avi', 'secret', 'client_id', 'client_secret', 'cc', 'ccavi']:
        session[item] = data[item]
    session['acct'] = "@{}@{}".format(session['username'], re.match("https://(.*)", session['instance']).group(1))
    return redirect('/home')
|
|
|
|
@app.route('/create_password')
def create_password():
    """Show the password form to a user who completed OAuth but has no account row.

    Visitors whose session lacks the values written during OAuth
    (username, instance, avatar) and users who already have a row in
    ``data`` are redirected to the landing route.
    """
    if any(key not in session for key in ('username', 'instance', 'avi')):
        #reached directly, without going through the OAuth steps first
        return redirect(url_for('main'))

    # '=' rather than LIKE: '_' is legal in usernames and is a LIKE wildcard
    c.execute("SELECT COUNT(*) FROM data WHERE username = %s AND instance = %s", (session['username'], session['instance']))
    if c.fetchone()[0] == 0:
        return render_template("create_password.html", bg = "background-image:url('{}')".format(session['avi']))
    else:
        #user already exists in database, so they already have a password
        return redirect(url_for('main'))
|
|
|
|
@app.route('/internal/create_account', methods=['POST'])
def create_account():
    """Create the account row for the user currently held in the session.

    Expects the form field ``pw``. Passwords shorter than six characters, or
    equal to 'password', redirect to /create_password?invalid. Otherwise the
    bcrypt hash of the password's SHA-256 digest is stored together with the
    OAuth details from the session, and the user is redirected to the home
    view.

    Sessions that lack the OAuth details are redirected to the landing route.
    If the row already exists (e.g. the form was submitted twice), the user
    is redirected to the login route.
    """
    required = ('username', 'instance', 'avi', 'secret', 'client_id', 'client_secret')
    if any(key not in session for key in required):
        return redirect(url_for('main'))

    pw_in = request.form['pw']
    if len(pw_in) < 6 or pw_in == 'password': #TODO: this is a pretty crappy check
        return redirect('/create_password?invalid')

    #NOTE(review): a raw SHA-256 digest can contain NUL bytes, and the bcrypt
    #README recommends base64-encoding the digest before hashing. Changing
    #that here would require the same change in do_login and a migration of
    #the stored hashes, so it is left as is.
    pw_hashed = hashlib.sha256(pw_in.encode('utf-8')).digest()
    pw = bcrypt.hashpw(pw_hashed, bcrypt.gensalt(15))
    try:
        c.execute("INSERT INTO data (username, instance, avi, password, secret, client_id, client_secret) VALUES (%s, %s, %s, %s, %s, %s, %s)", (session['username'], session['instance'], session['avi'], pw, session['secret'], session['client_id'], session['client_secret']))
    except mysql.connector.IntegrityError:
        #(username, instance) is the primary key, so the account already exists
        db.rollback()
        return redirect(url_for('log_in'))
    db.commit()
    return redirect(url_for('home'))
|
|
|
|
#cc connection
|
|
|
|
@app.route('/cc_connect')
def cc_connect():
    """Render the page where the user enters their Curious Cat username."""
    page = render_template('cc_connect.html')
    return page
|
|
|
|
@app.route('/internal/ccc_a', methods=['POST'])
def ccc_a(): #step one of curiouscat connection: retrieve details
    """Look up the submitted Curious Cat profile and stash it in the session.

    Expects the form field ``cc``. When the profile API reports an error,
    returns something that is not JSON, or returns no ``userData``, the user
    is redirected to /cc_connect?invalid. Otherwise the profile's username,
    avatar and id are stored under session['cctemp'] and the user is sent to
    the confirmation page.
    """
    #params= lets requests escape the username, so characters such as '&' or
    #'#' cannot alter the query string
    r = requests.get("https://curiouscat.me/api/v2/profile", params={"username": request.form['cc']})
    try:
        j = r.json()
    except ValueError:
        #body was not JSON (e.g. an HTML error page)
        return redirect('/cc_connect?invalid')
    if not isinstance(j, dict) or 'error' in j or 'userData' not in j:
        return redirect('/cc_connect?invalid')

    user_data = j['userData']
    session['cctemp'] = {
        "cc": user_data['username'],
        "ccavi": user_data['avatar'],
        "ccid": user_data['id'],
    }
    return redirect('/cc_connect/confirm')
|
|
|
|
@app.route('/cc_connect/confirm')
def cc_connect_confirm():
    """Render the confirmation page, using the looked-up Curious Cat avatar
    from session['cctemp'] as the background image."""
    avatar_url = session['cctemp']['ccavi']
    background_css = "background-image:url('{}')".format(avatar_url)
    return render_template('cc_connect_confirm.html', bg=background_css)
|
|
|
|
@app.route('/internal/ccc_b')
def ccc_b():
    """Step two of the Curious Cat connection: send the user a challenge code.

    Generates a six-digit code, stores it in session['cctemp']['challenge'],
    and posts it as an anonymous question to the Curious Cat profile found
    in step one. On success the user is redirected to the code entry page.
    If step one has not been completed the user is sent back to /cc_connect;
    if the question could not be posted they are sent to /cc_connect?invalid.
    """
    import secrets #local import: stdlib CSPRNG, used only by this view

    if 'cctemp' not in session or 'acct' not in session:
        return redirect('/cc_connect')

    #same range as before (100000..999999), but from a cryptographically
    #secure source because the code is used for authentication
    #NOTE(review): Flask's default session cookie is signed but not encrypted,
    #so the user can presumably read this code straight from their cookie --
    #confirm, and consider keeping the challenge server-side
    session['cctemp']['challenge'] = secrets.randbelow(900000) + 100000
    #nested mutation is not detected automatically, so flag the session as changed
    session.modified = True

    form_data = {
        "addressees": session['cctemp']['ccid'],
        "anon": "true",
        "question": "Hi {}! Your Curious Greg authentication code is: {}. You may safely delete this question after entering the code. If you didn't request this, you can ignore this question.".format(session['acct'], session['cctemp']['challenge'])
    }
    r = requests.post("https://curiouscat.me/api/v2/post/create", data=form_data)
    try:
        j = r.json()
    except ValueError:
        #body was not JSON (e.g. an HTML error page)
        j = {}

    if isinstance(j, dict) and 'success' in j and j['success'] == True:
        return redirect('/cc_connect/code')
    #todo: show a dedicated error message
    #returning False is not a valid Flask response, so reuse the existing error redirect
    return redirect('/cc_connect?invalid')
|
|
|
|
@app.route('/cc_connect/code')
def cc_connect_code():
    """Render the page where the user types in the code they were sent."""
    page = render_template('cc_connect_code.html')
    return page
|