diff --git a/setup.py b/setup.py index a571ce8..ff8f018 100644 --- a/setup.py +++ b/setup.py @@ -62,7 +62,8 @@ def _setup(): [ 'src/embedded_jubatus.pyx', 'src/_wrapper.cpp', - 'src/_model.cpp' + 'src/_model.cpp', + 'src/_helper.cpp' ], include_dirs=include_dirs, libraries=['jubatus_core', 'jubatus_util_text'], diff --git a/src/_helper.cpp b/src/_helper.cpp new file mode 100644 index 0000000..e9345bb --- /dev/null +++ b/src/_helper.cpp @@ -0,0 +1,71 @@ +#include +#include +#include "_helper.h" + +using jubatus::util::lang::lexical_cast; + +static std::vector cache; +static jubatus::util::concurrent::rw_mutex mutex; + +void allocate_number_string(std::size_t max_num) { + jubatus::util::concurrent::scoped_wlock lk(mutex); + for (std::size_t i = cache.size(); i <= max_num; ++i) + cache.push_back(lexical_cast(i)); +} + +const std::string& get_number_string(std::size_t num) { + { + jubatus::util::concurrent::scoped_rlock lk(mutex); + if (cache.size() > num) + return cache[num]; + } + { + jubatus::util::concurrent::scoped_wlock lk(mutex); + for (std::size_t i = cache.size(); i <= num; ++i) + cache.push_back(lexical_cast(i)); + return cache[num]; + } +} + +const std::string& get_number_string_fast(std::size_t num) { + jubatus::util::concurrent::scoped_rlock lk(mutex); + return cache[num]; +} + +void ndarray_to_datum(const PyObject *obj, size_t i, datum& out) { + const PyArrayObject *ary = (const PyArrayObject*)obj; + const char *p = PyArray_BYTES(ary) + (i * PyArray_STRIDE(ary, 0)); + npy_intp stride = PyArray_STRIDE(ary, 1); + + out.string_values_.clear(); + out.num_values_.clear(); + out.binary_values_.clear(); + + jubatus::util::concurrent::scoped_rlock lk(mutex); + for (npy_intp j = 0; j < PyArray_DIM(ary, 1); j ++) { + double v = *(double*)(p + j * stride); + if (v != 0.0) + out.num_values_.push_back(std::make_pair(cache[j], v)); + } +} + +void csr_to_datum(const PyObject *obj0, const PyObject *obj1, + const PyObject *obj2, size_t i, datum& out) { + const PyArrayObject *data = (const PyArrayObject*)obj0; + const PyArrayObject *indices = (const PyArrayObject*)obj1; + const PyArrayObject *indptr = (const PyArrayObject*)obj2; + + out.string_values_.clear(); + out.num_values_.clear(); + out.binary_values_.clear(); + + int j = *(int32_t*)PyArray_GETPTR1(indptr, i); + int k = *(int32_t*)PyArray_GETPTR1(indptr, i + 1); + + jubatus::util::concurrent::scoped_rlock lk(mutex); + for (npy_intp l = j; l < k; l ++) { + int32_t x = *(int32_t*)PyArray_GETPTR1(indices, l); + double v = *(double*)PyArray_GETPTR1(data, l); + out.num_values_.push_back(std::make_pair(cache[x], v)); + } +} diff --git a/src/_helper.h b/src/_helper.h new file mode 100644 index 0000000..bc65065 --- /dev/null +++ b/src/_helper.h @@ -0,0 +1,13 @@ +#include + +#include +#include + +using jubatus::core::fv_converter::datum; + +void allocate_number_string(std::size_t); +const std::string& get_number_string(std::size_t); +const std::string& get_number_string_fast(std::size_t); + +void ndarray_to_datum(const PyObject*, std::size_t, datum&); +void csr_to_datum(const PyObject*, const PyObject*, const PyObject*, std::size_t, datum&); diff --git a/src/_helper.pxd b/src/_helper.pxd new file mode 100644 index 0000000..d5a4952 --- /dev/null +++ b/src/_helper.pxd @@ -0,0 +1,13 @@ +from cpython.ref cimport PyObject +from libcpp.string cimport string + +from _wrapper cimport datum + +cdef extern from '_helper.h': + cdef void ndarray_to_datum(const PyObject* obj, size_t i, datum&) except + + cdef void csr_to_datum(const PyObject*, const PyObject*, const PyObject*, size_t, datum&) except + + +cdef extern from '_helper.h' nogil: + cdef void allocate_number_string(size_t max_num) except + + cdef const string& get_number_string(size_t num) except + + cdef const string& get_number_string_fast(size_t num) except + diff --git a/src/anomaly.pyx b/src/anomaly.pyx index 428c6df..d06a69f 100644 --- a/src/anomaly.pyx +++ b/src/anomaly.pyx @@ -69,43 +69,38 @@ cdef class Anomaly(_JubatusBase): self.partial_fit(X) def partial_fit(self, X): - import numpy as np cdef datum d - cdef vector[string] cache - cdef int is_ndarray = isinstance(X, np.ndarray) - cdef int is_csr = (type(X).__name__ == 'csr_matrix') - cdef rows + cdef int is_ndarray = check_ndarray_csr_type(X) + cdef unsigned int i, rows = X.shape[0] cdef vector[pair[string, datum]] vec - if not (is_ndarray or is_csr): - raise ValueError - rows = X.shape[0] + allocate_number_string(X.shape[1]) if is_ndarray: for i in range(rows): - ndarray_to_datum(X, i, d, cache) + ndarray_to_datum(X, i, d) vec.push_back(pair[string, datum](self._handle.get_next_id(), d)) else: for i in range(rows): - csr_to_datum(X.data, X.indices, X.indptr, i, d, cache) + csr_to_datum(X.data, + X.indices, + X.indptr, i, d) vec.push_back(pair[string, datum](self._handle.get_next_id(), d)) self._handle.add_bulk(vec) def decision_function(self, X): import numpy as np cdef datum d - cdef vector[string] cache - cdef int is_ndarray = isinstance(X, np.ndarray) - cdef int is_csr = (type(X).__name__ == 'csr_matrix') - cdef rows - if not (is_ndarray or is_csr): - raise ValueError - rows = X.shape[0] + cdef int is_ndarray = check_ndarray_csr_type(X) + cdef unsigned int i, rows = X.shape[0] + allocate_number_string(X.shape[1]) y = np.zeros((rows,)) if is_ndarray: for i in range(rows): - ndarray_to_datum(X, i, d, cache) + ndarray_to_datum(X, i, d) y[i] = self._handle.calc_score(d) else: for i in range(rows): - csr_to_datum(X.data, X.indices, X.indptr, i, d, cache) + csr_to_datum(X.data, + X.indices, + X.indptr, i, d) y[i] = self._handle.calc_score(d) return y diff --git a/src/classifier.pyx b/src/classifier.pyx index 875cdb5..db5f38e 100644 --- a/src/classifier.pyx +++ b/src/classifier.pyx @@ -87,53 +87,47 @@ cdef class Classifier(_JubatusBase): def partial_fit(self, X, y): import numpy as np - if len(X.shape) != 2 or len(y.shape) != 1 or X.shape[0] != y.shape[0]: - raise ValueError('invalid shape') - cdef int i, j, max_label - cdef rows = X.shape[0] + cdef unsigned int max_label, i, rows = X.shape[0] cdef datum d - cdef vector[string] cache - cdef int is_ndarray = isinstance(X, np.ndarray) - cdef int is_csr = (type(X).__name__ == 'csr_matrix') - if not (is_ndarray or is_csr): - raise ValueError + cdef int is_ndarray = check_ndarray_csr_type(X) + if len(y.shape) != 1 or X.shape[0] != y.shape[0]: + raise ValueError('invalid shape') + if y.dtype.kind not in ('i', 'u'): + raise ValueError('invalid y.dtype') if self._classes is None: self._classes = [] - max_label = len(self._classes) - 1 + max_label = max(len(self._classes) - 1, max(y)) + allocate_number_string(max(X.shape[1], max_label)) for i in range(rows): if is_ndarray: - ndarray_to_datum(X, i, d, cache) + ndarray_to_datum(X, i, d) else: - csr_to_datum(X.data, X.indices, X.indptr, i, d, cache) - for j in range(cache.size(), y[i] + 1): - cache.push_back(lexical_cast[string, int](j)) - if max_label < y[i]: - max_label = y[i] - self._handle.train(cache[y[i]], d) - for j in range(len(self._classes), max_label + 1): - self._classes.append(j) + csr_to_datum(X.data, + X.indices, + X.indptr, i, d) + self._handle.train(get_number_string_fast(y[i]), d) + for i in range(len(self._classes), max_label + 1): + self._classes.append(i) return self def decision_function(self, X): import numpy as np - if len(X.shape) != 2: - raise ValueError('invalid X.shape') - cdef int is_ndarray = isinstance(X, np.ndarray) - cdef int is_csr = (type(X).__name__ == 'csr_matrix') - if not (is_ndarray or is_csr): - raise ValueError - cdef int i, j, k + cdef int is_ndarray = check_ndarray_csr_type(X) + cdef int k + cdef size_t j cdef double score cdef datum d - cdef vector[string] cache cdef vector[classify_result_elem] r - cdef int rows = X.shape[0] + cdef unsigned int i, rows = X.shape[0] ret = None + allocate_number_string(X.shape[1]) for i in range(rows): if is_ndarray: - ndarray_to_datum(X, i, d, cache) + ndarray_to_datum(X, i, d) else: - csr_to_datum(X.data, X.indices, X.indptr, i, d, cache) + csr_to_datum(X.data, + X.indices, + X.indptr, i, d) r = self._handle.classify(d) if r.size() == 0: return np.zeros(rows) @@ -160,25 +154,21 @@ cdef class Classifier(_JubatusBase): def predict(self, X): import numpy as np - if len(X.shape) != 2: - raise ValueError('invalid X.shape') - cdef int is_ndarray = isinstance(X, np.ndarray) - cdef int is_csr = (type(X).__name__ == 'csr_matrix') - if not (is_ndarray or is_csr): - raise ValueError - cdef int i, j, max_j + cdef int is_ndarray = check_ndarray_csr_type(X) + cdef size_t j, max_j cdef double max_score cdef datum d - cdef vector[string] cache cdef vector[classify_result_elem] r - cdef int rows = X.shape[0] + cdef unsigned int i, rows = X.shape[0] cdef c_np.ndarray[c_np.int32_t, ndim=1] ret = np.zeros(rows, dtype=np.int32) - + allocate_number_string(X.shape[1]) for i in range(rows): if is_ndarray: - ndarray_to_datum(X, i, d, cache) + ndarray_to_datum(X, i, d) else: - csr_to_datum(X.data, X.indices, X.indptr, i, d, cache) + csr_to_datum(X.data, + X.indices, + X.indptr, i, d) r = self._handle.classify(d) if r.size() == 0: break diff --git a/src/embedded_jubatus.pyx b/src/embedded_jubatus.pyx index 3889061..1055b22 100644 --- a/src/embedded_jubatus.pyx +++ b/src/embedded_jubatus.pyx @@ -5,6 +5,7 @@ from libcpp.pair cimport pair from libcpp.map cimport map from cython.operator cimport dereference from cython.operator cimport preincrement +from cpython.ref cimport PyObject cimport numpy as c_np @@ -33,6 +34,11 @@ from _wrapper cimport node_info from _wrapper cimport edge_info from _wrapper cimport preset_query from _wrapper cimport indexed_point +from _helper cimport allocate_number_string +from _helper cimport get_number_string +from _helper cimport get_number_string_fast +from _helper cimport ndarray_to_datum +from _helper cimport csr_to_datum from jubatus.anomaly.types import IdWithScore as AnomalyIdWithScore from jubatus.bandit.types import ArmInfo diff --git a/src/regression.pyx b/src/regression.pyx index 69a8d8c..5e6068d 100644 --- a/src/regression.pyx +++ b/src/regression.pyx @@ -61,44 +61,37 @@ cdef class Regression(_JubatusBase): return self.partial_fit(X, y) def partial_fit(self, X, y): - import numpy as np - if len(X.shape) != 2 or len(y.shape) != 1 or X.shape[0] != y.shape[0]: - raise ValueError('invalid shape') - cdef int i cdef double score - cdef rows = X.shape[0] cdef datum d - cdef vector[string] cache - cdef int is_ndarray = isinstance(X, np.ndarray) - cdef int is_csr = (type(X).__name__ == 'csr_matrix') - if not (is_ndarray or is_csr): - raise ValueError + cdef int is_ndarray = check_ndarray_csr_type(X) + cdef unsigned int i, rows = X.shape[0] + if len(y.shape) != 1 or X.shape[0] != y.shape[0]: + raise ValueError('invalid shape') + allocate_number_string(X.shape[1]) for i in range(rows): if is_ndarray: - ndarray_to_datum(X, i, d, cache) + ndarray_to_datum(X, i, d) else: - csr_to_datum(X.data, X.indices, X.indptr, i, d, cache) + csr_to_datum(X.data, + X.indices, + X.indptr, i, d) score = y[i] self._handle.train(score, d) return self def predict(self, X): import numpy as np - if len(X.shape) != 2: - raise ValueError('invalid shape') - cdef int i - cdef rows = X.shape[0] cdef datum d - cdef vector[string] cache - cdef int is_ndarray = isinstance(X, np.ndarray) - cdef int is_csr = (type(X).__name__ == 'csr_matrix') - if not (is_ndarray or is_csr): - raise ValueError + cdef int is_ndarray = check_ndarray_csr_type(X) + cdef unsigned int i, rows = X.shape[0] + allocate_number_string(X.shape[1]) ret = np.zeros([rows], dtype=np.float32) for i in range(rows): if is_ndarray: - ndarray_to_datum(X, i, d, cache) + ndarray_to_datum(X, i, d) else: - csr_to_datum(X.data, X.indices, X.indptr, i, d, cache) + csr_to_datum(X.data, + X.indices, + X.indptr, i, d) ret[i] = self._handle.estimate(d) return ret diff --git a/src/types.pyx b/src/types.pyx index 2b19197..6363561 100644 --- a/src/types.pyx +++ b/src/types.pyx @@ -29,34 +29,6 @@ cdef datum_native2py(datum& d): ret.add_binary(k, v) return ret -cdef ndarray_to_datum(c_np.ndarray[c_np.float64_t, ndim=2] X, int i, datum& d, vector[string]& cache): - d.string_values_.clear() - d.num_values_.clear() - d.binary_values_.clear() - - cdef int j - for j in range(cache.size(), X.shape[1]): - cache.push_back(lexical_cast[string, int](j)) - for j in range(X.shape[1]): - if X[i, j] != 0.0: - d.num_values_.push_back(pair[string, double](cache[j], X[i, j])) - -cdef csr_to_datum(c_np.ndarray[c_np.float64_t, ndim=1] data, - c_np.ndarray[c_np.int32_t, ndim=1] indices, - c_np.ndarray[c_np.int32_t, ndim=1] indptr, - int i, datum& d, vector[string]& cache): - d.string_values_.clear() - d.num_values_.clear() - d.binary_values_.clear() - - cdef int j = indptr[i] - cdef int k = indptr[i + 1] - cdef int l, m - for l in range(j, k): - for m in range(cache.size(), indices[l] + 1): - cache.push_back(lexical_cast[string, int](m)) - d.num_values_.push_back(pair[string, double](cache[indices[l]], data[l])) - cdef props_py2native(p, prop_t& out): for k, v in p.items(): out.insert(pair[string, string](k.encode('utf8'), v.encode('utf8'))) @@ -80,3 +52,20 @@ cdef preset_query_py2native(query, preset_query& q): for x in query.node_query: q.node_query.push_back(pair[string, string]( x.from_id.encode('ascii'), x.to_id.encode('ascii'))) + +def check_ndarray_csr_type(X): + import numpy as np + cdef int is_ndarray = isinstance(X, np.ndarray) + cdef int is_csr = (type(X).__name__ == 'csr_matrix') + if not (is_ndarray or is_csr): + raise ValueError + if len(X.shape) != 2: + raise ValueError('invalid X.shape') + if X.dtype != np.float64: + raise ValueError('X.dtype must be float64') + if is_csr: + if X.indices.dtype != np.int32: + raise ValueError('X.indices.dtype must be int32') + if X.indptr.dtype != np.int32: + raise ValueError('X.indptr.dtype must be int32') + return is_ndarray