2019-09-06 02:38:50 +00:00
from bs4 import BeautifulSoup
2019-09-09 03:39:28 +00:00
import MySQLdb
import markovify
2019-09-11 08:02:26 +00:00
from mastodon import Mastodon , MastodonUnauthorizedError
2019-09-09 03:39:28 +00:00
import html , re , json
# Load the settings (database host/user/password/name) once at import time.
# The context manager makes sure the file handle is closed after parsing.
with open('config.json') as config_file:
    cfg = json.load(config_file)
class nlt_fixed(markovify.NewlineText):
    """Modified version of NewlineText that never rejects sentences."""

    def test_sentence_input(self, sentence):
        """Accept ``sentence`` unconditionally."""
        # all sentences are valid <3
        return True
2019-09-06 02:38:50 +00:00
def extract_post(post):
    """Convert the HTML content of a post into plain text.

    Line breaks and paragraphs become newlines, hashtag links are reduced to
    their text, every other link is replaced by its target URL, and profile
    URLs are rewritten as ``@user@instance`` mentions.

    :param post: HTML string of the post.
    :return: the plain-text version, without trailing newlines.
    """
    post = html.unescape(post)  # convert HTML escape codes to text
    soup = BeautifulSoup(post, "html.parser")

    for lb in soup.select("br"):  # replace <br> with linebreak
        lb.insert_after("\n")
        lb.decompose()

    for p in soup.select("p"):  # ditto for <p>
        p.insert_after("\n")
        p.unwrap()

    for ht in soup.select("a.hashtag"):  # convert hashtags from links to text
        ht.unwrap()

    # convert <a href='https://example.com'>example.com</a> to just https://example.com
    for link in soup.select("a"):
        href = link.get("href")
        if href is None:
            # an anchor without a target has no URL to substitute;
            # keep its visible text instead of raising KeyError
            link.unwrap()
            continue
        link.insert_after(href)
        link.decompose()

    text = soup.get_text()

    # put mastodon-style mentions back in (https://instance/@user -> @user@instance)
    text = re.sub(r"https://([^/]+)/(@[^\s]+)", r"\2@\1", text)
    # put pleroma-style mentions back in (https://instance/users/user -> @user@instance)
    text = re.sub(r"https://([^/]+)/users/([^\s/]+)", r"@\2@\1", text)

    # remove trailing newline(s)
    return text.rstrip("\n")
2019-09-09 03:39:28 +00:00
2020-01-20 02:53:11 +00:00
def generate_output(handle):
    """Generate a markov-chain post for the bot identified by ``handle``.

    The bot's settings and credentials are read from the database, up to 1000
    randomly chosen learned posts are used to build the model, and mentions in
    the result are stripped or defused according to the bot's
    ``fake_mentions`` settings.

    :param handle: handle of the bot, as stored in ``bots.handle``.
    :return: a ``(bot, post)`` tuple, where ``bot`` is the settings row as a
        dict and ``post`` is the generated text (``None`` if no sentence could
        be produced after ten attempts). Returns ``None`` when the bot does
        not exist or has no posts to learn from.
    """
    db = MySQLdb.connect(
        host=cfg['db_host'],
        user=cfg['db_user'],
        passwd=cfg['db_pass'],
        db=cfg['db_name']
    )
    try:
        dc = db.cursor(MySQLdb.cursors.DictCursor)
        c = db.cursor()

        dc.execute("""
            SELECT
                learn_from_cw,
                length,
                fake_mentions,
                fake_mentions_full,
                post_privacy,
                content_warning,
                client_id,
                client_secret,
                secret
            FROM
                bots, credentials
            WHERE
                bots.handle = %s
                AND bots.credentials_id = credentials.id
        """, (handle,))
        bot = dc.fetchone()
        if bot is None:
            # no matching row: nothing to generate for
            print("{} - Bot not found.".format(handle))
            return None

        # by default, only select posts that don't have CWs.
        # if learn_from_cw, then also select posts with CWs
        cw_list = [False, True] if bot['learn_from_cw'] else [False]

        # select 1000 random posts for the bot to learn from
        c.execute("SELECT content FROM posts WHERE fedi_id IN (SELECT fedi_id FROM bot_learned_accounts WHERE bot_id = %s) AND cw IN %s ORDER BY RAND() LIMIT 1000", (handle, cw_list))

        # every row is a one-column tuple; join the contents into a single
        # newline-separated string for the model
        posts = "\n".join(row[0] for row in c.fetchall())
    finally:
        # closing the connection also releases its cursors
        db.close()

    if not posts:
        print("{} - No posts to learn from.".format(handle))
        return None

    if bot['fake_mentions'] == 'never':
        # remove all mentions from the training data before the markov model sees it
        posts = re.sub(r"(?<!\S)@\w+(@[\w.]+)?\s?", "", posts)

    model = nlt_fixed(posts)

    # even with such a high tries value for markovify, it still sometimes returns none.
    # so we implement our own tries function as well, and try ten times.
    post = None
    for _ in range(10):
        post = model.make_short_sentence(bot['length'], tries=1000)
        if post is not None:
            break

    # TODO: send an error email when post is still None here

    if post is not None and "@" in post and bot['fake_mentions'] != 'never':
        # the unicode zero width space is a (usually) invisible character
        # we can insert it between the @ symbols in a handle to make it appear fine while not mentioning the user
        zws = "\u200B"
        if bot['fake_mentions'] == 'middle':
            # remove mentions at the start of a post
            post = re.sub(r"^(@\w+(@[\w.]+)?\s*)+", "", post)
        # TODO: does this regex catch all valid handles?
        if bot['fake_mentions_full']:
            # keep the instance part of the handle
            post = re.sub(r"@(\w+)@([\w.]+)", r"@{}\1@{}\2".format(zws, zws), post)
        else:
            # drop the instance part of the handle
            post = re.sub(r"@(\w+)@([\w.]+)", r"@{}\1".format(zws), post)
        # also format handles without instances, e.g. @user instead of @user@instan.ce
        post = re.sub(r"(?<!\S)@(\w+)", r"@{}\1".format(zws), post)

    return bot, post
2020-01-20 02:53:11 +00:00
def make_post(args):
    """Generate a post for a bot and publish it through the Mastodon API.

    :param args: sequence describing the job. ``args[0]`` is the bot's handle
        (its third ``@``-separated part is used as the instance domain).
        When more elements are present this is a reply: ``args[1]`` is the id
        of the status being replied to, ``args[2]`` its visibility and
        ``args[3]`` the account text to prefix the reply with.
    :return: ``None``. Nothing is posted when no text could be generated.
    """
    handle = args[0]
    reply_id = None
    acct = None
    if len(args) > 1:
        reply_id = args[1]
        acct = args[3]

    result = generate_output(handle)
    if result is None:
        # generate_output found nothing to work with and already reported it
        return
    bot, post = result
    if post is None:
        # the model failed to produce a sentence; don't post the text "None"
        return

    client = Mastodon(
        client_id=bot['client_id'],
        client_secret=bot['client_secret'],
        access_token=bot['secret'],
        api_base_url="https://{}".format(handle.split("@")[2])
    )

    # ordered from least to most restricted; 'direct' is included so that
    # replying to a direct message does not raise ValueError
    visibilities = ['public', 'unlisted', 'private', 'direct']
    privacy = bot['post_privacy']
    visibility = privacy if len(args) == 1 else args[2]
    if visibility not in visibilities:
        # unknown visibility on the incoming post: fall back to the user's setting
        visibility = privacy
    elif privacy in visibilities and visibilities.index(visibility) < visibilities.index(privacy):
        # if post_privacy is set to a more restricted level than the visibility of the post we're replying to, use the user's setting
        visibility = privacy

    if acct is not None:
        post = "{} {}".format(acct, post)

    # ensure post isn't longer than bot['length']
    # TODO: ehhhhhhhhh
    post = post[:bot['length']]

    db = MySQLdb.connect(
        host=cfg['db_host'],
        user=cfg['db_user'],
        passwd=cfg['db_pass'],
        db=cfg['db_name']
    )
    try:
        c = db.cursor()

        # send toot!!
        try:
            client.status_post(post, reply_id, visibility=visibility, spoiler_text=bot['content_warning'])
        except MastodonUnauthorizedError:
            # user has revoked the token given to the bot
            # this needs to be dealt with properly later on, but for now, we'll just disable the bot
            c.execute("UPDATE bots SET enabled = FALSE WHERE handle = %s", (handle,))
        except Exception:
            # best effort: report the failure and carry on, but let
            # KeyboardInterrupt/SystemExit propagate
            print("Failed to create post for {}".format(handle))

        if reply_id is None:
            # this wasn't a reply, it was a regular post, so update the last post date
            c.execute("UPDATE bots SET last_post = CURRENT_TIMESTAMP() WHERE handle = %s", (handle,))

        # commit unconditionally so that disabling the bot is persisted for replies as well
        db.commit()
        c.close()
    finally:
        db.close()