# -*- coding: utf-8 -*-

import calendar
import difflib
import os
import shutil
import time

from code import CodeMetrics
from git import Repo


class RepositoryChecker:
    """Collect one year of statistics from the 'master' branch of a Git repository.

    For every commit whose committed date falls inside the requested year the
    checker accumulates inserted/deleted line counts, added/removed file
    counts and TODO / FIXME / REMME marker movements.  It also dumps each
    commit tree to disk so that ``CodeMetrics`` can measure it.
    """

    # Substrings that flag a line as carrying a pending-work marker.
    _MARKERS = ('TODO', 'FIXME', 'REMME')

    # Progress line; the leading '\r' rewinds to the start of the terminal line.
    _PROGRESS = '\r[+] Collecting Git info... %d%%'

    def __init__(self, url, year):
        """Initialize the analysis of a Git repository.

        Args:
            url: location handed to ``git.Repo``; also used, with a '_dump'
                suffix, as the directory receiving the dumped trees.
            year: integer year to analyse.  The window is
                [1 January ``year``, 1 January ``year + 1``), built with
                ``calendar.timegm`` and compared to commit timestamps.
        """
        self._repo = Repo(url)
        self._dump_path = url + '_dump'

        self._start = calendar.timegm(time.strptime('1/1/%u' % year, '%d/%m/%Y'))
        self._end = calendar.timegm(time.strptime('1/1/%u' % (year + 1), '%d/%m/%Y'))

        # committed_date -> value returned by CodeMetrics.count_all_lines()
        self._timeline = {}
        # CodeMetrics of the first processed (newest) commit
        self._last_metrics = None

        self._insertions = 0
        self._deletions = 0

        self._added = 0
        self._removed = 0

        self._old = 0
        self._new = 0
        self._killed = 0

    def __str__(self):
        """Provide a pretty print of the checker."""
        sections = (
            '=== Code ===\n'
            'Insertions: %u\n'
            'Deletions: %u\n' % (self._insertions, self._deletions),
            '=== Files ===\n'
            'Added: %u\n'
            'Removed: %u\n' % (self._added, self._removed),
            '=== TODO/FIXME ===\n'
            'Old: %u\n'
            'New: %u\n'
            'Killed: %u\n' % (self._old, self._new, self._killed),
        )
        # Sections are separated by one empty line.
        return '\n'.join(sections)

    def process(self):
        """Collect Git info for every commit of the selected year.

        Commits are handled newest first.  Progress is written to standard
        output on a single, repeatedly rewritten line.
        """
        commits = self._select_commits()
        count = len(commits)

        for i, commit in enumerate(commits):
            progress = self._PROGRESS % self._percent(i, count)
            print(progress, end='')

            # Insertions / deletions
            total = commit.stats.total
            self._insertions += total.get('insertions', 0)
            self._deletions += total.get('deletions', 0)

            # Added / removed files and marker movements
            self._scan_changes(commit)

            # Single lines of code
            self._delete_dump()
            msg = self._build_dump(commit.tree, progress)
            self._reset_line(msg, progress)

            cm = CodeMetrics(self._dump_path)
            msg = cm.process(progress)
            self._reset_line(msg, progress)

            self._timeline[commit.committed_date] = cm.count_all_lines()

            if self._last_metrics is None:
                self._last_metrics = cm

                # All remaining TODO / FIXME: counted once, on the newest
                # commit only.  Summing a whole-tree count over every commit
                # would count the same marker again and again.
                msg = self._grep_for_toto_fixme(commit.tree, progress)
                self._reset_line(msg, progress)

        print(self._PROGRESS % self._percent(count, count))

    def _select_commits(self):
        """Return the commits of 'master' inside the time window, newest first.

        Commits before the first match are skipped; the walk stops at the
        first commit outside the window once at least one has been selected.
        """
        selected = []
        for commit in self._repo.iter_commits('master'):
            if self._start <= commit.committed_date < self._end:
                selected.append(commit)
            elif selected:
                break
        return selected

    def _scan_changes(self, commit):
        """Update file and marker counters with the changes brought by a commit."""
        if not commit.parents:
            # Root commit: there is no 'SHA~1' to compare with, so every blob
            # is an added file and every marker it holds is a new one.
            for item in commit.tree.traverse():
                if item.type != 'blob':
                    continue
                self._added += 1
                try:
                    self._new += self._count_markers(self._decode_lines(item))
                except UnicodeDecodeError:
                    # Binary file
                    pass
            return

        # The run command is :
        #
        #   git diff-tree SHA SHA~1 -r --abbrev=40 --full-index -M --raw --no-color
        #
        # Beware: all is reversed! Side 'a' is this commit, side 'b' its parent.
        changes = commit.diff(commit.hexsha + '~1',
                              create_patch=True,
                              ignore_blank_lines=True,
                              ignore_space_at_eol=True,
                              diff_filter='cr')

        for diff in changes:
            # Added / removed (swapped because of the reversed comparison)
            if diff.new_file:
                self._removed += 1
            if diff.deleted_file:
                self._added += 1

            # TODO / FIXME / REMME
            try:
                ours = self._decode_lines(diff.a_blob) if diff.a_blob else None
                theirs = self._decode_lines(diff.b_blob) if diff.b_blob else None
            except UnicodeDecodeError:
                # Binary file: if either side cannot be decoded the pair is
                # skipped, instead of mistaking the change for a file
                # creation or deletion.
                continue

            if ours is None and theirs is None:
                continue

            if ours is None:
                # Only the parent has the file: its markers went away.
                self._killed += self._count_markers(theirs)
            elif theirs is None:
                # Only this commit has the file: its markers are new.
                self._new += self._count_markers(ours)
            else:
                for index, line in enumerate(difflib.unified_diff(ours, theirs)):
                    # The first two lines are the '---' / '+++' file headers.
                    # Skipping them by position keeps real changes whose text
                    # itself begins with '--' or '++'.
                    if index < 2:
                        continue
                    if not self._has_marker(line):
                        continue
                    if line.startswith('-'):
                        self._new += 1
                    elif line.startswith('+'):
                        self._killed += 1

    @classmethod
    def _has_marker(cls, line):
        """Tell whether a line contains one of the tracked markers."""
        return any(marker in line for marker in cls._MARKERS)

    @classmethod
    def _count_markers(cls, lines):
        """Count the lines containing at least one tracked marker."""
        return sum(1 for line in lines if cls._has_marker(line))

    @staticmethod
    def _decode_lines(blob):
        """Read a blob as UTF-8 and split it into lines, line endings kept.

        Raises:
            UnicodeDecodeError: if the content is not valid UTF-8.
        """
        return blob.data_stream.read().decode('utf-8').splitlines(True)

    @staticmethod
    def _percent(done, total):
        """Return the integer percentage done; nothing to do counts as 100."""
        if total == 0:
            return 100
        return (done * 100) // total

    @staticmethod
    def _reset_line(msg, progress):
        """Blank out a previously printed message, then print the base progress."""
        # msg starts with '\r', which takes no room on screen.
        print('\r' + ' ' * len(msg[1:]), end='')
        print(progress, end='')

    def _delete_dump(self):
        """Delete all dumped items."""
        if os.path.exists(self._dump_path):
            shutil.rmtree(self._dump_path)

    def _build_dump(self, tree, msg):
        """Dump all items from a commit tree into the dump directory.

        Args:
            tree: commit tree to write to disk.
            msg: progress prefix printed before the dumping status.

        Returns:
            The last message printed, so the caller can erase it.
        """
        os.makedirs(self._dump_path, exist_ok=True)

        items = list(tree.traverse())
        count = len(items)

        for i, item in enumerate(items):
            # msg may contain '%', so only the suffix is a format string.
            print(msg + ' -> Dumping items... %d%%' % self._percent(i, count), end='')

            path = os.path.join(self._dump_path, item.path)

            if item.type == 'tree':
                os.makedirs(path, exist_ok=True)
            elif item.type == 'blob':
                with open(path, 'wb') as out:
                    out.write(item.data_stream.read())

        msg = msg + ' -> Dumping items... %d%%' % self._percent(count, count)
        print(msg, end='')

        return msg

    def _grep_for_toto_fixme(self, tree, msg):
        """Find all waiting TODO / FIXME markers.

        Each line of each UTF-8 decodable blob of the tree that contains a
        marker adds one to the 'old' counter.

        Args:
            tree: commit tree to search.
            msg: progress prefix printed before the search status.

        Returns:
            The last message printed, so the caller can erase it.
        """
        items = list(tree.traverse())
        count = len(items)

        for i, item in enumerate(items):
            print(msg + ' -> Searching for markers... %d%%' % self._percent(i, count), end='')

            if item.type != 'blob':
                continue

            try:
                lines = self._decode_lines(item)
            except UnicodeDecodeError:
                # Binary file
                continue

            self._old += self._count_markers(lines)

        msg = msg + ' -> Searching for markers... %d%%' % self._percent(count, count)
        print(msg, end='')

        return msg

    def get(self, name):
        """Provide a memorized property.

        Args:
            name: attribute name without its leading underscore
                (for instance 'insertions' or 'timeline').

        Raises:
            AttributeError: if no such attribute exists.
        """
        return getattr(self, '_' + name)