#
# Author: Sascha Silbe <sascha-pgp@silbe.org>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 3
# as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Gdatastore metadata index interface
"""

import logging
import os
import sys
import time

import xapian
from xapian import Document, Enquire, Query, WritableDatabase


_CURRENT_VERSION = 1
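# Properties listed here are indexed as Xapian terms under the given prefix
# (so they can be matched exactly and searched as free text).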
_STANDARD_TERMS = {
    'activity': {'prefix': 'Xactname', 'type': str},
    'activity_id': {'prefix': 'Xactid', 'type': str},
    'description': {'prefix': 'Xdesc', 'type': unicode},
    'keep': {'prefix': 'Xkeep', 'type': str},
    'mime_type': {'prefix': 'T', 'type': str},
    'tags': {'prefix': 'K', 'type': unicode},
    'title': {'prefix': 'S', 'type': unicode},
    'tree_id': {'prefix': 'Xtree', 'type': str},
    'version_id': {'prefix': 'Xversion', 'type': str},
}
_VALUE_TREE_ID = 0
_VALUE_VERSION_ID = 1
_VALUE_MTIME = 2
_VALUE_SIZE = 3
_VALUE_CTIME = 4
_VALUE_COMMIT_ID = 5
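# Properties listed here are additionally stored in the numbered Xapian value
# slots above, which is what makes sorting and range queries on them possible.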
_STANDARD_VALUES = {
    'creation_time': {'number': _VALUE_CTIME, 'type': float},
    'filesize': {'number': _VALUE_SIZE, 'type': int},
    'timestamp': {'number': _VALUE_MTIME, 'type': float},
    'tree_id': {'number': _VALUE_TREE_ID, 'type': str},
    'version_id': {'number': _VALUE_VERSION_ID, 'type': str},
}
_IGNORE_PROPERTIES = ['preview']
_PREFIX_FULL_VALUE = 'Xf'
_PREFIX_OBJECT_ID = 'Q'
_LIMIT_MAX = 2 ** 31 - 1
_DOCID_REVERSE_MAP = {True: Enquire.DESCENDING, False: Enquire.ASCENDING}


class DSIndexError(Exception):
    pass


class TermGenerator(xapian.TermGenerator):

    def __init__(self):
        self._document = None
        xapian.TermGenerator.__init__(self)

    def index_document(self, document, properties):
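        # Store the sortable values first, then index everything else (both
        # standard and free-form properties) as terms.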
        for name, info in _STANDARD_VALUES.items():
            if name not in properties:
                continue

            document.add_value(info['number'],
                _serialise_value(info, properties[name]))

        self._document = document
        self.set_document(document)

        properties = dict(properties)
        self._index_known(properties)
        self._index_unknown(properties)

    def _index_known(self, properties):
        """Index standard properties and remove them from the input."""
        for name, info in _STANDARD_TERMS.items():
            if name not in properties:
                continue

            value = info['type'](properties.pop(name))
            self._index_property(value, info['prefix'])

    def _index_unknown(self, properties):
        """
        Index all given properties.

        Expects not to get passed any standard term-stored property.
        """
        for name, value in properties.items():
            if name in _IGNORE_PROPERTIES or name in _STANDARD_VALUES:
                continue

            self._index_property(value, _prefix_for_unknown(name))

    def _index_property(self, value, prefix):
        if isinstance(value, unicode):
            value = value.encode('utf-8')
        elif not isinstance(value, str):
            value = str(value)

        # Hardcoded Xapian term length limit
        if len(prefix + value) < 240:
            # We need to add the full value (i.e. not split into words), too,
            # so we can enumerate unique values. It also simplifies setting up
            # dictionary-based queries.
            self._document.add_term(_PREFIX_FULL_VALUE + prefix + value)

        self.index_text(value, 1, prefix)
        self.increase_termpos()


class QueryParser(xapian.QueryParser):
    """
    QueryParser that understands dictionaries and Xapian query strings.

    The dictionary maps property names (keys) to match criteria (values):
    a basic type means an exact match, a 2-tuple means a range (only valid
    for value-stored standard properties) and a list means multiple exact
    matches joined with OR.
    An empty dictionary matches everything. Queries from different keys
    (i.e. different property names) are joined with AND.

    Full text search (Xapian query string) is only supported for standard
    properties.
    """

    _FLAGS = (xapian.QueryParser.FLAG_PHRASE |
              xapian.QueryParser.FLAG_BOOLEAN |
              xapian.QueryParser.FLAG_LOVEHATE |
              xapian.QueryParser.FLAG_WILDCARD)

    def __init__(self):
        xapian.QueryParser.__init__(self)
        for name, info in _STANDARD_TERMS.items():
            self.add_prefix(name, info['prefix'])
            self.add_prefix('', info['prefix'])
        # For compatibility with Sugar 0.84+ data store API (without
        # version support).
        self.add_prefix('uid', _STANDARD_TERMS['tree_id']['prefix'])

    def _parse_query_term(self, prefix, value):
        if isinstance(value, list):
            subqueries = [self._parse_query_term(prefix, word)
                          for word in value]
            return Query(Query.OP_OR, subqueries)

        return Query(_PREFIX_FULL_VALUE + prefix + str(value))

    def _parse_query_value_range(self, info, value):
        if len(value) != 2:
            raise TypeError('Only tuples of size 2 have a defined meaning.'
                            ' Did you mean to pass a list instead?')

        start, end = value
        return Query(Query.OP_VALUE_RANGE, info['number'],
                     _serialise_value(info, start),
                     _serialise_value(info, end))

    def _parse_query_value(self, info, value):
        if isinstance(value, list):
            subqueries = [self._parse_query_value(info, word)
                          for word in value]
            return Query(Query.OP_OR, subqueries)

        elif isinstance(value, tuple):
            return self._parse_query_value_range(info, value)

        elif isinstance(value, dict):
            # compatibility option for timestamp: {'start': 0, 'end': 1}
            start = value.get('start', 0)
            end = value.get('end', sys.maxint)
            return self._parse_query_value_range(info, (start, end))

        else:
            return self._parse_query_value_range(info, (value, value))

    def _parse_query_xapian(self, query_str):
        return xapian.QueryParser.parse_query(self, query_str,
                                              QueryParser._FLAGS, '')

    def parse_datastore_query(self, query_dict, query_string):
        logging.debug('query_dict=%r, query_string=%r', query_dict,
                      query_string)
        queries = []
        query_dict = dict(query_dict or {})

        if query_string is not None:
            queries.append(self._parse_query_xapian(str(query_string)))

        for name, value in query_dict.items():
            if name in _STANDARD_TERMS:
                prefix = _STANDARD_TERMS[name]['prefix']
                query = self._parse_query_term(prefix, value)
            elif name in _STANDARD_VALUES:
                info = _STANDARD_VALUES[name]
                query = self._parse_query_value(info, value)
            else:
                prefix = _prefix_for_unknown(name)
                query = self._parse_query_term(prefix, value)

            queries.append(query)

        if not queries:
            queries.append(Query(''))

        logging.debug('queries: %r', [str(query) for query in queries])
        return Query(Query.OP_AND, queries)


class Index(object):

    def __init__(self, data_stores):
        self._data_stores = data_stores
        self._database = None
        self._base_dir = data_stores[0]['index_dir']

        if not os.path.exists(self._base_dir):
            os.makedirs(self._base_dir)
            self._create_database()

        self._migrate()
        self._query_parser = QueryParser()
        self._query_parser.set_database(self._database)

    def close(self):
        """Close index database if it is open."""
        if not self._database:
            return

        self._database.close()
        self._database = None

    def contains(self, object_id, commit_id=None):
        postings = self._database.postlist(_object_id_term(object_id))
        try:
            doc_id = postings.next().docid
        except StopIteration:
            return False
        if not commit_id:
            return True
        document = self._database.get_document(doc_id)
        return document.get_value(_VALUE_COMMIT_ID) == commit_id

    def delete(self, object_id):
        writable_db = self._get_writable_db()
        object_id_term = _object_id_term(object_id)
        if __debug__:
            enquire = Enquire(self._database)
            enquire.set_query(Query(object_id_term))
            documents = [hit.document for hit in enquire.get_mset(0, 2, 2)]
            assert len(documents) == 1

        writable_db.delete_document(object_id_term)
        writable_db.commit()
        self._reopen_dbs()

    def find(self, query_dict, query_string, options):
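        # Recognised keys in 'options' (all optional): offset, limit,
        # order_by (e.g. ['+timestamp']), all_versions and check_at_least.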
        offset = options.pop('offset', 0)
        limit = options.pop('limit', _LIMIT_MAX)
        order_by = options.pop('order_by', ['+timestamp'])[0]
        all_versions = options.pop('all_versions', False)
        check_at_least = options.pop('check_at_least', offset + limit + 1)

        enquire = Enquire(self._database)
        query = self._query_parser.parse_datastore_query(query_dict,
                                                         query_string)
        enquire.set_query(query)

        sort_reverse = {'+': True, '-': False}[order_by[0]]
        try:
            sort_value_nr = _STANDARD_VALUES[order_by[1:]]['number']
        except KeyError:
            logging.warning('Trying to order by unknown property: %r',
                            order_by[1:])
            sort_value_nr = _VALUE_MTIME

        enquire.set_sort_by_value(sort_value_nr, reverse=sort_reverse)
        enquire.set_docid_order(_DOCID_REVERSE_MAP[sort_reverse])

        if not all_versions:
            enquire.set_collapse_key(_VALUE_TREE_ID)

        if not all_versions and order_by != '+timestamp':
            # Xapian doesn't support using a different sort order while
            # collapsing (which needs to be timestamp in our case), so
            # we need to query everything and sort+limit ourselves.
            enquire.set_sort_by_value(_VALUE_MTIME, True)
            enquire.set_docid_order(enquire.ASCENDING)
            query_result = enquire.get_mset(0, _LIMIT_MAX, _LIMIT_MAX)
        else:
            logging.debug('Offset/limit using Xapian: %d %d %d', offset,
                          limit, check_at_least)
            query_result = enquire.get_mset(offset, limit, check_at_least)

        total_count = query_result.get_matches_lower_bound()
        documents = [hit.document for hit in query_result]

        if (not all_versions) and (order_by != '+timestamp'):
            _sort_documents(documents, sort_value_nr, sort_reverse)
            # Apply offset and limit manually (see comment above).
            del documents[offset + limit:]
            del documents[:offset]

        #object_ids = [(document.get_value(_VALUE_TREE_ID),
        #               document.get_value(_VALUE_VERSION_ID))
        #              for document in documents]
        entries = [deserialise_metadata(document.get_data())
                   for document in documents]

        return entries, total_count

    def find_unique_values(self, property):
        if property in _STANDARD_TERMS:
            prefix = _PREFIX_FULL_VALUE + _STANDARD_TERMS[property]['prefix']
        else:
            prefix = _PREFIX_FULL_VALUE + _prefix_for_unknown(property)

        return [term.term[len(prefix):]
                for term in self._database.allterms(prefix)]

    def retrieve(self, object_id):
        postings = self._database.postlist(_object_id_term(object_id))
        doc_id = postings.next().docid
        document = self._database.get_document(doc_id)
        # When using multiple databases, Xapian document IDs are
        # interleaved:
        # global_doc_id = (local_doc_id - 1) * num_databases + db_index + 1
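        # With two databases, for example, global IDs 1, 3, 5, ... come from
        # database 0 and 2, 4, 6, ... from database 1.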
        ds_index = (doc_id - 1) % len(self._data_stores)
        return {'metadata': deserialise_metadata(document.get_data()),
                'data_store': self._data_stores[ds_index],
                'commit_id': document.get_value(_VALUE_COMMIT_ID)}

    def store(self, object_id, properties, commit_id):
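        # replace_document() is keyed on the object-id term, so storing the
        # same (tree_id, version_id) again overwrites the existing entry.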
        logging.debug('store(%r, %r)', object_id, properties)
        assert (properties['tree_id'], properties['version_id']) == object_id
        id_term = _object_id_term(object_id)
        document = Document()
        logging.debug('serialised=%r', serialise_metadata(properties))
        document.set_data(serialise_metadata(properties))
        document.add_term(id_term)
        term_generator = TermGenerator()
        term_generator.index_document(document, properties)
        assert (document.get_value(_VALUE_TREE_ID),
                document.get_value(_VALUE_VERSION_ID)) == object_id
        document.add_value(_VALUE_COMMIT_ID, commit_id)
        writable_db = self._get_writable_db()
        writable_db.replace_document(id_term, document)
        writable_db.commit()
        self._reopen_dbs()

    def _create_database(self):
        database = WritableDatabase(self._base_dir, xapian.DB_CREATE_OR_OPEN)
        database.set_metadata('gdatastore_version', str(_CURRENT_VERSION))
        database.close()

    def _migrate(self):
        for try_count in range(10):
            try:
                self._database = WritableDatabase(self._base_dir,
                                                  xapian.DB_CREATE_OR_OPEN)
            except xapian.DatabaseLockError:
                logging.error("Couldn't lock Xapian database (try #%d)",
                              try_count)
                time.sleep(1)
            else:
                break

        version = int(self._database.get_metadata('gdatastore_version'))

        if version > _CURRENT_VERSION:
            raise DSIndexError('Unsupported index version: %d > %d' %
                             (version, _CURRENT_VERSION))

        self._reopen_dbs()

    def _get_writable_db(self):
        """Return a writable database instance

        In regular single-database mode, just return the single database
        instance. In multi-database mode, return a new writable database
        instance.
        """
        if len(self._data_stores) == 1:
            return self._database
        return WritableDatabase(self._base_dir, xapian.DB_OPEN)

    def _reopen_dbs(self):
        """Reopen databases if necessary

        In regular single-database mode, do nothing. In multi-database mode,
        re-open all databases to ensure we use the latest versions of all
        databases.

        Invoke this when either we have updated the primary database
        ourselves or the additional databases may have been updated by
        some other process.
        """
        if len(self._data_stores) == 1:
            return

        self._database = xapian.Database(self._base_dir)
        for db_info in self._data_stores[1:]:
            db = xapian.Database(db_info['index_dir'])
            version = int(db.get_metadata('gdatastore_version'))
            if version != _CURRENT_VERSION:
                logging.warning('Skipping extra index %r due to version'
                                ' mismatch', db_info['index_dir'])
                continue

            self._database.add_database(db)


def deserialise_metadata(serialised):
    """Deserialise a string generated by serialise_metadata().

    Do NOT pass any value that might have been modified since it was generated
    by serialise_metadata().
    """
    return eval(serialised)
426
427
428
def serialiase_metadata(metadata):
429
    return repr(_to_native(metadata))
430
431
432
def _object_id_term(object_id):
433
    return _PREFIX_FULL_VALUE + _PREFIX_OBJECT_ID + '%s-%s' % object_id
434
435
436
def _prefix_for_unknown(name):
437
    return 'Xu%d:%s' % (len(name), unicode(name).encode('utf-8'))
438
439
440
def _serialise_value(info, value):
441
    if info['type'] in (float, int, long):
442
        return xapian.sortable_serialise(info['type'](value))
443
    elif info['type'] == unicode:
444
        return unicode(value).encode('utf-8')
445
446
    return str(info['type'](value))
447
448
449
def _sort_documents(documents, sort_value_nr, sort_reverse):
450
    def _cmp(document_a, document_b):
451
        value_a = document_a.get_value(sort_value_nr)
452
        value_b = document_b.get_value(sort_value_nr)
453
        if value_a < value_b:
454
            return -1
455
        elif value_a > value_b:
456
            return 1
457
458
        docid_a = document_a.get_docid()
459
        docid_b = document_b.get_docid()
460
        if docid_a < docid_b:
461
            return -1
462
        elif docid_a > docid_b:
463
            return 1
464
        return 0
465
466
    documents.sort(cmp=_cmp, reverse=sort_reverse)
467


def _to_native(value):
    if isinstance(value, list):
        return [_to_native(e) for e in value]
    elif isinstance(value, dict):
        return dict([(_to_native(k), _to_native(v))
                     for k, v in value.items()])
    elif isinstance(value, unicode):
        return unicode(value)
    elif isinstance(value, str):
        return str(value)
    elif isinstance(value, int):
        return int(value)
    elif isinstance(value, float):
        return float(value)
    return value