diff --git a/app/functions.py b/app/functions.py index cd1ef73..42da7a7 100644 --- a/app/functions.py +++ b/app/functions.py @@ -1,5 +1,7 @@ from bs4 import BeautifulSoup import MySQLdb +from pebble import ProcessPool +from concurrent.futures import TimeoutError import markovify import requests from Crypto.PublicKey import RSA @@ -189,6 +191,23 @@ def make_post(args): db.commit() c.close() +def do_in_pool(function, data, timeout=30, silent=False): + with ProcessPool(max_workers=cfg['service_threads']) as p: + index = 0 + future = p.map(function, data) + iterator = future.result() + + while True: + try: + result = next(iterator) + except StopIteration: + # all threads are done + break + except TimeoutError as error: + if not silent: print("Timed out on {}.".format(data[index])) + finally: + index += 1 + def get_key(): db = MySQLdb.connect( host = cfg['db_host'], diff --git a/app/scrape.py b/app/scrape.py index e9af680..ce21a20 100755 --- a/app/scrape.py +++ b/app/scrape.py @@ -2,7 +2,6 @@ import MySQLdb import requests -from multiprocessing import Pool import json, re import functions @@ -131,7 +130,7 @@ cursor.execute("SELECT `handle`, `outbox` FROM `fedi_accounts` ORDER BY RAND()") accounts = cursor.fetchall() cursor.close() db.close() -with Pool(cfg['service_threads']) as p: - p.map(scrape_posts, accounts) + +functions.do_in_pool(scrape_posts, accounts, timeout=60) print("Done!") diff --git a/app/service.py b/app/service.py index 2fd3a48..a7f4185 100755 --- a/app/service.py +++ b/app/service.py @@ -1,9 +1,10 @@ #!/usr/bin/env python3 +import json + import MySQLdb from mastodon import Mastodon -from multiprocessing import Pool import requests -import json + import functions cfg = json.load(open('config.json')) @@ -73,8 +74,7 @@ cursor.execute("SELECT handle FROM bots WHERE enabled = TRUE AND TIMESTAMPDIFF(M # cursor.execute("SELECT handle FROM bots WHERE enabled = TRUE") bots = cursor.fetchall() -with Pool(cfg['service_threads']) as p: - p.map(functions.make_post, bots) +functions.do_in_pool(functions.make_post, bots, 15) print("Updating cached icons") dc = db.cursor(MySQLdb.cursors.DictCursor) @@ -86,7 +86,6 @@ ON bots.credentials_id = credentials.id WHERE TIMESTAMPDIFF(HOUR, icon_update_time, CURRENT_TIMESTAMP()) > 2""") bots = dc.fetchall() -with Pool(cfg['service_threads']) as p: - p.map(update_icon, bots) +functions.do_in_pool(update_icon, bots) db.commit() diff --git a/requirements.txt b/requirements.txt index 2fbb306..7b7093d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,5 +7,6 @@ flask-mysqldb==0.2.0 bcrypt == 3.1.7 requests==2.23.0 http-ece==1.1.0 -pycryptodome==3.9.7 +pycryptodome==3.9.7 cryptography==2.9.2 +pebble==4.5.3