2019-09-06 02:38:50 +00:00
from bs4 import BeautifulSoup
2019-09-09 03:39:28 +00:00
import MySQLdb
import markovify
2019-09-11 08:02:26 +00:00
from mastodon import Mastodon , MastodonUnauthorizedError
2019-09-09 03:39:28 +00:00
import html , re , json
# Load the service configuration once at import time. The context manager
# closes the file handle as soon as the JSON has been parsed; the previous
# bare open() call left it open for the lifetime of the process.
with open('config.json', encoding='utf-8') as config_file:
    cfg = json.load(config_file)
class nlt_fixed(markovify.NewlineText):
    """Modified version of NewlineText that never rejects sentences."""

    def test_sentence_input(self, sentence):
        """Report every sentence as acceptable, whatever it contains."""
        # all sentences are valid <3
        return True
2019-09-06 02:38:50 +00:00
def extract_post(post):
    """Convert the HTML body of a post into plain text.

    Line breaks and paragraphs become newlines, hashtag links are reduced to
    their text, every other link is replaced by its target URL, and profile
    URLs are turned back into ``@user@instance`` mentions.

    Args:
        post: HTML content of the post, as a string.

    Returns:
        The plain-text rendering with trailing newlines removed.
    """
    post = html.unescape(post)  # convert HTML escape codes to text
    soup = BeautifulSoup(post, "html.parser")

    for lb in soup.select("br"):  # replace <br> with linebreak
        lb.replace_with("\n")

    for p in soup.select("p"):  # ditto for <p>
        p.insert_after("\n")
        p.unwrap()

    for ht in soup.select("a.hashtag"):  # convert hashtags from links to text
        ht.unwrap()

    # convert <a href='https://example.com'>example.com</a> to just https://example.com
    for link in soup.select("a"):
        href = link.get("href")
        if href is None:
            # an anchor without a target has no URL to show; keep its text
            link.unwrap()
        else:
            link.replace_with(href)

    text = soup.get_text()
    # The handle part stops at any whitespace (not only at a space), so a
    # mention that ends a line does not swallow the first word of the next one.
    # put mastodon-style mentions back in: https://host/@user -> @user@host
    text = re.sub(r"https://([^/\s]+)/(@\S+)", r"\2@\1", text)
    # put pleroma-style mentions back in: https://host/users/user -> @user@host
    text = re.sub(r"https://([^/\s]+)/users/(\S+)", r"@\2@\1", text)
    return text.rstrip("\n")  # remove trailing newline(s)
2019-09-09 03:39:28 +00:00
2019-09-11 04:37:13 +00:00
def _defuse_mentions(post, fake_mentions, fake_mentions_full):
    """Rewrite ``@user@instance`` handles in *post* so they mention nobody."""
    # the unicode zero width space is a (usually) invisible character
    # we can insert it between the @ symbols in a handle to make it appear
    # fine while not mentioning the user
    zws = "\u200B"
    if fake_mentions == 'middle':
        # remove mentions at the start of a post
        post = re.sub(r"^(@\w+@[\w.]+\s*)+", "", post)
    # TODO: does this regex catch all valid handles?
    if fake_mentions_full:
        replacement = r"@{}\1@{}\2".format(zws, zws)
    else:
        replacement = r"@{}\1".format(zws)
    return re.sub(r"@(\w+)@([\w.]+)", replacement, post)


def make_post(args):
    """Generate a Markov-chain post for one bot and publish it.

    Args:
        args: Positional settings packed into a sequence:
            ``args[0]`` is the bot's handle; its third ``@``-separated part
            is used as the instance host name. Optional ``args[1]`` is the id
            of the status being replied to, ``args[2]`` overrides the bot's
            configured post privacy, and ``args[3]`` is text to put in front
            of the generated post.

    Returns:
        None. The post is sent through the Mastodon API, and for non-reply
        posts the bot's ``last_post`` timestamp is updated in the database.
    """
    handle = args[0]
    # Missing optional entries fall back to None instead of raising IndexError.
    reply_id = args[1] if len(args) > 1 else None
    acct = args[3] if len(args) > 3 else None

    db = MySQLdb.connect(
        host=cfg['db_host'],
        user=cfg['db_user'],
        passwd=cfg['db_pass'],
        db=cfg['db_name'],
    )
    try:
        print("Generating post for {}".format(handle))
        dc = db.cursor(MySQLdb.cursors.DictCursor)
        c = db.cursor()
        # NOTE(review): this query has no condition tying `credentials` to
        # `bots`, so it may return one row per credentials entry -- confirm
        # against the schema whether a join condition is missing.
        dc.execute("""
            SELECT
                learn_from_cw,
                length,
                fake_mentions,
                fake_mentions_full,
                post_privacy,
                content_warning,
                client_id,
                client_secret,
                secret
            FROM
                bots, credentials
            WHERE
                bots.credentials_id = (SELECT
                        credentials_id
                    FROM
                        bots
                    WHERE
                        handle = %s)
        """, (handle,))
        bot = dc.fetchone()
        if bot is None:
            print("No bot found for {}".format(handle))
            return

        client = Mastodon(
            client_id=bot['client_id'],
            client_secret=bot['client_secret'],
            access_token=bot['secret'],
            api_base_url="https://{}".format(handle.split("@")[2]),
        )

        # by default, only select posts that don't have CWs.
        # if learn_from_cw, then also select posts with CWs
        cw_list = [False]
        if bot['learn_from_cw']:
            cw_list = [False, True]

        # select 1000 random posts for the bot to learn from
        c.execute("SELECT content FROM posts WHERE fedi_id IN (SELECT fedi_id FROM bot_learned_accounts WHERE bot_id = %s) AND cw IN %s ORDER BY RAND() LIMIT 1000", (handle, cw_list))
        # each row is a 1-tuple; join the single column into newline-separated text
        posts = "\n".join(row[0] for row in c.fetchall())

        if bot['fake_mentions'] == 'never':
            # remove all mentions from the training data before the markov
            # model sees it (this must happen before the model is built)
            posts = re.sub(r"@(\w+)@([\w.]+)\s?", "", posts)

        if not posts.strip():
            print("No posts to learn from.")
            return

        model = nlt_fixed(posts)

        # even with such a high tries value for markovify, it still sometimes returns none.
        # so we implement our own tries function as well, and try ten times.
        post = None
        for _ in range(10):
            post = model.make_short_sentence(bot['length'], tries=1000)
            if post is not None:
                break

        if post is None:
            # TODO: send an error email
            print("Failed to generate a post for {}".format(handle))
        else:
            if "@" in post and bot['fake_mentions'] != 'never':
                post = _defuse_mentions(post, bot['fake_mentions'], bot['fake_mentions_full'])
            print(post)
            visibility = args[2] if len(args) > 2 else bot['post_privacy']
            if acct is not None:
                post = "{} {}".format(acct, post)
            # ensure post isn't longer than bot['length']
            post = post[:bot['length']]
            # send toot!!
            try:
                client.status_post(post, reply_id, visibility=visibility, spoiler_text=bot['content_warning'])
            except MastodonUnauthorizedError:
                # user has revoked the token given to the bot
                # this needs to be dealt with properly later on, but for now, we'll just disable the bot
                c.execute("UPDATE bots SET enabled = FALSE WHERE handle = %s", (handle,))

        # NOTE(review): the original indentation was lost; this assumes the
        # timestamp update applies to every non-reply call -- confirm.
        if reply_id is None:
            # this wasn't a reply, it was a regular post, so update the last post date
            c.execute("UPDATE bots SET last_post = CURRENT_TIMESTAMP() WHERE handle = %s", (handle,))
        # commit unconditionally so a bot disabled while replying is persisted too
        db.commit()
    finally:
        # release the connection on every path, including the early returns
        db.close()