Curious Cat to Mastodon crossposter
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

285 lines
13 KiB

#!/usr/bin/env python3
#Curious Greg - Curious Cat to Mastodon crossposter
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
#TODO: ADD RETROSPRING SUPPORT!
import json, hashlib, urllib, time, re, random
import requests
from mastodon import Mastodon
from flask import Flask, render_template, request, session, redirect, url_for
import mysql.connector
import bcrypt
cfg = json.load(open("meta.json"))
scopes = ["read:accounts", "write:statuses"]
settings = {
"cw": True,
# "disabled": False,
}
# +---------------------+---------------+------+-----+------------------------------------------------+-------+
# | Field | Type | Null | Key | Default | Extra |
# +---------------------+---------------+------+-----+------------------------------------------------+-------+
# | username | varchar(64) | NO | PRI | NULL | |
# | instance | varchar(128) | NO | PRI | NULL | |
# | password | tinytext | NO | | NULL | |
# | avi | text | NO | | NULL | |
# | secret | tinytext | NO | | NULL | |
# | client_id | varchar(128) | NO | | NULL | |
# | client_secret | tinytext | NO | | NULL | |
# | cc | tinytext | YES | | NULL | |
# | ccavi | varchar(128) | YES | | https://lynnesbian.space/res/ceres/cc-smol.png | |
# | latest_post | tinytext | YES | | NULL | |
# | last_check | int(11) | NO | | 0 | |
# | time_between_checks | int(11) | NO | | 1 | |
# | settings | varchar(4096) | YES | | {"cw": true} | |
# +---------------------+---------------+------+-----+------------------------------------------------+-------+
def db_reconnect():
db = mysql.connector.connect(user=cfg['dbuser'], password=cfg['dbpass'], database=cfg['dbname'])
c = db.cursor()
dc = db.cursor(dictionary=True)
return (db, c, dc)
gdb, gc, gdc = db_reconnect()
gc.execute("CREATE TABLE IF NOT EXISTS `data` (username VARCHAR(64) NOT NULL, instance VARCHAR(128) NOT NULL, password TINYTEXT NOT NULL, avi TEXT NOT NULL, secret TINYTEXT NOT NULL, client_id VARCHAR(128) NOT NULL, client_secret TINYTEXT NOT NULL, cc TINYTEXT, ccavi VARCHAR(128) DEFAULT 'https://lynnesbian.space/res/ceres/cc-smol.png', latest_post TINYTEXT, last_check INT DEFAULT 0 NOT NULL, time_between_checks INT DEFAULT %s NOT NULL, settings VARCHAR(4096) DEFAULT %s, PRIMARY KEY(username, instance))", (cfg['min_time_between_checks'], json.dumps(settings),))
gdb.close()
app = Flask(cfg['name'])
app.secret_key = cfg['flask_key']
if cfg['debug']:
app.config['DEBUG'] = True
@app.route('/')
def main():
if 'acct' not in session:
return render_template("landing_page.html")
else:
return redirect(url_for('home'))
@app.route('/home')
def home():
if 'acct' in session:
db, c, dc = db_reconnect()
dc.execute("SELECT * FROM data WHERE username = %s AND instance = %s", (session['username'], session['instance']))
data = dc.fetchone()
try:
for item in ['username', 'instance', 'avi', 'secret', 'client_id', 'client_secret', 'cc', 'ccavi']:
session[item] = data[item]
except:
db.close()
return redirect('/logout') #TODO: not good UX
if 'cc' not in session:
session['cc'] = "None"
if session['cc'] == "None" or 'ccavi' not in session:
#every time home is rendered without cc being set
c.execute("SELECT cc, ccavi FROM `data` WHERE client_id = %s AND instance = %s", (session['client_id'], session['instance']))
cc = c.fetchone()
if cc[0] != '':
session['cc'] = cc[0]
session['ccavi'] = cc[1]
if 'last_avi_update' not in session or session['last_avi_update'] + (24 * 60 * 60) < time.time():
#avatars haven't been updated for over 24 hours, update them now
client = Mastodon(client_id=session['client_id'], client_secret=session['client_secret'], access_token=session['secret'], api_base_url=session['instance'])
session['avi'] = client.account_verify_credentials()['avatar']
if session['cc'] != "None" and session['cc'] != None:
#update cc avi too
r = requests.get("https://curiouscat.me/api/v2/profile?username={}".format(session['cc']))
j = r.json()
try:
session['ccavi'] = j['userData']['avatar']
except:
return json.dumps(j)
c.execute("UPDATE data SET avi = %s, ccavi = %s WHERE client_id = %s AND instance = %s", (session['avi'], session['ccavi'], session['client_id'], session['instance']))
else:
c.execute("UPDATE data SET avi = %s WHERE client_id = %s AND instance = %s", (session['avi'], session['client_id'], session['instance']))
session['last_avi_update'] = int(time.time())
db.commit()
db.close()
return render_template("home.html", mabg="background-image:url('{}')".format(session['avi']), ccbg="background-image:url('{}')".format(session['ccavi']))
else:
db.close()
return redirect(url_for('main'))
@app.route('/debug')
def print_debug_info():
if cfg['debug']:
return json.dumps(session._get_current_object())
else:
return redirect('/home')
@app.route('/logout')
def reset_session():
session.clear()
return redirect(url_for('main'))
@app.route('/login')
def log_in():
if 'acct' in session:
#user is probably already logged in. if they aren't, home() will handle things and redirect them back here
return redirect(url_for('home'))
return render_template("login.html")
# return(json.dumps(client_info))
#internal stuff
@app.route('/internal/auth_a')
def internal_auth_a(): #TODO: prevent these endpoints from being spammed somehow
session['instance'] = request.args.get('instance', default='mastodon.social', type=str)
if not session['instance'].startswith("https://"):
session['instance'] = "https://{}".format(session['instance'])
session['client_id'], session['client_secret'] = Mastodon.create_app(cfg['name'],
api_base_url=session['instance'],
scopes=scopes,
website=cfg['website'],
redirect_uris=['{}/internal/auth_b'.format(cfg['base_uri'])]
)
client = Mastodon(client_id=session['client_id'], client_secret=session['client_secret'], api_base_url=session['instance'])
url = client.auth_request_url(client_id=session['client_id'], redirect_uris='{}/internal/auth_b'.format(cfg['base_uri']), scopes=scopes)
return redirect(url, code=307)
@app.route('/internal/auth_b')
def internal_auth_b():
#write details to DB
db, c, dc = db_reconnect()
client = Mastodon(client_id=session['client_id'], client_secret=session['client_secret'], api_base_url=session['instance'])
session['secret'] = client.log_in(code = request.args.get('code'), scopes=scopes, redirect_uri='{}/internal/auth_b'.format(cfg['base_uri']))
acct_info = client.account_verify_credentials()
session['username'] = acct_info['username']
session['avi'] = acct_info['avatar']
session['acct'] = "@{}@{}".format(session['username'], session['instance'].replace("https://", ""))
c.execute("SELECT COUNT(*) FROM data WHERE username = %s AND instance = %s", (session['username'], session['instance']))
if c.fetchone()[0] > 0:
#user already has an account with CG
#update the user's info to use the new info we just got, then redirect them to the login page
c.execute("UPDATE data SET client_id = %s, client_secret = %s, secret = %s, avi = %s WHERE username = %s AND instance = %s", (session['client_id'], session['client_secret'], session['secret'], session['avi'], session['username'], session['instance']))
db.commit()
db.close()
return redirect(url_for('log_in'))
else:
db.close()
return redirect(url_for('create_password'))
@app.route('/internal/do_login', methods = ['POST'])
def do_login():
db, c, dc = db_reconnect()
pw_in = request.form['pw']
pw_hashed = hashlib.sha256(pw_in.encode('utf-8')).digest()
acct = request.form['acct']
session['username'] = re.match("^@([^@]+)@", acct).group(1)
session['instance'] = "https://{}".format(re.search("@([^@]+)$", acct).group(1)) #todo: this occasionally gets "https://@username@instan.ce"
dc.execute("SELECT * FROM data WHERE username = %s AND instance = %s", (session['username'], session['instance']))
data = dc.fetchone()
db.close()
if bcrypt.checkpw(pw_hashed, data['password'].encode('utf-8')):
#password is correct, log the user in
for item in ['username', 'instance', 'avi', 'secret', 'client_id', 'client_secret', 'cc', 'ccavi']:
session[item] = data[item]
session['acct'] = "@{}@{}".format(session['username'], re.match("https://(.*)", session['instance']).group(1))
return redirect('/home')
else:
return redirect('/login?invalid')
@app.route('/create_password')
def create_password():
db, c, dc = db_reconnect()
c.execute("SELECT COUNT(*) FROM data WHERE username = %s AND instance = %s", (session['username'], session['instance']))
if c.fetchone()[0] == 0:
db.close()
return render_template("create_password.html", bg = "background-image:url('{}')".format(session['avi']))
else:
#user already exists in database, so they already have a password
db.close()
return redirect(url_for('main'))
@app.route('/internal/create_account', methods=['POST'])
def create_account():
db, c, dc = db_reconnect()
pw_in = request.form['pw']
if len(pw_in) < 6 or pw_in == 'password': #TODO: this is a pretty crappy check
db.close()
return redirect('/create_password?invalid')
pw_hashed = hashlib.sha256(pw_in.encode('utf-8')).digest()
pw = bcrypt.hashpw(pw_hashed, bcrypt.gensalt(15))
c.execute("INSERT INTO data (username, instance, avi, password, secret, client_id, client_secret) VALUES (%s, %s, %s, %s, %s, %s, %s)", (session['username'], session['instance'], session['avi'], pw, session['secret'], session['client_id'], session['client_secret']))
db.commit()
db.close()
return redirect(url_for('home'))
#cc connection
@app.route('/cc_connect')
def cc_connect():
return render_template('cc_connect.html')
@app.route('/internal/ccc_a', methods=['POST'])
def ccc_a(): #step one of curiouscat connection: retreive details
r = requests.get("https://curiouscat.me/api/v2/profile?username={}&count=1".format(request.form['cc']))
j = r.json()
if 'error' in j:
return redirect('/cc_connect?invalid')
session['cctemp'] = {
"cc":j['userData']['username'],
"ccavi":j['userData']['avatar'],
"ccid":j['userData']['id'],
"latest_post":j['posts'][0]['timestamp'] if len(j['posts']) != 0 else 0 #only post new answers from this point onwards, rather than posting all the old ones
}
return redirect('/cc_connect/confirm')
@app.route('/cc_connect/confirm')
def cc_connect_confirm():
return render_template('cc_connect_confirm.html', bg="background-image:url('{}')".format(session['cctemp']['ccavi']))
@app.route('/internal/ccc_b') #TODO: don't allow people to spam this
def ccc_b():
session['cctemp']['challenge'] = random.randint(100000, 999999)
session.modified = True
form_data = {
"addressees": session['cctemp']['ccid'],
"anon": "true",
"question": "Hi {}! Your Curious Greg authentication code is: {}. You may safely delete this question after entering the code. If you didn't request this, you can ignore this question.".format(session['acct'], session['cctemp']['challenge'])
}
r = requests.post("https://curiouscat.me/api/v2/post/create", data=form_data)
j = r.json()
if 'success' in j and j['success'] == True:
return redirect('/cc_connect/code')
else:
#todo: handle error properly
return False
@app.route('/cc_connect/code')
def cc_connect_code():
return render_template('cc_connect_code.html')
@app.route('/internal/ccc_c', methods=['POST'])
def ccc_c():
if int(request.form['challenge']) != session['cctemp']['challenge']:
return redirect('/cc_connect/code?invalid')
for item in ['cc', 'ccavi']:
session[item] = session['cctemp'][item]
db, c, dc = db_reconnect()
c.execute("UPDATE data SET cc = %s, ccavi = %s, latest_post = %s WHERE username = %s AND instance = %s", (session['cc'], session['ccavi'], session['cctemp']['latest_post'], session['username'], session['instance']))
db.commit()
db.close()
del session['cctemp']
return redirect('/cc_connect/complete')
@app.route('/cc_connect/complete')
def cc_connect_complete():
return render_template('cc_connect_complete.html', bg="background-image:url('{}')".format(session['ccavi']))
@app.route('/settings')
def settings_page():
return render_template('settings.html')