#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""Persistent memory of liked Tweets, stored in a local SQLite database."""

import hashlib
import sqlite3
import time

import tweepy

from config import max_age

# Characters treated as sentence separators when fingerprinting a status.
_SEPARATORS = ':-!?.'
# Word prefixes marking the start of a link.
_URL_PREFIXES = ('http://', 'https://')
_SECONDS_PER_DAY = 24 * 60 * 60


class LikeMemory:
    """Track all liked Tweets.

    Each liked status is stored in the ``LikedTweets`` table together with
    a fingerprint of its content, so that near-duplicate statuses can be
    recognised and old likes can be removed later.
    """

    def __init__(self, api, db_path='HTT.db'):
        """Build the Python object.

        Args:
            api: Client object exposing ``destroy_favorite(sid)``; it is
                only used by :meth:`purge_old_status`.
            db_path: Path of the SQLite database file. Defaults to
                ``'HTT.db'``, the location previously hard-coded.
        """
        self._api = api

        # Booleans are stored as integers and columns declared BOOLEAN are
        # converted back to bool on read (needs PARSE_DECLTYPES below).
        sqlite3.register_adapter(bool, int)
        sqlite3.register_converter("BOOLEAN", lambda v: bool(int(v)))

        self._db = sqlite3.connect(
            db_path, detect_types=sqlite3.PARSE_DECLTYPES)

        # The connection context manager commits on success and rolls
        # back if the statement raises.
        with self._db:
            self._db.execute('''
                CREATE TABLE IF NOT EXISTS LikedTweets(
                    sid INTEGER PRIMARY KEY,
                    username TEXT,
                    fingerprint TEXT,
                    timestamp INTEGER,
                    purged BOOLEAN
                )
            ''')

    def close(self):
        """Close the underlying database connection."""
        self._db.close()

    def _compute_content_fingerprint(self, content):
        """Compute the fingerprint of a given status content.

        The content is reduced to a "base" sentence before hashing:
        hashtags are dropped, everything from the first link onwards is
        ignored, leading spaces/separators are stripped and the text is
        cut at the first separator character.

        Args:
            content: Text of the status.

        Returns:
            Hexadecimal MD5 digest (str) of the UTF-8 encoded base text.
        """
        # Step 1: filter all hashtags
        words = [w for w in content.split(' ') if not w.startswith('#')]

        # Step 2: stop at the first link
        kept = []
        for word in words:
            if word.startswith(_URL_PREFIXES):
                break
            kept.append(word)

        # Step 3: get a fresh start
        text = ' '.join(kept).lstrip(' ' + _SEPARATORS)

        # Step 4: Extract a common shared base, i.e. the text located
        # before the first occurrence of any separator.
        base = text
        for sep in _SEPARATORS:
            pos = base.find(sep)
            if pos != -1:
                base = base[:pos]

        # MD5 is only used as a compact content key here, not for security.
        return hashlib.md5(base.rstrip(' ').encode('utf-8')).hexdigest()

    def is_original_content(self, content):
        """Ensure that a given content has never been seen.

        Args:
            content: Text of the status to check.

        Returns:
            True when no stored status shares the fingerprint of
            ``content``, False otherwise.
        """
        fingerprint = self._compute_content_fingerprint(content)
        cursor = self._db.execute(
            'SELECT sid FROM LikedTweets WHERE fingerprint = ?',
            (fingerprint,))
        return cursor.fetchone() is None

    def save_liked_status(self, sid, username, content):
        """Remember a given liked status.

        The row is stored with the current time as timestamp and with
        ``purged`` set to False.

        Args:
            sid: Identifier of the status (primary key of the table).
            username: Name of the author of the status.
            content: Text of the status; only its fingerprint is stored.

        Raises:
            sqlite3.IntegrityError: If ``sid`` is already stored. The
                transaction is rolled back before the error propagates.
        """
        fingerprint = self._compute_content_fingerprint(content)
        timestamp = int(time.time())
        values = (sid, username, fingerprint, timestamp, False)
        with self._db:
            self._db.execute(
                'INSERT INTO LikedTweets '
                '(sid, username, fingerprint, timestamp, purged) '
                'VALUES (?, ?, ?, ?, ?)',
                values)

    def purge_old_status(self):
        """Purge old seen statuses.

        Every status saved more than ``max_age`` days ago and not yet
        flagged as purged is un-liked through the API, then flagged as
        purged. A summary line is printed once done.
        """
        limit = int(time.time()) - max_age * _SECONDS_PER_DAY
        rows = self._db.execute(
            'SELECT sid FROM LikedTweets WHERE timestamp < ? AND purged = ?',
            (limit, False)).fetchall()

        # All flags are written in one transaction: committed when the
        # loop completes, rolled back if an unexpected error escapes.
        with self._db:
            for (sid,) in rows:
                try:
                    self._api.destroy_favorite(sid)
                except tweepy.error.TweepError:
                    # Best effort: the status may be gone already, e.g.
                    # tweepy.error.TweepError: [{'code': 144, 'message':
                    # 'No status found with that ID.'}]
                    # The row is flagged as purged regardless.
                    pass
                self._db.execute(
                    'UPDATE LikedTweets SET purged = ? WHERE sid = ?',
                    (True, sid))

        print('Purged %d liked Tweet%s!'
              % (len(rows), '' if len(rows) <= 1 else 's'))