mirror of
https://github.com/Lynnesbian/FediBooks/
synced 2024-11-25 16:48:58 +00:00
Compare commits
10 commits
436a911397
...
49ddde2b9f
Author | SHA1 | Date | |
---|---|---|---|
49ddde2b9f | |||
9d6615b579 | |||
eb3fb77c18 | |||
3a0c67fc4e | |||
9c1a4378b3 | |||
33cbbd1de9 | |||
7f638e321e | |||
1d3b06cb0b | |||
3ebcb589ce | |||
7b5227c3dc |
9 changed files with 197 additions and 104 deletions
103
functions.py
103
functions.py
|
@ -1,5 +1,14 @@
|
|||
from bs4 import BeautifulSoup
|
||||
import html, re
|
||||
import MySQLdb
|
||||
import markovify
|
||||
from mastodon import Mastodon
|
||||
import html, re, json
|
||||
|
||||
cfg = json.load(open('config.json'))
|
||||
|
||||
class nlt_fixed(markovify.NewlineText): # modified version of NewlineText that never rejects sentences
|
||||
def test_sentence_input(self, sentence):
|
||||
return True # all sentences are valid <3
|
||||
|
||||
def extract_post(post):
|
||||
post = html.unescape(post) # convert HTML escape codes to text
|
||||
|
@ -24,3 +33,95 @@ def extract_post(post):
|
|||
text = re.sub("https://([^/]+)/users/([^ ]+)", r"@\2@\1", text) # put pleroma-style mentions back in
|
||||
text = text.rstrip("\n") # remove trailing newline(s)
|
||||
return text
|
||||
|
||||
def make_post(handle):
|
||||
handle = handle[0]
|
||||
db = MySQLdb.connect(
|
||||
host = cfg['db_host'],
|
||||
user=cfg['db_user'],
|
||||
passwd=cfg['db_pass'],
|
||||
db=cfg['db_name']
|
||||
)
|
||||
print("Generating post for {}".format(handle))
|
||||
dc = db.cursor(MySQLdb.cursors.DictCursor)
|
||||
c = db.cursor()
|
||||
dc.execute("""
|
||||
SELECT
|
||||
learn_from_cw,
|
||||
fake_mentions,
|
||||
fake_mentions_full,
|
||||
post_privacy,
|
||||
content_warning,
|
||||
client_id,
|
||||
client_secret,
|
||||
secret
|
||||
FROM
|
||||
bots, credentials
|
||||
WHERE
|
||||
bots.credentials_id = (SELECT
|
||||
credentials_id
|
||||
FROM
|
||||
bots
|
||||
WHERE
|
||||
handle = %s)
|
||||
""", (handle,))
|
||||
|
||||
bot = dc.fetchone()
|
||||
client = Mastodon(
|
||||
client_id = bot['client_id'],
|
||||
client_secret = bot['client_secret'],
|
||||
access_token = bot['secret'],
|
||||
api_base_url = "https://{}".format(handle.split("@")[2])
|
||||
)
|
||||
|
||||
# by default, only select posts that don't have CWs.
|
||||
# if learn_from_cw, then also select posts with CWs
|
||||
cw_list = [False]
|
||||
if bot['learn_from_cw']:
|
||||
cw_list = [False, True]
|
||||
|
||||
# select 1000 random posts for the bot to learn from
|
||||
c.execute("SELECT content FROM posts WHERE fedi_id IN (SELECT fedi_id FROM bot_learned_accounts WHERE bot_id = %s) AND cw IN %s ORDER BY RAND() LIMIT 1000", (handle, cw_list))
|
||||
|
||||
# this line is a little gross/optimised but here's what it does
|
||||
# 1. fetch all of the results from the above query
|
||||
# 2. turn (('this',), ('format')) into ('this', 'format')
|
||||
# 3. convert the tuple to a list
|
||||
# 4. join the list into a string separated by newlines
|
||||
posts = "\n".join(list(sum(c.fetchall(), ())))
|
||||
|
||||
model = nlt_fixed(posts)
|
||||
tries = 0
|
||||
post = None
|
||||
# even with such a high tries value for markovify, it still sometimes returns none.
|
||||
# so we implement our own tries function as well, and try ten times.
|
||||
|
||||
if bot['fake_mentions'] == 'never':
|
||||
# remove all mentions from the training data before the markov model sees it
|
||||
posts = re.sub(r"@(\w+)@([\w.]+)\s?", "", posts)
|
||||
|
||||
while post is None and tries < 10:
|
||||
post = model.make_short_sentence(500, tries = 10000)
|
||||
tries += 1
|
||||
|
||||
if post == None:
|
||||
# TODO: send an error email
|
||||
pass
|
||||
else:
|
||||
if "@" in post and bot['fake_mentions'] != 'never':
|
||||
# the unicode zero width space is a (usually) invisible character
|
||||
# we can insert it between the @ symbols in a handle to make it appear fine while not mentioning the user
|
||||
zws = "\u200B"
|
||||
if bot['fake_mentions'] == 'middle':
|
||||
# remove mentions at the start of a post
|
||||
post = re.sub(r"^(@\w+@[\w.]+\s*)+", "", post)
|
||||
# TODO: does this regex catch all valid handles?
|
||||
if bot['fake_mentions_full']:
|
||||
post = re.sub(r"@(\w+)@([\w.]+)", r"@{}\1@{}\2".format(zws, zws), post)
|
||||
else:
|
||||
post = re.sub(r"@(\w+)@([\w.]+)", r"@{}\1".format(zws), post)
|
||||
|
||||
print(post)
|
||||
client.status_post(post, visibility = bot['post_privacy'], spoiler_text = bot['content_warning'])
|
||||
|
||||
# TODO: update date of last post
|
||||
|
|
67
service.py
67
service.py
|
@ -9,10 +9,6 @@ import functions
|
|||
|
||||
cfg = json.load(open('config.json'))
|
||||
|
||||
class nlt_fixed(markovify.NewlineText): # modified version of NewlineText that never rejects sentences
|
||||
def test_sentence_input(self, sentence):
|
||||
return True # all sentences are valid <3
|
||||
|
||||
def scrape_posts(account):
|
||||
handle = account[0]
|
||||
outbox = account[1]
|
||||
|
@ -88,67 +84,6 @@ def scrape_posts(account):
|
|||
db.commit()
|
||||
c.close()
|
||||
|
||||
def make_post(handle):
|
||||
handle = handle[0]
|
||||
print("Generating post for {}".format(handle))
|
||||
c = db.cursor()
|
||||
c.execute("""
|
||||
SELECT
|
||||
learn_from_cw, client_id, client_secret, secret
|
||||
FROM
|
||||
bots, credentials
|
||||
WHERE
|
||||
bots.credentials_id = (SELECT
|
||||
credentials_id
|
||||
FROM
|
||||
bots
|
||||
WHERE
|
||||
handle = %s)
|
||||
""", (handle,))
|
||||
|
||||
bot = c.fetchone()
|
||||
client = Mastodon(
|
||||
client_id = bot[1],
|
||||
client_secret = bot[2],
|
||||
access_token = bot[3],
|
||||
api_base_url = "https://{}".format(handle.split("@")[2])
|
||||
)
|
||||
|
||||
# by default, only select posts that don't have CWs.
|
||||
# if learn_from_cw, then also select posts with CWs
|
||||
cw_list = [False]
|
||||
if bot[0]:
|
||||
cw_list = [False, True]
|
||||
|
||||
# select 1000 random posts for the bot to learn from
|
||||
c.execute("SELECT content FROM posts WHERE fedi_id IN (SELECT fedi_id FROM bot_learned_accounts WHERE bot_id = %s) AND cw IN %s ORDER BY RAND() LIMIT 1000", (handle, cw_list))
|
||||
|
||||
# this line is a little gross/optimised but here's what it does
|
||||
# 1. fetch all of the results from the above query
|
||||
# 2. turn (('this',), ('format')) into ('this', 'format')
|
||||
# 3. convert the tuple to a list
|
||||
# 4. join the list into a string separated by newlines
|
||||
posts = "\n".join(list(sum(c.fetchall(), ())))
|
||||
|
||||
model = nlt_fixed(posts)
|
||||
tries = 0
|
||||
sentence = None
|
||||
# even with such a high tries value for markovify, it still sometimes returns none.
|
||||
# so we implement our own tries function as well, and try ten times.
|
||||
while sentence is None and tries < 10:
|
||||
sentence = model.make_short_sentence(500, tries = 10000)
|
||||
tries += 1
|
||||
|
||||
# TODO: mention handling
|
||||
|
||||
if sentence == None:
|
||||
# TODO: send an error email
|
||||
pass
|
||||
else:
|
||||
client.status_post(sentence)
|
||||
|
||||
# TODO: update date of last post
|
||||
|
||||
print("Establishing DB connection")
|
||||
db = MySQLdb.connect(
|
||||
host = cfg['db_host'],
|
||||
|
@ -173,6 +108,6 @@ cursor.execute("SELECT handle FROM bots WHERE enabled = TRUE")
|
|||
bots = cursor.fetchall()
|
||||
|
||||
with Pool(8) as p:
|
||||
p.map(make_post, bots)
|
||||
p.map(functions.make_post, bots)
|
||||
|
||||
#TODO: other cron tasks should be done here, like updating profile pictures
|
||||
|
|
|
@ -26,9 +26,9 @@ CREATE TABLE IF NOT EXISTS `bots` (
|
|||
`post_frequency` SMALLINT UNSIGNED DEFAULT 30,
|
||||
`content_warning` VARCHAR(128),
|
||||
`length` SMALLINT UNSIGNED DEFAULT 500,
|
||||
`fake_mentions` ENUM('always', 'start', 'never') DEFAULT 'start',
|
||||
`fake_mentions` ENUM('always', 'middle', 'never') DEFAULT 'middle',
|
||||
`fake_mentions_full` BOOLEAN DEFAULT 0,
|
||||
`post_privacy` ENUM('public', 'unlisted', 'followers_only') DEFAULT 'unlisted',
|
||||
`post_privacy` ENUM('public', 'unlisted', 'private') DEFAULT 'unlisted',
|
||||
`learn_from_cw` BOOLEAN DEFAULT 0,
|
||||
`last_post` DATETIME DEFAULT 0,
|
||||
`icon` VARCHAR(512),
|
||||
|
|
|
@ -183,10 +183,18 @@ form .row {
|
|||
display: inline-block;
|
||||
}
|
||||
|
||||
.error {
|
||||
background-color: #e66;
|
||||
.error, .success {
|
||||
color: white;
|
||||
text-align: center;
|
||||
font-size: 1.6em;
|
||||
padding: 10px;
|
||||
}
|
||||
.error {
|
||||
background-color: #e66;
|
||||
}
|
||||
.error.err-small {
|
||||
font-size: 1.0em;
|
||||
}
|
||||
.success {
|
||||
background-color: #6e6;
|
||||
}
|
||||
|
|
|
@ -30,7 +30,7 @@
|
|||
|
||||
{% elif session['step'] == 4 %}
|
||||
<h2 class="thin centred">Congratulations!</h2>
|
||||
<p>FediBooks has successfully authenticated with {{ session['instance'] }}, and your bot is ready to be configured. Click finish to return to the bot management screen.</p>
|
||||
<p>FediBooks has successfully authenticated with your instance, and your bot is ready to be configured. Click finish to return to the bot management screen.</p>
|
||||
<p>To get your bot working, you need to add at least one account for it to learn from. You can do so by clicking the <i class="fas fa-users"></i> button. To configure settings such as posting frequency and content warnings, click the <i class="fas fa-cog"></i> button.</p>
|
||||
|
||||
{% else %}
|
||||
|
|
|
@ -33,8 +33,8 @@
|
|||
<div class="row">
|
||||
<label for="fake-mentions" class="large">Fake mentions</label>
|
||||
<select name="fake-mentions">
|
||||
<option value="full">At any time</option>
|
||||
<option value="start" default>Only at the start of posts</option>
|
||||
<option value="always">At any time</option>
|
||||
<option value="middle" default>Only in the middle of posts</option>
|
||||
<option value="never">Never</option>
|
||||
</select>
|
||||
</div>
|
||||
|
|
|
@ -10,9 +10,12 @@
|
|||
<div class="container">
|
||||
<h1 class="thin centred">Account settings</h1>
|
||||
</div>
|
||||
|
||||
{% include 'error.html' %}
|
||||
{% include 'success.html' %}
|
||||
|
||||
<div class="container">
|
||||
<form action="/do/bot/edit" method="post" class="full-width">
|
||||
<form method="POST" class="full-width">
|
||||
<div class="container light">
|
||||
<h2 class="thin centred">Login settings</h2>
|
||||
<p class="centred">Update your email and password here.</p>
|
||||
|
@ -32,6 +35,10 @@
|
|||
<p class="centred">When should FediBooks send you email?</p>
|
||||
</div>
|
||||
|
||||
<div class="error err-small">
|
||||
Note: This feature isn't ready yet. As of now, FediBooks will not send you email.
|
||||
</div>
|
||||
|
||||
<div class="row">
|
||||
<label for="fetch-error" class="large">When my bot(s) can't get new posts</label>
|
||||
<select name="fetch-error">
|
||||
|
|
5
templates/success.html
Normal file
5
templates/success.html
Normal file
|
@ -0,0 +1,5 @@
|
|||
{% if success != None %}
|
||||
<div class="success">
|
||||
<i class="fas fa-check-circle"></i> Information updated succesfully.
|
||||
</div>
|
||||
{% endif %}
|
95
webui.py
95
webui.py
|
@ -20,6 +20,13 @@ mysql = MySQL(app)
|
|||
|
||||
scopes = ['write:statuses', 'write:accounts', 'read:accounts', 'read:notifications', 'read:statuses', 'push']
|
||||
|
||||
@app.before_request
|
||||
def login_check():
|
||||
if request.path not in ['/', '/about', '/welcome', '/login', '/signup', '/do/login', '/do/signup', '/static/style.css']:
|
||||
# page requires authentication
|
||||
if 'user_id' not in session:
|
||||
return redirect(url_for('home'))
|
||||
|
||||
@app.route("/")
|
||||
def home():
|
||||
if 'user_id' in session:
|
||||
|
@ -60,26 +67,62 @@ def about():
|
|||
|
||||
@app.route("/login")
|
||||
def show_login_page():
|
||||
error = None
|
||||
if 'error' in session:
|
||||
error = session.pop('error')
|
||||
return render_template("login.html", signup = False, error = error)
|
||||
return render_template("login.html", signup = False, error = session.pop('error', None))
|
||||
|
||||
@app.route("/signup")
|
||||
def show_signup_page():
|
||||
error = None
|
||||
if 'error' in session:
|
||||
error = session.pop('error')
|
||||
return render_template("login.html", signup = True, error = error)
|
||||
return render_template("login.html", signup = True, error = session.pop('error', None))
|
||||
|
||||
@app.route("/settings")
|
||||
@app.route("/settings", methods=['GET', 'POST'])
|
||||
def settings():
|
||||
return render_template("coming_soon.html")
|
||||
dc = mysql.connection.cursor(MySQLdb.cursors.DictCursor)
|
||||
dc.execute("SELECT * FROM `users` WHERE id = %s", (session['user_id'],))
|
||||
user = dc.fetchone()
|
||||
dc.close()
|
||||
return render_template("settings.html", user = user)
|
||||
if request.method == 'GET':
|
||||
dc = mysql.connection.cursor(MySQLdb.cursors.DictCursor)
|
||||
dc.execute("SELECT * FROM `users` WHERE id = %s", (session['user_id'],))
|
||||
user = dc.fetchone()
|
||||
dc.close()
|
||||
return render_template("settings.html", user = user, error = session.pop('error', None), success = session.pop('success', None))
|
||||
|
||||
else:
|
||||
# update settings
|
||||
c = mysql.connection.cursor()
|
||||
|
||||
c.execute("SELECT COUNT(*) FROM users WHERE email = %s AND id != %s", (request.form['email'], session['user_id']))
|
||||
if c.fetchone()[0] > 0:
|
||||
session['error'] = "Email address already in use."
|
||||
return redirect(url_for("settings"), 303)
|
||||
|
||||
for setting in [request.form['fetch-error'], request.form['submit-error'], request.form['reply-error'], request.form['generation-error']]:
|
||||
if setting not in ['once', 'always', 'never']:
|
||||
session['error'] = 'Invalid option "{}".'.format(setting)
|
||||
return redirect(url_for('settings'), 303)
|
||||
|
||||
if request.form['password'] != '':
|
||||
# user is updating their password
|
||||
if len(request.form['password']) < 8:
|
||||
session['error'] = "Password too short."
|
||||
return redirect(url_for("settings"), 303)
|
||||
|
||||
pw_hashed = hashlib.sha256(request.form['password'].encode('utf-8')).digest()
|
||||
pw = bcrypt.hashpw(pw_hashed, bcrypt.gensalt(12))
|
||||
c.execute("UPDATE users SET password = %s WHERE id = %s", (pw, session['user_id']))
|
||||
|
||||
try:
|
||||
c.execute("UPDATE users SET email = %s, `fetch` = %s, submit = %s, generation = %s, reply = %s WHERE id = %s", (
|
||||
request.form['email'],
|
||||
request.form['fetch-error'],
|
||||
request.form['submit-error'],
|
||||
request.form['generation-error'],
|
||||
request.form['reply-error'],
|
||||
session['user_id']
|
||||
))
|
||||
c.close()
|
||||
mysql.connection.commit()
|
||||
except:
|
||||
session['error'] = "Encountered an error while updating the database."
|
||||
return redirect(url_for('settings'), 303)
|
||||
|
||||
session['success'] = True
|
||||
return redirect(url_for('settings'), 303)
|
||||
|
||||
@app.route("/bot/edit/<id>")
|
||||
def bot_edit(id):
|
||||
|
@ -147,7 +190,6 @@ def bot_accounts(id):
|
|||
|
||||
@app.route("/bot/accounts/add", methods = ['GET', 'POST'])
|
||||
def bot_accounts_add():
|
||||
error = None
|
||||
if request.method == 'POST':
|
||||
if session['step'] == 1:
|
||||
if request.form['account'] == session['bot']:
|
||||
|
@ -193,7 +235,7 @@ def bot_accounts_add():
|
|||
error = "Couldn't access ActivityPub outbox. {} may require authenticated fetches, which FediBooks doesn't support yet."
|
||||
return render_template("bot_accounts_add.html", error = error)
|
||||
|
||||
return render_template("bot_accounts_add.html", error = error)
|
||||
return render_template("bot_accounts_add.html", error = session.pop('error', None))
|
||||
|
||||
@app.route("/bot/accounts/toggle/<id>")
|
||||
def bot_accounts_toggle(id):
|
||||
|
@ -224,7 +266,6 @@ def bot_accounts_delete(id):
|
|||
|
||||
@app.route("/bot/create/", methods=['GET', 'POST'])
|
||||
def bot_create():
|
||||
error = None
|
||||
if request.method == 'POST':
|
||||
if session['step'] == 1:
|
||||
# strip leading https://, if provided
|
||||
|
@ -330,9 +371,7 @@ def bot_create():
|
|||
del session['client_id']
|
||||
del session['client_secret']
|
||||
|
||||
if 'error' in session:
|
||||
error = session.pop('error')
|
||||
return render_template("bot_create.html", error = error)
|
||||
return render_template("bot_create.html", error = session.pop('error', None))
|
||||
|
||||
@app.route("/bot/create/back")
|
||||
def bot_create_back():
|
||||
|
@ -357,11 +396,16 @@ def do_signup():
|
|||
session['error'] = "Password too short."
|
||||
return redirect(url_for("show_signup_page"), 303)
|
||||
|
||||
c = mysql.connection.cursor()
|
||||
c.execute("SELECT COUNT(*) FROM users WHERE email = %s", (request.form['email'],))
|
||||
if c.fetchone()[0] > 0:
|
||||
session['error'] = "Email address already in use."
|
||||
return redirect(url_for("show_signup_page"), 303)
|
||||
|
||||
pw_hashed = hashlib.sha256(request.form['password'].encode('utf-8')).digest()
|
||||
pw = bcrypt.hashpw(pw_hashed, bcrypt.gensalt(12))
|
||||
|
||||
# try to sign up
|
||||
c = mysql.connection.cursor()
|
||||
c.execute("INSERT INTO `users` (email, password) VALUES (%s, %s)", (request.form['email'], pw))
|
||||
user_id = c.lastrowid
|
||||
mysql.connection.commit()
|
||||
|
@ -408,10 +452,3 @@ def bot_check(bot):
|
|||
c = mysql.connection.cursor()
|
||||
c.execute("SELECT COUNT(*) FROM `bots` WHERE `handle` = %s AND `user_id` = %s", (bot, session['user_id']))
|
||||
return c.fetchone()[0] == 1
|
||||
|
||||
@app.before_request
|
||||
def login_check():
|
||||
if request.path not in ['/', '/about', '/welcome', '/login', '/signup', '/do/login', '/do/signup', '/static/style.css']:
|
||||
# page requires authentication
|
||||
if 'user_id' not in session:
|
||||
return redirect(url_for('home'))
|
||||
|
|
Loading…
Reference in a new issue