#!/usr/bin/env python3
# Curious Greg - Curious Cat to Mastodon crossposter

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

# TODO: ADD RETROSPRING SUPPORT!

import hashlib
import json
import random
import re
import secrets
import time
import urllib

import bcrypt
import mysql.connector
import requests
from flask import Flask, redirect, render_template, request, session, url_for
from mastodon import Mastodon

# Site configuration; the keys read in this file are dbuser, dbpass, dbname,
# min_time_between_checks, name, flask_key, debug, website and base_uri.
with open("meta.json", encoding="utf-8") as cfg_file:
    cfg = json.load(cfg_file)

# OAuth scopes requested from the user's Mastodon instance.
scopes = ["read:accounts", "write:statuses"]

# Default per-user settings, stored as JSON in the `settings` column.
settings = {
    "cw": True,
    # "disabled": False,
}

# Columns copied between a `data` row and the Flask session.
ACCOUNT_FIELDS = ('username', 'instance', 'avi', 'secret', 'client_id', 'client_secret', 'cc', 'ccavi')

# Curious Cat profile endpoint; the username is passed as a query parameter.
CC_PROFILE_URL = "https://curiouscat.me/api/v2/profile"

# Avatars are refreshed at most once per this many seconds (24 hours).
AVATAR_REFRESH_SECONDS = 24 * 60 * 60

# +---------------------+---------------+------+-----+------------------------------------------------+-------+
# | Field               | Type          | Null | Key | Default                                        | Extra |
# +---------------------+---------------+------+-----+------------------------------------------------+-------+
# | username            | varchar(64)   | NO   | PRI | NULL                                           |       |
# | instance            | varchar(128)  | NO   | PRI | NULL                                           |       |
# | password            | tinytext      | NO   |     | NULL                                           |       |
# | avi                 | text          | NO   |     | NULL                                           |       |
# | secret              | tinytext      | NO   |     | NULL                                           |       |
# | client_id           | varchar(128)  | NO   |     | NULL                                           |       |
# | client_secret       | tinytext      | NO   |     | NULL                                           |       |
# | cc                  | tinytext      | YES  |     | NULL                                           |       |
# | ccavi               | varchar(128)  | YES  |     | https://lynnesbian.space/res/ceres/cc-smol.png |       |
# | latest_post         | tinytext      | YES  |     | NULL                                           |       |
# | last_check          | int(11)       | NO   |     | 0                                              |       |
# | time_between_checks | int(11)       | NO   |     | 1                                              |       |
# | settings            | varchar(4096) | YES  |     | {"cw": true}                                   |       |
# +---------------------+---------------+------+-----+------------------------------------------------+-------+


def db_reconnect():
    """Open a new MySQL connection using the credentials in ``cfg``.

    Returns a ``(connection, cursor, dict_cursor)`` tuple. The caller is
    responsible for closing the connection.
    """
    db = mysql.connector.connect(user=cfg['dbuser'], password=cfg['dbpass'], database=cfg['dbname'])
    c = db.cursor()
    dc = db.cursor(dictionary=True)
    return (db, c, dc)


# Make sure the `data` table exists before any request is served.
gdb, gc, gdc = db_reconnect()
try:
    gc.execute(
        "CREATE TABLE IF NOT EXISTS `data` ("
        "username VARCHAR(64) NOT NULL, "
        "instance VARCHAR(128) NOT NULL, "
        "password TINYTEXT NOT NULL, "
        "avi TEXT NOT NULL, "
        "secret TINYTEXT NOT NULL, "
        "client_id VARCHAR(128) NOT NULL, "
        "client_secret TINYTEXT NOT NULL, "
        "cc TINYTEXT, "
        "ccavi VARCHAR(128) DEFAULT 'https://lynnesbian.space/res/ceres/cc-smol.png', "
        "latest_post TINYTEXT, "
        "last_check INT DEFAULT 0 NOT NULL, "
        "time_between_checks INT DEFAULT %s NOT NULL, "
        "settings VARCHAR(4096) DEFAULT %s, "
        "PRIMARY KEY(username, instance))",
        (cfg['min_time_between_checks'], json.dumps(settings)),
    )
finally:
    # Close the bootstrap connection even if the DDL statement fails.
    gdb.close()

app = Flask(cfg['name'])
app.secret_key = cfg['flask_key']
if cfg['debug']:
    app.config['DEBUG'] = True


@app.route('/')
def main():
    """Show the landing page, or send logged-in users to /home."""
    if 'acct' not in session:
        return render_template("landing_page.html")
    return redirect(url_for('home'))


@app.route('/home')
def home():
    """Render the home page for the logged-in user.

    Reloads the account row into the session, and refreshes the stored
    Mastodon (and, if connected, Curious Cat) avatars when the session has no
    refresh timestamp or the last refresh is more than 24 hours old.
    Visitors without a session are redirected to the landing page.
    """
    if 'acct' not in session:
        # No connection has been opened on this path, so there is nothing to close.
        return redirect(url_for('main'))

    db, c, dc = db_reconnect()
    try:
        dc.execute("SELECT * FROM data WHERE username = %s AND instance = %s", (session['username'], session['instance']))
        data = dc.fetchone()
        try:
            for item in ACCOUNT_FIELDS:
                session[item] = data[item]
        except (TypeError, KeyError):
            # TypeError: no such row (data is None); KeyError: column missing.
            return redirect('/logout')  # TODO: not good UX

        if 'cc' not in session:
            session['cc'] = "None"

        if session['cc'] == "None" or 'ccavi' not in session:
            # every time home is rendered without cc being set
            c.execute("SELECT cc, ccavi FROM `data` WHERE client_id = %s AND instance = %s", (session['client_id'], session['instance']))
            cc = c.fetchone()
            if cc is not None and cc[0] != '':
                session['cc'] = cc[0]
                session['ccavi'] = cc[1]

        if 'last_avi_update' not in session or session['last_avi_update'] + AVATAR_REFRESH_SECONDS < time.time():
            # avatars haven't been updated for over 24 hours, update them now
            client = Mastodon(client_id=session['client_id'], client_secret=session['client_secret'], access_token=session['secret'], api_base_url=session['instance'])
            session['avi'] = client.account_verify_credentials()['avatar']
            if session['cc'] != "None" and session['cc'] is not None:
                # update cc avi too; params= lets requests URL-encode the username
                r = requests.get(CC_PROFILE_URL, params={"username": session['cc']})
                j = r.json()
                try:
                    session['ccavi'] = j['userData']['avatar']
                except (KeyError, TypeError):
                    # Unexpected payload: show it to the user as-is.
                    return json.dumps(j)
                c.execute("UPDATE data SET avi = %s, ccavi = %s WHERE client_id = %s AND instance = %s", (session['avi'], session['ccavi'], session['client_id'], session['instance']))
            else:
                c.execute("UPDATE data SET avi = %s WHERE client_id = %s AND instance = %s", (session['avi'], session['client_id'], session['instance']))
            session['last_avi_update'] = int(time.time())
            db.commit()

        return render_template(
            "home.html",
            mabg="background-image:url('{}')".format(session['avi']),
            ccbg="background-image:url('{}')".format(session['ccavi']),
        )
    finally:
        # Runs on every exit path above, including the early returns.
        db.close()


@app.route('/debug')
def print_debug_info():
    """Dump the session as JSON when debug mode is on; otherwise go to /home."""
    if cfg['debug']:
        return json.dumps(session._get_current_object())
    return redirect('/home')


@app.route('/logout')
def reset_session():
    """Clear the session and return to the landing page."""
    session.clear()
    return redirect(url_for('main'))


@app.route('/login')
def log_in():
    """Show the login form, or send users who have a session to /home."""
    if 'acct' in session:
        # user is probably already logged in. if they aren't, home() will
        # handle things and redirect them back here
        return redirect(url_for('home'))
    return render_template("login.html")


# internal stuff

@app.route('/internal/auth_a')
def internal_auth_a():
    """Step one of Mastodon OAuth: register an app and redirect to the instance.

    Reads the ``instance`` query parameter (default ``mastodon.social``),
    prefixes it with ``https://`` if needed, and stores the instance and the
    newly created client credentials in the session.
    """
    # TODO: prevent these endpoints from being spammed somehow
    session['instance'] = request.args.get('instance', default='mastodon.social', type=str)
    if not session['instance'].startswith("https://"):
        session['instance'] = "https://{}".format(session['instance'])

    callback = '{}/internal/auth_b'.format(cfg['base_uri'])
    session['client_id'], session['client_secret'] = Mastodon.create_app(
        cfg['name'],
        api_base_url=session['instance'],
        scopes=scopes,
        website=cfg['website'],
        redirect_uris=[callback],
    )

    client = Mastodon(client_id=session['client_id'], client_secret=session['client_secret'], api_base_url=session['instance'])
    url = client.auth_request_url(client_id=session['client_id'], redirect_uris=callback, scopes=scopes)
    return redirect(url, code=307)


@app.route('/internal/auth_b')
def internal_auth_b():
    """Step two of Mastodon OAuth: exchange the code and record the account.

    Existing users get their stored credentials and avatar refreshed and are
    sent to the login page; new users are sent on to choose a password.
    """
    if not all(key in session for key in ('client_id', 'client_secret', 'instance')):
        # Reached without going through auth_a first; start over.
        return redirect(url_for('log_in'))

    client = Mastodon(client_id=session['client_id'], client_secret=session['client_secret'], api_base_url=session['instance'])
    session['secret'] = client.log_in(code=request.args.get('code'), scopes=scopes, redirect_uri='{}/internal/auth_b'.format(cfg['base_uri']))
    acct_info = client.account_verify_credentials()
    session['username'] = acct_info['username']
    session['avi'] = acct_info['avatar']
    session['acct'] = "@{}@{}".format(session['username'], session['instance'].replace("https://", ""))

    # write details to DB (opened only after the Mastodon calls succeeded)
    db, c, dc = db_reconnect()
    try:
        c.execute("SELECT COUNT(*) FROM data WHERE username = %s AND instance = %s", (session['username'], session['instance']))
        if c.fetchone()[0] > 0:
            # user already has an account with CG
            # update the user's info to use the new info we just got, then
            # redirect them to the login page
            c.execute(
                "UPDATE data SET client_id = %s, client_secret = %s, secret = %s, avi = %s WHERE username = %s AND instance = %s",
                (session['client_id'], session['client_secret'], session['secret'], session['avi'], session['username'], session['instance']),
            )
            db.commit()
            return redirect(url_for('log_in'))
        return redirect(url_for('create_password'))
    finally:
        db.close()


@app.route('/internal/do_login', methods=['POST'])
def do_login():
    """Log a user in from the ``acct`` (``@user@instance``) and ``pw`` form fields.

    Redirects to /home on success and to /login?invalid when the handle is
    malformed, the account does not exist, or the password does not match.
    The session is only modified once the password has been verified.
    """
    pw_in = request.form['pw']
    acct = request.form['acct']

    user_match = re.match(r"^@([^@]+)@", acct)
    instance_match = re.search(r"@([^@]+)$", acct)
    if user_match is None or instance_match is None:
        return redirect('/login?invalid')
    username = user_match.group(1)
    # todo: this occasionally gets "https://@username@instan.ce"
    instance = "https://{}".format(instance_match.group(1))

    db, c, dc = db_reconnect()
    try:
        dc.execute("SELECT * FROM data WHERE username = %s AND instance = %s", (username, instance))
        data = dc.fetchone()
    finally:
        db.close()

    if data is None:
        # Unknown account: same response as a wrong password.
        return redirect('/login?invalid')

    pw_hashed = hashlib.sha256(pw_in.encode('utf-8')).digest()
    if not bcrypt.checkpw(pw_hashed, data['password'].encode('utf-8')):
        return redirect('/login?invalid')

    # password is correct, log the user in
    for item in ACCOUNT_FIELDS:
        session[item] = data[item]
    session['acct'] = "@{}@{}".format(session['username'], re.match(r"https://(.*)", session['instance']).group(1))
    return redirect('/home')


@app.route('/create_password')
def create_password():
    """Show the password-creation form to users who have no row in `data` yet."""
    db, c, dc = db_reconnect()
    try:
        c.execute("SELECT COUNT(*) FROM data WHERE username = %s AND instance = %s", (session['username'], session['instance']))
        is_new_user = c.fetchone()[0] == 0
    finally:
        db.close()

    if is_new_user:
        return render_template("create_password.html", bg="background-image:url('{}')".format(session['avi']))
    # user already exists in database, so they already have a password
    return redirect(url_for('main'))


@app.route('/internal/create_account', methods=['POST'])
def create_account():
    """Create the account row from the session and the submitted ``pw`` field.

    The password is SHA-256 digested and then hashed with bcrypt before
    being stored. Passwords shorter than six characters, or equal to
    ``password``, are rejected with a redirect to /create_password?invalid.
    """
    pw_in = request.form['pw']
    if len(pw_in) < 6 or pw_in == 'password':
        # TODO: this is a pretty crappy check
        return redirect('/create_password?invalid')

    pw_hashed = hashlib.sha256(pw_in.encode('utf-8')).digest()
    pw = bcrypt.hashpw(pw_hashed, bcrypt.gensalt(15))

    db, c, dc = db_reconnect()
    try:
        c.execute(
            "INSERT INTO data (username, instance, avi, password, secret, client_id, client_secret) VALUES (%s, %s, %s, %s, %s, %s, %s)",
            (session['username'], session['instance'], session['avi'], pw, session['secret'], session['client_id'], session['client_secret']),
        )
        db.commit()
    finally:
        db.close()

    return redirect(url_for('home'))


# cc connection

@app.route('/cc_connect')
def cc_connect():
    """Show the form for entering a Curious Cat username."""
    return render_template('cc_connect.html')


@app.route('/internal/ccc_a', methods=['POST'])
def ccc_a():
    """Step one of the Curious Cat connection: retrieve the profile details.

    Looks up the username from the ``cc`` form field and stores the result
    in ``session['cctemp']`` until ownership has been confirmed.
    """
    # params= URL-encodes the user-supplied username instead of pasting it
    # into the query string verbatim.
    r = requests.get(CC_PROFILE_URL, params={"username": request.form['cc'], "count": 1})
    j = r.json()
    if 'error' in j:
        return redirect('/cc_connect?invalid')

    posts = j['posts']
    session['cctemp'] = {
        "cc": j['userData']['username'],
        "ccavi": j['userData']['avatar'],
        "ccid": j['userData']['id'],
        # only post new answers from this point onwards, rather than posting
        # all the old ones
        "latest_post": posts[0]['timestamp'] if posts else 0,
    }
    return redirect('/cc_connect/confirm')


@app.route('/cc_connect/confirm')
def cc_connect_confirm():
    """Ask the user to confirm the Curious Cat profile that was looked up."""
    if 'cctemp' not in session:
        # Nothing has been looked up yet; go back to the first step.
        return redirect('/cc_connect')
    return render_template('cc_connect_confirm.html', bg="background-image:url('{}')".format(session['cctemp']['ccavi']))


@app.route('/internal/ccc_b')
def ccc_b():
    """Step two: send a six-digit authentication code as an anonymous question.

    The code is stored in ``session['cctemp']['challenge']`` and checked by
    ``ccc_c``. If Curious Cat does not report success, the user is sent back
    to the confirmation page.
    """
    # TODO: don't allow people to spam this
    if 'cctemp' not in session:
        return redirect('/cc_connect')

    # The code proves ownership of the profile, so draw it from the OS CSPRNG.
    # Same range as before: 100000..999999 inclusive.
    session['cctemp']['challenge'] = 100000 + secrets.randbelow(900000)
    session.modified = True  # nested mutation is not picked up automatically

    form_data = {
        "addressees": session['cctemp']['ccid'],
        "anon": "true",
        "question": "Hi {}! Your Curious Greg authentication code is: {}. You may safely delete this question after entering the code. If you didn't request this, you can ignore this question.".format(session['acct'], session['cctemp']['challenge']),
    }
    r = requests.post("https://curiouscat.me/api/v2/post/create", data=form_data)
    try:
        j = r.json()
    except ValueError:
        # Reply was not JSON; treat it like a failed request.
        j = None

    if isinstance(j, dict) and j.get('success') is True:
        return redirect('/cc_connect/code')
    # todo: handle error properly
    # (returning False is not a valid Flask response and caused a 500)
    return redirect('/cc_connect/confirm?error')


@app.route('/cc_connect/code')
def cc_connect_code():
    """Show the form for entering the authentication code."""
    return render_template('cc_connect_code.html')


@app.route('/internal/ccc_c', methods=['POST'])
def ccc_c():
    """Step three: verify the submitted code and save the connection.

    On a match, copies ``cc`` and ``ccavi`` from ``session['cctemp']`` into
    the session, writes them and ``latest_post`` to the user's row, and
    discards ``cctemp``. Otherwise redirects to /cc_connect/code?invalid.
    """
    pending = session.get('cctemp')
    if not pending or 'challenge' not in pending:
        # No code has been issued for this session.
        return redirect('/cc_connect')

    try:
        submitted = int(request.form.get('challenge', ''))
    except ValueError:
        # Non-numeric input can never match the code.
        return redirect('/cc_connect/code?invalid')
    if submitted != pending['challenge']:
        return redirect('/cc_connect/code?invalid')

    for item in ['cc', 'ccavi']:
        session[item] = pending[item]

    db, c, dc = db_reconnect()
    try:
        c.execute(
            "UPDATE data SET cc = %s, ccavi = %s, latest_post = %s WHERE username = %s AND instance = %s",
            (session['cc'], session['ccavi'], pending['latest_post'], session['username'], session['instance']),
        )
        db.commit()
    finally:
        db.close()

    del session['cctemp']
    return redirect('/cc_connect/complete')


@app.route('/cc_connect/complete')
def cc_connect_complete():
    """Show the confirmation page once Curious Cat has been connected."""
    return render_template('cc_connect_complete.html', bg="background-image:url('{}')".format(session['ccavi']))


@app.route('/settings')
def settings_page():
    """Show the settings page."""
    return render_template('settings.html')