#!/usr/bin/env python3
#Curious Greg - Curious Cat to Mastodon crossposter

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

import hashlib
import json
import re
import time
import urllib

import bcrypt
import mysql.connector
import requests
from flask import Flask, render_template, request, session, redirect, url_for
from mastodon import Mastodon

# Site configuration (DB credentials, app name, website, Flask secret key).
with open("meta.json") as cfg_file:
    cfg = json.load(cfg_file)

# OAuth scopes requested from the user's Mastodon instance.
scopes = ["read:accounts", "write:statuses"]

settings = {
    "cw": False,
    # "disabled": False,
}

# How long a cached avatar is considered fresh before home() re-fetches it.
AVI_REFRESH_SECONDS = 24 * 60 * 60

# Accepts "@user@instance.tld" (leading "@" optional); groups: username, host.
_ACCT_RE = re.compile(r"^@?([^@\s]+)@([^@\s]+)$")

db = mysql.connector.connect(user=cfg['dbuser'], password=cfg['dbpass'], database=cfg['dbname'])
c = db.cursor()
dc = db.cursor(dictionary=True)

# MariaDB [curiousgreg]> DESCRIBE data;
# +---------------------+--------------+------+-----+-------------------------------------------+-----------------------------+
# | Field               | Type         | Null | Key | Default                                   | Extra                       |
# +---------------------+--------------+------+-----+-------------------------------------------+-----------------------------+
# | username            | varchar(64)  | NO   | PRI | NULL                                      |                             |
# | instance            | varchar(128) | NO   | PRI | NULL                                      |                             |
# | password            | tinytext     | NO   |     | NULL                                      |                             |
# | avi                 | text         | NO   |     | NULL                                      |                             |
# | secret              | tinytext     | NO   |     | NULL                                      |                             |
# | client_id           | varchar(128) | NO   |     | NULL                                      |                             |
# | client_secret       | tinytext     | NO   |     | NULL                                      |                             |
# | cc                  | tinytext     | YES  |     | NULL                                      |                             |
# | ccavi               | varchar(128) | YES  |     | https://lynnesbian.space/res/ceres/cc.png |                             |
# | latest_post         | tinytext     | YES  |     | NULL                                      |                             |
# | latest_timestamp    | timestamp    | NO   |     | CURRENT_TIMESTAMP                         | on update CURRENT_TIMESTAMP |
# | time_between_checks | int(11)      | YES  |     | NULL                                      |                             |
# | settings            | longtext     | YES  |     | NULL                                      |                             |
# +---------------------+--------------+------+-----+-------------------------------------------+-----------------------------+
c.execute(
    "CREATE TABLE IF NOT EXISTS `data` ("
    "username VARCHAR(64) NOT NULL, "
    "instance VARCHAR(128) NOT NULL, "
    "password TINYTEXT NOT NULL, "
    "avi TEXT NOT NULL, "
    "secret TINYTEXT NOT NULL, "
    "client_id VARCHAR(128) NOT NULL, "
    "client_secret TINYTEXT NOT NULL, "
    "cc TINYTEXT, "
    "ccavi VARCHAR(128) DEFAULT 'https://lynnesbian.space/res/ceres/cc.png', "
    "latest_post TINYTEXT, "
    "latest_timestamp TIMESTAMP, "
    "time_between_checks INT, "
    "settings LONGTEXT, "
    "PRIMARY KEY(username, instance))"
)

app = Flask(cfg['name'])
app.secret_key = cfg['flask_key']


def _prehash_password(password):
    """Return the SHA-256 digest (bytes) of *password* for use as bcrypt input.

    Both account creation and login must derive the bcrypt input the same
    way, so they share this helper.
    """
    # NOTE(review): a raw digest can contain NUL bytes, which the bcrypt
    # project warns about; its README suggests base64-encoding the digest.
    # Left unchanged here so hashes already stored in the DB keep verifying.
    return hashlib.sha256(password.encode('utf-8')).digest()


def _has_oauth_state(*keys):
    """Return True when every one of *keys* is present in the session."""
    return all(key in session for key in keys)


def _fetch_cc_avatar(cc_username):
    """Return the Curious Cat avatar URL for *cc_username*, or None on failure."""
    try:
        r = requests.get(
            "https://curiouscat.me/api/v2/profile",
            params={"username": cc_username},
            timeout=10,
        )
        return r.json()['userData']['avatar']
    except (requests.RequestException, ValueError, KeyError, TypeError):
        # Network error, non-JSON body, or unexpected payload shape: treat
        # the avatar as unavailable rather than failing the page render.
        return None


@app.route('/')
def main():
    """Show the landing page, or send logged-in users to their home page."""
    if 'acct' not in session:
        return render_template("landing_page.html")
    return redirect(url_for('home'))


@app.route('/home')
def home():
    """Render the home page, refreshing cached profile data when stale."""
    if 'acct' not in session:
        return redirect(url_for('main'))

    if 'cc' not in session:
        session['cc'] = "None"

    if session['cc'] == "None":
        #every time home is rendered without cc being set
        c.execute("SELECT cc FROM `data` WHERE client_id = %s AND instance = %s",
                  (session['client_id'], session['instance']))
        row = c.fetchone()
        # row is None when the user finished OAuth but has not created a
        # password yet, so there is no DB record to read from.
        if row is not None and row[0] != '':
            session['cc'] = row[0]

    last_update = session.get('last_avi_update')
    if last_update is None or last_update + AVI_REFRESH_SECONDS < time.time():
        #avatars haven't been updated for over 24 hours, update them now
        client = Mastodon(client_id=session['client_id'],
                          client_secret=session['client_secret'],
                          access_token=session['secret'],
                          api_base_url=session['instance'])
        session['avi'] = client.account_verify_credentials()['avatar']

        # "None" (string) is the in-session placeholder for "no Curious Cat
        # account linked"; it must not be looked up as a real username.
        ccavi = None
        if session['cc'] not in (None, '', "None"):
            #update cc avi too
            ccavi = _fetch_cc_avatar(session['cc'])

        if ccavi is not None:
            session['ccavi'] = ccavi
            c.execute("UPDATE data SET avi = %s, ccavi = %s WHERE client_id = %s AND instance = %s",
                      (session['avi'], session['ccavi'], session['client_id'], session['instance']))
        else:
            c.execute("UPDATE data SET avi = %s WHERE client_id = %s AND instance = %s",
                      (session['avi'], session['client_id'], session['instance']))
        db.commit()
        # Record the refresh time so the next request can skip this work.
        session['last_avi_update'] = time.time()

    return render_template("home.html")


# NOTE(review): this endpoint dumps the whole session, including the Mastodon
# access token and client secret. It must not be reachable in production.
@app.route('/debug') #TODO: remove this before making the site live ;p
def print_debug_info():
    """Return the current session contents as JSON (development aid)."""
    return json.dumps(session._get_current_object())


@app.route('/reset') #TODO: ditto
def reset_session():
    """Clear the session and return to the landing page (development aid)."""
    session.clear()
    return redirect(url_for('main'))


@app.route('/login')
def log_in():
    """Render the login form, or redirect home if a session already exists."""
    if 'acct' in session:
        #user is probably already logged in. if they aren't, home() will handle things and redirect them back here
        return redirect(url_for('home'))
    return render_template("login.html")
    # return(json.dumps(client_info))


#internal stuff

@app.route('/internal/auth_a')
def internal_auth_a():
    """Register an OAuth app on the user's instance and start authorisation.

    Reads the ``instance`` query parameter (default ``mastodon.social``),
    stores the instance URL and new client credentials in the session, and
    redirects the browser to the instance's authorisation page.
    """
    #TODO: prevent these endpoints from being spammed somehow
    instance = request.args.get('instance', default='mastodon.social', type=str)
    if not instance.startswith("https://"):
        instance = "https://{}".format(instance)
    session['instance'] = instance

    session['client_id'], session['client_secret'] = Mastodon.create_app(
        cfg['name'],
        api_base_url=session['instance'],
        scopes=scopes,
        website=cfg['website'],
        redirect_uris=['https://cg.lynnesbian.space/internal/auth_b',
                       'http://localhost:5000/internal/auth_b'],
    )

    client = Mastodon(client_id=session['client_id'],
                      client_secret=session['client_secret'],
                      api_base_url=session['instance'])
    # NOTE(review): the redirect target is hard-coded to the local dev server.
    url = client.auth_request_url(client_id=session['client_id'],
                                  redirect_uris='http://localhost:5000/internal/auth_b',
                                  scopes=scopes)
    return redirect(url, code=307)


@app.route('/internal/auth_b')
def internal_auth_b():
    """Finish the OAuth flow and route the user to login or password setup.

    Exchanges the ``code`` query parameter for an access token, stores the
    account details in the session, and then either refreshes the stored
    credentials of an existing user or sends a new user to create a password.
    """
    if not _has_oauth_state('client_id', 'client_secret', 'instance'):
        # Reached without going through auth_a first; restart the flow.
        return redirect(url_for('log_in'))

    #write details to DB
    client = Mastodon(client_id=session['client_id'],
                      client_secret=session['client_secret'],
                      api_base_url=session['instance'])
    session['secret'] = client.log_in(code=request.args.get('code'),
                                      scopes=scopes,
                                      redirect_uri='http://localhost:5000/internal/auth_b')
    acct_info = client.account_verify_credentials()
    session['username'] = acct_info['username']
    session['avi'] = acct_info['avatar']
    session['acct'] = "@{}@{}".format(session['username'], session['instance'].replace("https://", ""))

    # Exact match, not LIKE: "_" and "%" in a username are SQL wildcards and
    # would otherwise let one account match (and overwrite) another's row.
    c.execute("SELECT COUNT(*) FROM data WHERE username = %s AND instance = %s",
              (session['username'], session['instance']))
    if c.fetchone()[0] > 0:
        #user already has an account with CG
        #update the user's info to use the new info we just got, then redirect them to the login page
        c.execute("UPDATE data SET client_id = %s, client_secret = %s, secret = %s, avi = %s "
                  "WHERE username = %s AND instance = %s",
                  (session['client_id'], session['client_secret'], session['secret'],
                   session['avi'], session['username'], session['instance']))
        db.commit()
        return redirect(url_for('log_in'))
    return redirect(url_for('create_password'))


@app.route('/internal/do_login', methods=['POST'])
def do_login():
    """Verify a handle/password pair and populate the session on success.

    Expects form fields ``acct`` (``@user@instance``) and ``pw``. Redirects
    to ``/home`` on success and to ``/login?invalid`` on any failure.
    """
    pw_in = request.form.get('pw', '')
    acct = request.form.get('acct', '').strip()

    parsed = _ACCT_RE.match(acct)
    if parsed is None:
        return redirect('/login?invalid')
    username, host = parsed.groups()
    instance = "https://{}".format(host)

    dc.execute("SELECT * FROM data WHERE username = %s AND instance = %s", (username, instance))
    data = dc.fetchone()
    if data is None:
        return redirect('/login?invalid')

    # bcrypt works on bytes; the TEXT column may come back as str or bytearray.
    stored = data['password']
    if isinstance(stored, str):
        stored = stored.encode('utf-8')
    else:
        stored = bytes(stored)

    try:
        password_ok = bcrypt.checkpw(_prehash_password(pw_in), stored)
    except ValueError:
        # Stored value is not a valid bcrypt hash.
        password_ok = False
    if not password_ok:
        return redirect('/login?invalid')

    #password is correct, log the user in
    for item in ['username', 'instance', 'avi', 'secret', 'client_id', 'client_secret', 'cc', 'ccavi']:
        session[item] = data[item]
    # home() and main() treat the presence of 'acct' as "logged in".
    session['acct'] = "@{}@{}".format(session['username'], session['instance'].replace("https://", ""))
    return redirect('/home')


@app.route('/create_password')
def create_password():
    """Render the password creation form for users without a DB record."""
    if not _has_oauth_state('username', 'instance', 'avi'):
        # No completed OAuth flow in this session; nothing to create yet.
        return redirect(url_for('main'))

    c.execute("SELECT COUNT(*) FROM data WHERE username = %s AND instance = %s",
              (session['username'], session['instance']))
    if c.fetchone()[0] == 0:
        return render_template("create_password.html",
                               bg="background-image:url('{}')".format(session['avi']))
    #user already exists in database, so they already have a password
    return redirect(url_for('main'))


@app.route('/internal/create_account', methods=['POST'])
def create_account():
    """Create the DB record for a freshly authorised user.

    Expects form field ``pw``. Rejects weak passwords by redirecting to
    ``/create_password?invalid``; otherwise stores the bcrypt hash together
    with the OAuth credentials held in the session and redirects home.
    """
    if not _has_oauth_state('username', 'instance', 'avi', 'secret', 'client_id', 'client_secret'):
        # The OAuth details to be stored are missing from the session.
        return redirect(url_for('main'))

    pw_in = request.form.get('pw', '')
    if len(pw_in) < 6 or pw_in == 'password':
        #TODO: this is a pretty crappy check
        return redirect('/create_password?invalid')

    pw = bcrypt.hashpw(_prehash_password(pw_in), bcrypt.gensalt(15))
    c.execute("INSERT INTO data (username, instance, avi, password, secret, client_id, client_secret) "
              "VALUES (%s, %s, %s, %s, %s, %s, %s)",
              (session['username'], session['instance'], session['avi'], pw,
               session['secret'], session['client_id'], session['client_secret']))
    db.commit()

    return redirect(url_for('home'))