本篇笔记主要分析faiss code下的python接口文件——faiss.py的工作流程以及内容。
在faiss code 编译完成后,在python目录下执行make/make install命令后,会在该文件下产生faiss.py, faiss/, faiss.egg-info/等文件和目录,其中faiss.py和faiss/目录下的__init__.py内容一样,但是在应用程序中import faiss时,导入的是__init__.py文件。
faiss.py在导入时,会执行以下工作:
导入依赖模块 import numpy as np import sys import inspect import pdb import platform import subprocess import logging 创建一个log文件,这里__name__是"faiss" logger = logging.getLogger(__name__) 根据指令集加载swigfaiss包 try: instr_set = instruction_set() if instr_set == "AVX2": logger.info("Loading faiss with AVX2 support.") from .swigfaiss_avx2 import * else: logger.info("Loading faiss.") from .swigfaiss import * except ImportError: # we import * so that the symbol X can be accessed as faiss.X logger.info("Loading faiss.") from .swigfaiss import *在instruction_set方法中会调用platform.system()和subprocess.check_output来获取当前平台的系统信息和CPU的指令集信息。我使用的平台返回的是Linux和"AVX2"。
记录版本信息 __version__ = "%d.%d.%d" % (FAISS_VERSION_MAJOR, FAISS_VERSION_MINOR, FAISS_VERSION_PATCH)上述信息定义在Index.h文件中,我使用的版本是1.6.1
替换clustering的train方法 handle_Clustering() def handle_Clustering(): def replacement_train(self, x, index): assert x.flags.contiguous n, d = x.shape assert d == self.d self.train_c(n, swig_ptr(x), index) replace_method(Clustering, 'train', replacement_train) def replace_method(the_class, name, replacement, ignore_missing=False): try: orig_method = getattr(the_class, name) except AttributeError: if ignore_missing: return raise if orig_method.__name__ == 'replacement_' + name: # replacement was done in parent class return setattr(the_class, name + '_c', orig_method) setattr(the_class, name, replacement)将clustering class中的train方法名称替换为replacement的名称。 replace_method方法中:
the_class:类名,文件中直接使用的类定义在swigfaiss.py中,即这里能调用的类都必须是在该文件中有定义的。name: 函数名,即要重新替换的index的方法replacement: 替换后的方法,这里传入方法的地址,在faiss.py中针对index的部分函数如train, add等定义了一套python下的方法,主要是添加了"replacement_"前缀。所以,在应用程序中调用上述方法时会运行到替换后的方法中来。如index.add --> replacement_add 替换 ProductQuantizer 和 ScalarQuantizer 的方法 handle_Quantizer(ProductQuantizer) handle_Quantizer(ScalarQuantizer) def handle_Quantizer(the_class): def replacement_train(self, x): n, d = x.shape assert d == self.d self.train_c(n, swig_ptr(x)) def replacement_compute_codes(self, x): n, d = x.shape assert d == self.d codes = np.empty((n, self.code_size), dtype='uint8') self.compute_codes_c(swig_ptr(x), swig_ptr(codes), n) return codes def replacement_decode(self, codes): n, cs = codes.shape assert cs == self.code_size x = np.empty((n, self.d), dtype='float32') self.decode_c(swig_ptr(codes), swig_ptr(x), n) return x replace_method(the_class, 'train', replacement_train) replace_method(the_class, 'compute_codes', replacement_compute_codes) replace_method(the_class, 'decode', replacement_decode)这两个类是量化器相关的,这里会替换三个函数: train, compute_codes, decode。
替换MatrixStats的初始化方法 def handle_MatrixStats(the_class): original_init = the_class.__init__ def replacement_init(self, m): assert len(m.shape) == 2 original_init(self, m.shape[0], m.shape[1], swig_ptr(m)) the_class.__init__ = replacement_init handle_MatrixStats(MatrixStats)MatrixStats用于报告数据集相关的一些统计信息并对其进行注释。
替换已经导入的index的部分方法 this_module = sys.modules[__name__] for symbol in dir(this_module): obj = getattr(this_module, symbol) # print symbol, isinstance(obj, (type, types.ClassType)) if inspect.isclass(obj): the_class = obj if issubclass(the_class, Index): handle_Index(the_class) if issubclass(the_class, IndexBinary): handle_IndexBinary(the_class) if issubclass(the_class, VectorTransform): handle_VectorTransform(the_class) if issubclass(the_class, AutoTuneCriterion): handle_AutoTuneCriterion(the_class) if issubclass(the_class, ParameterSpace): handle_ParameterSpace(the_class)首先sys.modules返回所有已经导入的faiss相关的module,然后根据symbol名称判断是否为class,如果是,则根据该class的父类类型依次替换部分方法,具体的替换过程在handle_xxx中实现。
对类或方法添加参数引用 add_ref_in_constructor(IndexIVFFlat, 0) add_ref_in_constructor(IndexIVFFlatDedup, 0) add_ref_in_constructor(IndexPreTransform, {2: [0, 1], 1: [0]}) add_ref_in_method(IndexPreTransform, 'prepend_transform', 0) add_ref_in_constructor(IndexIVFPQ, 0) add_ref_in_constructor(IndexIVFPQR, 0) add_ref_in_constructor(Index2Layer, 0) add_ref_in_constructor(Level1Quantizer, 0) add_ref_in_constructor(IndexIVFScalarQuantizer, 0) add_ref_in_constructor(IndexIDMap, 0) add_ref_in_constructor(IndexIDMap2, 0) add_ref_in_constructor(IndexHNSW, 0) add_ref_in_method(IndexShards, 'add_shard', 0) add_ref_in_method(IndexBinaryShards, 'add_shard', 0) add_ref_in_constructor(IndexRefineFlat, 0) add_ref_in_constructor(IndexBinaryIVF, 0) add_ref_in_constructor(IndexBinaryFromFloat, 0) add_ref_in_constructor(IndexBinaryIDMap, 0) add_ref_in_constructor(IndexBinaryIDMap2, 0) add_ref_in_method(IndexReplicas, 'addIndex', 0) add_ref_in_method(IndexBinaryReplicas, 'addIndex', 0) # seems really marginal... # remove_ref_from_method(IndexReplicas, 'removeIndex', 0) if hasattr(this_module, 'GpuIndexFlat'): # handle all the GPUResources refs add_ref_in_function('index_cpu_to_gpu', 0) add_ref_in_constructor(GpuIndexFlat, 0) add_ref_in_constructor(GpuIndexFlatIP, 0) add_ref_in_constructor(GpuIndexFlatL2, 0) add_ref_in_constructor(GpuIndexIVFFlat, 0) add_ref_in_constructor(GpuIndexIVFScalarQuantizer, 0) add_ref_in_constructor(GpuIndexIVFPQ, 0) add_ref_in_constructor(GpuIndexBinaryFlat, 0) add_ref_in_constructor():对arg1的类添加arg2的参数引用add_ref_in_method():对arg1类中arg2的方法添加arg3的参数引用add_ref_in_function():对arg1的函数添加arg2的参数引用,其中arg1的函数是独立于类的函数。 替换MapLong2Long的方法 replace_method(MapLong2Long, 'add', replacement_map_add) replace_method(MapLong2Long, 'search_multiple', replacement_map_search_multiple)faiss.py除了在import 时会自动执行上述操作,在文件中还定义了一些独立的函数供用户应用程序调用。
index_cpu_to_gpu_multiple_py() 此方法的作用是将index从CPU中拷贝到所有的GPU中,为GPU索引和资源构建c++向量。
方法调用了swigfaiss.py的index_cpu_to_gpu_multiple(),该函数拷贝操作的具体实现,初始定义在gpu/GpuCloner.cpp中。
def index_cpu_to_gpu_multiple_py(resources, index, co=None): """builds the C++ vectors for the GPU indices and the resources. Handles the common case where the resources are assigned to the first len(resources) GPUs""" vres = GpuResourcesVector() vdev = IntVector() for i, res in enumerate(resources): vdev.push_back(i) vres.push_back(res) index = index_cpu_to_gpu_multiple(vres, vdev, index, co) index.referenced_objects = resources return index resources:资源列表index: CPU中索引实例return index: GPU中的索引实例index_cpu_to_all_gpus() 此方法的作用与同样是拷贝index,但是接口更简单,更易于应用程序调用,其内部通过调用index_cpu_to_gpu_multiple_py实现。
def index_cpu_to_all_gpus(index, co=None, ngpu=-1): if ngpu == -1: ngpu = get_num_gpus() res = [StandardGpuResources() for i in range(ngpu)] index2 = index_cpu_to_gpu_multiple_py(res, index, co) return index2 index: 要拷贝的CPU 索引实例co: 拷贝的参数,默认为0ngpu: gpu个数,默认为-1,此时由系统判断 index2: 拷贝后GPU中的索引实例vector_to_array() 将c++的std::vector容器转换成np的数组
def vector_to_array(v): """ convert a C++ vector to a numpy array """ classname = v.__class__.__name__ assert classname.endswith('Vector') dtype = np.dtype(vector_name_map[classname[:-6]]) a = np.empty(v.size(), dtype=dtype) if v.size() > 0: memcpy(swig_ptr(a), v.data(), a.nbytes) return a # 其中vector_name_map表示了std::vector和python中数据类型的映射: vector_name_map = { 'Float': 'float32', 'Byte': 'uint8', 'Char': 'int8', 'Uint64': 'uint64', 'Long': 'int64', 'Int': 'int32', 'Double': 'float64' } v: 要转换的c++类型的数组a: 转换后的numpy风格的列表vector_float_to_array() 同vector_to_array()
def vector_float_to_array(v): return vector_to_array(v)copy_array_to_vector() 将numpy的列表拷贝到c++的容器中
def copy_array_to_vector(a, v): """ copy a numpy array to a vector """ n, = a.shape classname = v.__class__.__name__ assert classname.endswith('Vector') dtype = np.dtype(vector_name_map[classname[:-6]]) assert dtype == a.dtype, ( 'cannot copy a %s array to a %s (should be %s)' % ( a.dtype, classname, dtype)) v.resize(n) if n > 0: memcpy(v.data(), swig_ptr(a), a.nbytes)kmin(array, k) 返回数组array中每一行的k个最小值
array: 要搜索的二维数组k: 要查找的最小值的个数返回值:
D : 返回<m, k>的二维数组,类型为intI : 返回<m, k>的二维数组,类型为float32kmax(array, k) 返回数组array中每一行的k个最大值,其他同kmin()
pairwise_distances(xq, xb, mt=METRIC_L2, metric_arg=0) 计算两组向量之间的整个成对距离矩阵
xq: 第一个二维向量xb: 第二个二维向量mt: 计算的距离类型,默认为L2metric_arg: 距离类型参数返回值,dis: 一个<m1, m2>的二维向量,其中m1为xq的行数,m2为xb的行数
rand/randint/randn
rand: 产生浮点型的随机数randint: 产生int64型随机数randn: 从正态分布中产生folat32型随机数eval_intersection(I1, I2) 两个结果表的每行之间相交的大小 normalize_L2(x)
def normalize_L2(x): fvec_renorm_L2(x.shape[1], x.shape[0], swig_ptr(x))serialize_index() 将索引转换成uint8型数组
def serialize_index(index): """ convert an index to a numpy uint8 array """ writer = VectorIOWriter() write_index(index, writer) return vector_to_array(writer.data)deserialize_index() 使用数组生成索引
def deserialize_index(data): reader = VectorIOReader() copy_array_to_vector(data, reader.data) return read_index(reader)