1
0
Fork 0
mirror of https://github.com/Lynnesbian/FediBooks/ synced 2024-11-25 16:48:58 +00:00

Compare commits

..

10 commits

9 changed files with 197 additions and 104 deletions

View file

@ -1,5 +1,14 @@
from bs4 import BeautifulSoup
import html, re
import MySQLdb
import markovify
from mastodon import Mastodon
import html, re, json
cfg = json.load(open('config.json'))
class nlt_fixed(markovify.NewlineText): # modified version of NewlineText that never rejects sentences
def test_sentence_input(self, sentence):
return True # all sentences are valid <3
def extract_post(post):
post = html.unescape(post) # convert HTML escape codes to text
@ -24,3 +33,95 @@ def extract_post(post):
text = re.sub("https://([^/]+)/users/([^ ]+)", r"@\2@\1", text) # put pleroma-style mentions back in
text = text.rstrip("\n") # remove trailing newline(s)
return text
def make_post(handle):
handle = handle[0]
db = MySQLdb.connect(
host = cfg['db_host'],
user=cfg['db_user'],
passwd=cfg['db_pass'],
db=cfg['db_name']
)
print("Generating post for {}".format(handle))
dc = db.cursor(MySQLdb.cursors.DictCursor)
c = db.cursor()
dc.execute("""
SELECT
learn_from_cw,
fake_mentions,
fake_mentions_full,
post_privacy,
content_warning,
client_id,
client_secret,
secret
FROM
bots, credentials
WHERE
bots.credentials_id = (SELECT
credentials_id
FROM
bots
WHERE
handle = %s)
""", (handle,))
bot = dc.fetchone()
client = Mastodon(
client_id = bot['client_id'],
client_secret = bot['client_secret'],
access_token = bot['secret'],
api_base_url = "https://{}".format(handle.split("@")[2])
)
# by default, only select posts that don't have CWs.
# if learn_from_cw, then also select posts with CWs
cw_list = [False]
if bot['learn_from_cw']:
cw_list = [False, True]
# select 1000 random posts for the bot to learn from
c.execute("SELECT content FROM posts WHERE fedi_id IN (SELECT fedi_id FROM bot_learned_accounts WHERE bot_id = %s) AND cw IN %s ORDER BY RAND() LIMIT 1000", (handle, cw_list))
# this line is a little gross/optimised but here's what it does
# 1. fetch all of the results from the above query
# 2. turn (('this',), ('format')) into ('this', 'format')
# 3. convert the tuple to a list
# 4. join the list into a string separated by newlines
posts = "\n".join(list(sum(c.fetchall(), ())))
model = nlt_fixed(posts)
tries = 0
post = None
# even with such a high tries value for markovify, it still sometimes returns none.
# so we implement our own tries function as well, and try ten times.
if bot['fake_mentions'] == 'never':
# remove all mentions from the training data before the markov model sees it
posts = re.sub(r"@(\w+)@([\w.]+)\s?", "", posts)
while post is None and tries < 10:
post = model.make_short_sentence(500, tries = 10000)
tries += 1
if post == None:
# TODO: send an error email
pass
else:
if "@" in post and bot['fake_mentions'] != 'never':
# the unicode zero width space is a (usually) invisible character
# we can insert it between the @ symbols in a handle to make it appear fine while not mentioning the user
zws = "\u200B"
if bot['fake_mentions'] == 'middle':
# remove mentions at the start of a post
post = re.sub(r"^(@\w+@[\w.]+\s*)+", "", post)
# TODO: does this regex catch all valid handles?
if bot['fake_mentions_full']:
post = re.sub(r"@(\w+)@([\w.]+)", r"@{}\1@{}\2".format(zws, zws), post)
else:
post = re.sub(r"@(\w+)@([\w.]+)", r"@{}\1".format(zws), post)
print(post)
client.status_post(post, visibility = bot['post_privacy'], spoiler_text = bot['content_warning'])
# TODO: update date of last post

View file

@ -9,10 +9,6 @@ import functions
cfg = json.load(open('config.json'))
class nlt_fixed(markovify.NewlineText): # modified version of NewlineText that never rejects sentences
def test_sentence_input(self, sentence):
return True # all sentences are valid <3
def scrape_posts(account):
handle = account[0]
outbox = account[1]
@ -88,67 +84,6 @@ def scrape_posts(account):
db.commit()
c.close()
def make_post(handle):
handle = handle[0]
print("Generating post for {}".format(handle))
c = db.cursor()
c.execute("""
SELECT
learn_from_cw, client_id, client_secret, secret
FROM
bots, credentials
WHERE
bots.credentials_id = (SELECT
credentials_id
FROM
bots
WHERE
handle = %s)
""", (handle,))
bot = c.fetchone()
client = Mastodon(
client_id = bot[1],
client_secret = bot[2],
access_token = bot[3],
api_base_url = "https://{}".format(handle.split("@")[2])
)
# by default, only select posts that don't have CWs.
# if learn_from_cw, then also select posts with CWs
cw_list = [False]
if bot[0]:
cw_list = [False, True]
# select 1000 random posts for the bot to learn from
c.execute("SELECT content FROM posts WHERE fedi_id IN (SELECT fedi_id FROM bot_learned_accounts WHERE bot_id = %s) AND cw IN %s ORDER BY RAND() LIMIT 1000", (handle, cw_list))
# this line is a little gross/optimised but here's what it does
# 1. fetch all of the results from the above query
# 2. turn (('this',), ('format')) into ('this', 'format')
# 3. convert the tuple to a list
# 4. join the list into a string separated by newlines
posts = "\n".join(list(sum(c.fetchall(), ())))
model = nlt_fixed(posts)
tries = 0
sentence = None
# even with such a high tries value for markovify, it still sometimes returns none.
# so we implement our own tries function as well, and try ten times.
while sentence is None and tries < 10:
sentence = model.make_short_sentence(500, tries = 10000)
tries += 1
# TODO: mention handling
if sentence == None:
# TODO: send an error email
pass
else:
client.status_post(sentence)
# TODO: update date of last post
print("Establishing DB connection")
db = MySQLdb.connect(
host = cfg['db_host'],
@ -173,6 +108,6 @@ cursor.execute("SELECT handle FROM bots WHERE enabled = TRUE")
bots = cursor.fetchall()
with Pool(8) as p:
p.map(make_post, bots)
p.map(functions.make_post, bots)
#TODO: other cron tasks should be done here, like updating profile pictures

View file

@ -26,9 +26,9 @@ CREATE TABLE IF NOT EXISTS `bots` (
`post_frequency` SMALLINT UNSIGNED DEFAULT 30,
`content_warning` VARCHAR(128),
`length` SMALLINT UNSIGNED DEFAULT 500,
`fake_mentions` ENUM('always', 'start', 'never') DEFAULT 'start',
`fake_mentions` ENUM('always', 'middle', 'never') DEFAULT 'middle',
`fake_mentions_full` BOOLEAN DEFAULT 0,
`post_privacy` ENUM('public', 'unlisted', 'followers_only') DEFAULT 'unlisted',
`post_privacy` ENUM('public', 'unlisted', 'private') DEFAULT 'unlisted',
`learn_from_cw` BOOLEAN DEFAULT 0,
`last_post` DATETIME DEFAULT 0,
`icon` VARCHAR(512),

View file

@ -183,10 +183,18 @@ form .row {
display: inline-block;
}
.error {
background-color: #e66;
.error, .success {
color: white;
text-align: center;
font-size: 1.6em;
padding: 10px;
}
.error {
background-color: #e66;
}
.error.err-small {
font-size: 1.0em;
}
.success {
background-color: #6e6;
}

View file

@ -30,7 +30,7 @@
{% elif session['step'] == 4 %}
<h2 class="thin centred">Congratulations!</h2>
<p>FediBooks has successfully authenticated with {{ session['instance'] }}, and your bot is ready to be configured. Click finish to return to the bot management screen.</p>
<p>FediBooks has successfully authenticated with your instance, and your bot is ready to be configured. Click finish to return to the bot management screen.</p>
<p>To get your bot working, you need to add at least one account for it to learn from. You can do so by clicking the <i class="fas fa-users"></i> button. To configure settings such as posting frequency and content warnings, click the <i class="fas fa-cog"></i> button.</p>
{% else %}

View file

@ -33,8 +33,8 @@
<div class="row">
<label for="fake-mentions" class="large">Fake mentions</label>
<select name="fake-mentions">
<option value="full">At any time</option>
<option value="start" default>Only at the start of posts</option>
<option value="always">At any time</option>
<option value="middle" default>Only in the middle of posts</option>
<option value="never">Never</option>
</select>
</div>

View file

@ -11,8 +11,11 @@
<h1 class="thin centred">Account settings</h1>
</div>
{% include 'error.html' %}
{% include 'success.html' %}
<div class="container">
<form action="/do/bot/edit" method="post" class="full-width">
<form method="POST" class="full-width">
<div class="container light">
<h2 class="thin centred">Login settings</h2>
<p class="centred">Update your email and password here.</p>
@ -32,6 +35,10 @@
<p class="centred">When should FediBooks send you email?</p>
</div>
<div class="error err-small">
Note: This feature isn't ready yet. As of now, FediBooks will not send you email.
</div>
<div class="row">
<label for="fetch-error" class="large">When my bot(s) can't get new posts</label>
<select name="fetch-error">

5
templates/success.html Normal file
View file

@ -0,0 +1,5 @@
{% if success != None %}
<div class="success">
<i class="fas fa-check-circle"></i> Information updated succesfully.
</div>
{% endif %}

View file

@ -20,6 +20,13 @@ mysql = MySQL(app)
scopes = ['write:statuses', 'write:accounts', 'read:accounts', 'read:notifications', 'read:statuses', 'push']
@app.before_request
def login_check():
if request.path not in ['/', '/about', '/welcome', '/login', '/signup', '/do/login', '/do/signup', '/static/style.css']:
# page requires authentication
if 'user_id' not in session:
return redirect(url_for('home'))
@app.route("/")
def home():
if 'user_id' in session:
@ -60,26 +67,62 @@ def about():
@app.route("/login")
def show_login_page():
error = None
if 'error' in session:
error = session.pop('error')
return render_template("login.html", signup = False, error = error)
return render_template("login.html", signup = False, error = session.pop('error', None))
@app.route("/signup")
def show_signup_page():
error = None
if 'error' in session:
error = session.pop('error')
return render_template("login.html", signup = True, error = error)
return render_template("login.html", signup = True, error = session.pop('error', None))
@app.route("/settings")
@app.route("/settings", methods=['GET', 'POST'])
def settings():
return render_template("coming_soon.html")
if request.method == 'GET':
dc = mysql.connection.cursor(MySQLdb.cursors.DictCursor)
dc.execute("SELECT * FROM `users` WHERE id = %s", (session['user_id'],))
user = dc.fetchone()
dc.close()
return render_template("settings.html", user = user)
return render_template("settings.html", user = user, error = session.pop('error', None), success = session.pop('success', None))
else:
# update settings
c = mysql.connection.cursor()
c.execute("SELECT COUNT(*) FROM users WHERE email = %s AND id != %s", (request.form['email'], session['user_id']))
if c.fetchone()[0] > 0:
session['error'] = "Email address already in use."
return redirect(url_for("settings"), 303)
for setting in [request.form['fetch-error'], request.form['submit-error'], request.form['reply-error'], request.form['generation-error']]:
if setting not in ['once', 'always', 'never']:
session['error'] = 'Invalid option "{}".'.format(setting)
return redirect(url_for('settings'), 303)
if request.form['password'] != '':
# user is updating their password
if len(request.form['password']) < 8:
session['error'] = "Password too short."
return redirect(url_for("settings"), 303)
pw_hashed = hashlib.sha256(request.form['password'].encode('utf-8')).digest()
pw = bcrypt.hashpw(pw_hashed, bcrypt.gensalt(12))
c.execute("UPDATE users SET password = %s WHERE id = %s", (pw, session['user_id']))
try:
c.execute("UPDATE users SET email = %s, `fetch` = %s, submit = %s, generation = %s, reply = %s WHERE id = %s", (
request.form['email'],
request.form['fetch-error'],
request.form['submit-error'],
request.form['generation-error'],
request.form['reply-error'],
session['user_id']
))
c.close()
mysql.connection.commit()
except:
session['error'] = "Encountered an error while updating the database."
return redirect(url_for('settings'), 303)
session['success'] = True
return redirect(url_for('settings'), 303)
@app.route("/bot/edit/<id>")
def bot_edit(id):
@ -147,7 +190,6 @@ def bot_accounts(id):
@app.route("/bot/accounts/add", methods = ['GET', 'POST'])
def bot_accounts_add():
error = None
if request.method == 'POST':
if session['step'] == 1:
if request.form['account'] == session['bot']:
@ -193,7 +235,7 @@ def bot_accounts_add():
error = "Couldn't access ActivityPub outbox. {} may require authenticated fetches, which FediBooks doesn't support yet."
return render_template("bot_accounts_add.html", error = error)
return render_template("bot_accounts_add.html", error = error)
return render_template("bot_accounts_add.html", error = session.pop('error', None))
@app.route("/bot/accounts/toggle/<id>")
def bot_accounts_toggle(id):
@ -224,7 +266,6 @@ def bot_accounts_delete(id):
@app.route("/bot/create/", methods=['GET', 'POST'])
def bot_create():
error = None
if request.method == 'POST':
if session['step'] == 1:
# strip leading https://, if provided
@ -330,9 +371,7 @@ def bot_create():
del session['client_id']
del session['client_secret']
if 'error' in session:
error = session.pop('error')
return render_template("bot_create.html", error = error)
return render_template("bot_create.html", error = session.pop('error', None))
@app.route("/bot/create/back")
def bot_create_back():
@ -357,11 +396,16 @@ def do_signup():
session['error'] = "Password too short."
return redirect(url_for("show_signup_page"), 303)
c = mysql.connection.cursor()
c.execute("SELECT COUNT(*) FROM users WHERE email = %s", (request.form['email'],))
if c.fetchone()[0] > 0:
session['error'] = "Email address already in use."
return redirect(url_for("show_signup_page"), 303)
pw_hashed = hashlib.sha256(request.form['password'].encode('utf-8')).digest()
pw = bcrypt.hashpw(pw_hashed, bcrypt.gensalt(12))
# try to sign up
c = mysql.connection.cursor()
c.execute("INSERT INTO `users` (email, password) VALUES (%s, %s)", (request.form['email'], pw))
user_id = c.lastrowid
mysql.connection.commit()
@ -408,10 +452,3 @@ def bot_check(bot):
c = mysql.connection.cursor()
c.execute("SELECT COUNT(*) FROM `bots` WHERE `handle` = %s AND `user_id` = %s", (bot, session['user_id']))
return c.fetchone()[0] == 1
@app.before_request
def login_check():
if request.path not in ['/', '/about', '/welcome', '/login', '/signup', '/do/login', '/do/signup', '/static/style.css']:
# page requires authentication
if 'user_id' not in session:
return redirect(url_for('home'))