# Code: Select all  (forum code-box caption left over from copy/paste; not part of the program)
import os
import re
from copy import copy
from subprocess import Popen, PIPE
from json import dumps, loads
from collections import Counter
from itertools import imap
from operator import itemgetter, gt, lt, eq
# Write-only handle on the null device; used as stderr for the `find` child
# so its diagnostics are discarded.  Opened once at import time and kept open
# for the lifetime of the module.
NULL = open(os.devnull, 'w')

# Splits a filter string into (sign, number), e.g. '+1' -> ('+', '1').
# Raw string so that `\d` reaches the regex engine instead of being treated
# as an (invalid) string escape sequence.
REX_FILTER = re.compile(r'([+-]*)(\d+)')

# Maps the sign of a filter to the comparison applied as
# compare(link_count, number): '+' greater than, '-' less than, '' equal.
SIGNS = {'+': gt, '-': lt, '': eq}

# Template for the `find` command line.  The two None slots are filled in per
# instance: index 1 is the search path, index 6 the argument of `-links`.
# Each result is printed as "<inode>: <link count>: <path>" on its own line.
CALL = ['find', None, '-xdev', '-type', 'f',
        '-links', None, '-printf', '%i: %n: %p\n']
class Hardlinks(object):
    """
    Index of hardlinked files below a directory, built by running `find`.

    Searches `path` for files whose link count matches `filter`. Filter is
    a string holding a number, optionally prefixed with a sign (``+`` for
    greater than, ``-`` for less than, no sign for exactly that number).

    Use `load` to restore the state from a string produced by `dumps`
    instead of scanning the filesystem; `path` and `filter` are ignored in
    that case.

    Instance attributes:
      call        -- argument list of the `find` command used for the scan
      inode2paths -- maps inode number -> (link count, [absolute paths])
      path2inode  -- maps absolute path -> inode number
    """

    def __init__(self, path='/', filter='+1', load=None):
        if load:
            self.loads(load)
        else:
            # Work on a copy so the module-level template keeps its
            # placeholders for the next instance.
            self.call = copy(CALL)
            self.call[1] = path
            self.call[6] = filter
            self.inode2paths = {}
            self.path2inode = {}
            self._run()

    def _run(self):
        """
        Runs `find` and fills `inode2paths` and `path2inode` from its output.

        Error output of `find` is discarded.
        """
        p = Popen(self.call, stdout=PIPE, stderr=NULL)
        try:
            for line in p.stdout:
                # Strip only the line terminator (trailing blanks belong to
                # the file name) and split at most twice so that a path
                # containing ': ' stays in one piece.
                inode, links, path = line.rstrip('\n').split(': ', 2)
                inode = int(inode)
                links = int(links)
                path = os.path.abspath(path)
                self.inode2paths.setdefault(inode, (links, []))[1].append(path)
                self.path2inode[path] = inode
        finally:
            # Always release the pipe and reap the child so no zombie
            # process is left behind, even if parsing failed.
            p.stdout.close()
            p.wait()

    @property
    def path(self):
        """
        Returns the used `path` argument.
        """
        return self.call[1]

    @property
    def filter(self):
        """
        Returns the used `filter` argument.
        """
        return self.call[6]

    @property
    def count_hardlinks(self):
        """
        Number of inodes matching the filter in path.
        """
        return len(self.inode2paths)

    @property
    def count_files(self):
        """
        Number of hardlinked files matching the filter in path.
        """
        return len(self.path2inode)

    @property
    def distribution(self):
        """
        Distribution of number of hardlinks in path.

        Returns a Counter mapping link count -> number of inodes with that
        link count.
        """
        return Counter(links for links, _ in self.inode2paths.values())

    def test_path(self, path):
        """
        Looks up the hardlinks of `path`.

        Returns the ``(link count, [paths])`` entry recorded for the inode
        of `path`, or an empty list if `path` is not part of the index.
        """
        path = os.path.abspath(path)
        try:
            return self.inode2paths[self.path2inode[path]]
        except KeyError:
            return []

    def dumps(self):
        """
        Serializes the internal data to a JSON string for later usage.
        """
        return dumps((self.call, self.inode2paths, self.path2inode))

    def loads(self, s):
        """
        Loads internal data from `s`, a string produced by `dumps`.
        """
        call, inode2paths, path2inode = loads(s)
        self.call = call
        # JSON turns the integer inode keys into strings and the value
        # tuples into lists; undo both so lookups through `path2inode`
        # (which still holds integers) find their entry again.
        self.inode2paths = {
            int(inode): (links, paths)
            for inode, (links, paths) in inode2paths.items()
        }
        self.path2inode = path2inode

    def get_paths(self, filter='+1'):
        """
        Yields number of hardlinks and pathlists for `filter`.

        This uses the internal cached data to avoid rescanning of the
        filesystem.

        Note: If the number of hardlinks does not match the length of
        pathlist you missed a file outside the search path.

        Raises ValueError (on first iteration, as this is a generator) if
        `filter` is not a number with an optional single ``+``/``-`` sign.
        """
        match = REX_FILTER.match(filter)
        compare = SIGNS.get(match.group(1)) if match else None
        if compare is None:
            raise ValueError('invalid filter: %r' % (filter,))
        num = int(match.group(2))
        for count, paths in self.inode2paths.values():
            if compare(count, num):
                yield count, paths