#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""SQLite-backed memory of liked Tweets and of the last Tweet seen per user.

``open_db()`` must be called once before ``LikeMemory`` or ``TrackMemory``
is instantiated: both classes use the module-level connection it creates.
"""

import hashlib
import sqlite3
import time

import tweepy

from config import max_age

# Shared connection used by every class in this module; set by open_db().
_db = None


def open_db():
    """Open the database.

    Connects to ``HTT.db`` and stores the connection in the module-level
    ``_db``. ``PARSE_DECLTYPES`` is enabled so that converters registered for
    a declared column type (see ``LikeMemory.__init__``) are applied on read.
    """
    global _db
    _db = sqlite3.connect('HTT.db', detect_types=sqlite3.PARSE_DECLTYPES)


class LikeMemory():
    """Track all liked Tweets."""

    def __init__(self, api):
        """Build the Python object.

        Registers the bool <-> BOOLEAN conversion with sqlite3 and creates
        the ``LikedTweets`` table when it does not exist yet.

        :param api: object whose ``destroy_favorite(sid)`` method is called
            by ``purge_old_status`` (presumably a ``tweepy.API`` instance).
        """
        self._api = api

        # Store Python booleans as integers and read BOOLEAN columns back
        # as booleans.
        sqlite3.register_adapter(bool, int)
        sqlite3.register_converter("BOOLEAN", lambda v: bool(int(v)))

        sql = '''
            CREATE TABLE IF NOT EXISTS LikedTweets(
                sid INTEGER PRIMARY KEY,
                username TEXT,
                fingerprint TEXT,
                timestamp INTEGER,
                purged BOOLEAN
            )
        '''
        cursor = _db.cursor()
        cursor.execute(sql)
        _db.commit()

    def _compute_content_fingerprint(self, content):
        """Compute the fingerprint of a given status content.

        The text is reduced to a short "base" (hashtags removed, cut at the
        first link, leading separators stripped, cut at the first remaining
        separator, trailing spaces removed, lowercased) and that base is
        hashed.

        :param content: text of the status.
        :return: hexadecimal MD5 digest of the normalized base.
        """
        separators = ':-!?.'

        # Step 1: filter all hashtags
        words = [w for w in content.split(' ') if not w.startswith('#')]

        # Step 2: stop at the first link
        keep = []
        for word in words:
            if word.startswith(('http://', 'https://')):
                break
            keep.append(word)

        # Step 3: get a fresh start
        base = ' '.join(keep).lstrip(' ' + separators)

        # Step 4: extract a common shared base by cutting at each separator
        # in turn (partition returns the whole string when `sep` is absent)
        for sep in separators:
            base = base.partition(sep)[0]

        # MD5 only serves as a compact lookup key here. The digest must stay
        # stable because fingerprints are persisted in LikedTweets.
        normalized = base.rstrip(' ').lower()
        return hashlib.md5(normalized.encode('utf-8')).hexdigest()

    def is_original_content(self, content):
        """Ensure that a given content has never been seen.

        :param content: text of the status.
        :return: True when no row of ``LikedTweets`` has the same
            fingerprint, False otherwise.
        """
        fingerprint = self._compute_content_fingerprint(content)

        cursor = _db.cursor()
        cursor.execute('SELECT sid FROM LikedTweets WHERE fingerprint = ?',
                       (fingerprint, ))
        return cursor.fetchone() is None

    def save_liked_status(self, sid, username, content):
        """Remember a given liked status.

        Inserts a row stamped with the current time and ``purged`` set to
        False, then commits.

        :param sid: status id, used as the primary key.
        :param username: name stored alongside the status.
        :param content: text of the status; only its fingerprint is stored.
        """
        fingerprint = self._compute_content_fingerprint(content)
        timestamp = int(time.time())

        values = (sid, username, fingerprint, timestamp, False)
        cursor = _db.cursor()
        cursor.execute('INSERT INTO LikedTweets VALUES (?, ?, ?, ?, ?)', values)
        _db.commit()

    def purge_old_status(self):
        """Purge old seen statuses.

        Every row older than ``max_age`` days that is not yet flagged as
        purged is un-liked through the API and then flagged as purged. The
        number of processed rows is printed.
        """
        cutoff = int(time.time()) - max_age * 24 * 60 * 60

        cursor = _db.cursor()
        cursor.execute('SELECT sid FROM LikedTweets WHERE timestamp < ? AND purged = ?',
                       (cutoff, False))
        rows = cursor.fetchall()

        for (sid, ) in rows:
            try:
                self._api.destroy_favorite(sid)
            except tweepy.error.TweepError:
                # Deliberately best effort, e.g.:
                # tweepy.error.TweepError: [{'code': 144, 'message': 'No status found with that ID.'}]
                # NOTE(review): every TweepError is ignored, not only code
                # 144, so a failed call still flags the row as purged.
                # NOTE(review): tweepy 4.x appears to have renamed this to
                # tweepy.errors.TweepyException - confirm the pinned version.
                pass

            # Commit row by row so progress survives an interruption.
            cursor.execute('UPDATE LikedTweets SET purged = ? WHERE sid = ?',
                           (True, sid))
            _db.commit()

        print('Purged %d liked Tweet%s!'
              % (len(rows), '' if len(rows) <= 1 else 's'))


class TrackMemory():
    """Remember last seen Tweet for users."""

    def __init__(self):
        """Build the Python object.

        Creates the ``TrackedUsers`` table when it does not exist yet.
        """
        sql = '''
            CREATE TABLE IF NOT EXISTS TrackedUsers(
                uid INTEGER PRIMARY KEY,
                username TEXT,
                last INTEGER
            )
        '''
        cursor = _db.cursor()
        cursor.execute(sql)
        _db.commit()

    def get_last_seen_for(self, uid):
        """Get the status id of the last Tweet for a given user.

        :param uid: user id to look up.
        :return: the stored ``last`` value, or None when the user has no row.
        """
        cursor = _db.cursor()
        cursor.execute('SELECT last FROM TrackedUsers WHERE uid = ?', (uid, ))
        found = cursor.fetchone()
        return None if found is None else found[0]

    def set_last_seen_for(self, uid, name, sid):
        """Set the status id of the last seen Tweet for a given user.

        An existing row only gets its ``last`` column updated; ``name`` is
        written when the row is first created.

        :param uid: user id, used as the primary key.
        :param name: username stored when the row is created.
        :param sid: status id of the last seen Tweet.
        """
        cursor = _db.cursor()

        # Update first and insert only when no row matched. Testing
        # get_last_seen_for() against None cannot tell a missing row from a
        # row whose `last` is NULL, and inserting in the latter case violates
        # the primary key.
        cursor.execute('UPDATE TrackedUsers SET last = ? WHERE uid = ?',
                       (sid, uid))
        if cursor.rowcount == 0:
            cursor.execute('INSERT INTO TrackedUsers VALUES (?, ?, ?)',
                           (uid, name, sid))
        _db.commit()