Web · Wiki · Activities · Blog · Lists · Chat · Meeting · Bugs · Git · Translate · Archive · People · Donate
1
#
2
# Author: Sascha Silbe <sascha-pgp@silbe.org>
3
#
4
# This program is free software; you can redistribute it and/or modify
5
# it under the terms of the GNU General Public License version 3
6
# as published by the Free Software Foundation.
7
#
8
# This program is distributed in the hope that it will be useful,
9
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11
# GNU General Public License for more details.
12
#
13
# You should have received a copy of the GNU General Public License
14
# along with this program. If not, see <http://www.gnu.org/licenses/>.
15
"""
16
Gdatastore D-Bus service API
17
"""
18
19
import ast
20
import hashlib
21
import logging
22
import os
23
import pprint
24
import re
25
import shutil
26
from subprocess import Popen, PIPE
27
import tempfile
28
import time
29
import uuid
30
31
import dbus
32
import dbus.service
33
import gconf
34
35
try:
36
    from sugar import mime as sugar_mime
37
except ImportError:
38
    # Only used for helping legacy applications that use the file
39
    # extension rather than the MIME type
40
    sugar_mime = None
41
42
from gdatastore.index import Index
43
44
45
# Bus name, interface and object path of the native gdatastore API.
DBUS_SERVICE_NATIVE_V1 = 'org.silbe.GDataStore'
DBUS_INTERFACE_NATIVE_V1 = 'org.silbe.GDataStore1'
DBUS_PATH_NATIVE_V1 = '/org/silbe/GDataStore1'

# Bus name, interface and object path of the Sugar 0.84+ compatible
# data store API.
DBUS_SERVICE_SUGAR_V2 = 'org.laptop.sugar.DataStore'
DBUS_INTERFACE_SUGAR_V2 = 'org.laptop.sugar.DataStore'
DBUS_PATH_SUGAR_V2 = '/org/laptop/sugar/DataStore'

# Same bus name as the V2 API, but a separate interface and object path.
DBUS_SERVICE_SUGAR_V3 = 'org.laptop.sugar.DataStore'
DBUS_INTERFACE_SUGAR_V3 = 'org.laptop.sugar.DataStore2'
DBUS_PATH_SUGAR_V3 = '/org/laptop/sugar/DataStore2'
56
57
_DBUS_METADATA_BASIC_RE = re.compile(
58
    r"""dbus.(U?Int(16|32|64)|Double|String|ByteArray)\((?P<value>(-?[0-9]+(\.[0-9]*)?)|(u?('([^'\\]|\\.)*'|"([^"\\]|\\.)*")))(, variant_level=[0-9]+)?\)""")
59
_DBUS_METADATA_DICTIONARY_RE = re.compile(
60
    r"""dbus.Dictionary\((?P<value>\{.*\}), signature=dbus.Signature\('s[sv]'\)\)""")
61
62
63
class DataStoreError(Exception):
    """Base class for the errors raised by the data store itself
    """
65
66
67
class GitError(DataStoreError):
    """Raised when a git command exits with a non-zero status

    Attributes:
        returncode -- exit status of the git process
        stderr -- what git wrote to standard error, as text
    """

    def __init__(self, returncode, stderr):
        self.returncode = returncode
        if isinstance(stderr, bytes):
            # git's error output is raw bytes and may echo arbitrary file
            # names, so decoding must never fail: undecodable bytes are
            # replaced instead of raising UnicodeDecodeError.
            stderr = stderr.decode('utf-8', 'replace')
        self.stderr = u'%s' % (stderr, )
        Exception.__init__(self)

    def __unicode__(self):
        return u'Git returned with exit code #%d: %s' % (self.returncode,
                                                         self.stderr)

    def __str__(self):
        message = self.__unicode__()
        if str is bytes:
            # Python 2: str() has to yield bytes. Encode explicitly rather
            # than relying on the implicit ASCII encoding, which fails for
            # non-ASCII git output.
            return message.encode('utf-8')

        return message
79
80
81
class DBusApiNativeV1(dbus.service.Object):
    """Native gdatastore D-Bus API

    Exports the operations of the internal API object on the session
    bus and re-emits its 'change_metadata', 'delete' and 'save'
    callbacks as D-Bus signals.
    """

    def __init__(self, internal_api):
        """Claim the bus name, export the object and hook up callbacks

        internal_api -- object providing add_callback(), save(),
                        change_metadata(), delete(), find(),
                        get_data_path() and get_properties()
        """
        self._internal_api = internal_api
        bus_name = dbus.service.BusName(DBUS_SERVICE_NATIVE_V1,
                                        bus=dbus.SessionBus(),
                                        replace_existing=False,
                                        allow_replacement=False,
                                        do_not_queue=True)
        dbus.service.Object.__init__(self, bus_name, DBUS_PATH_NATIVE_V1)
        self._internal_api.add_callback('change_metadata',
                                        self.__change_metadata_cb)
        self._internal_api.add_callback('delete', self.__delete_cb)
        self._internal_api.add_callback('save', self.__save_cb)

    @dbus.service.signal(DBUS_INTERFACE_NATIVE_V1, signature='sssa{sv}')
    def AddedNewVersion(self, tree_id, child_id, parent_id, metadata):
        """Signal: a new version was added to an existing tree"""
        # pylint: disable-msg=C0103
        pass

    @dbus.service.signal(DBUS_INTERFACE_NATIVE_V1, signature='ssa{sv}')
    def Created(self, tree_id, child_id, metadata):
        """Signal: a version without parent was stored"""
        # pylint: disable-msg=C0103
        pass

    @dbus.service.signal(DBUS_INTERFACE_NATIVE_V1, signature='ssa{sv}')
    def ChangedMetadata(self, tree_id, version_id, metadata):
        """Signal: the metadata of an existing version was changed"""
        # pylint: disable-msg=C0103
        pass

    @dbus.service.signal(DBUS_INTERFACE_NATIVE_V1, signature='ss')
    def Deleted(self, tree_id, version_id):
        """Signal: a version was deleted"""
        # pylint: disable-msg=C0103
        pass

    @dbus.service.method(DBUS_INTERFACE_NATIVE_V1,
                         in_signature='a{sv}s', out_signature='ss',
                         async_callbacks=('async_cb', 'async_err_cb'),
                         byte_arrays=True)
    def create(self, metadata, data_path, async_cb, async_err_cb):
        """
        - add new entry, assign ids
        - data='' indicates no data to store
        - bad design? (data OOB)
        """
        # TODO: what about transfer_ownership/delete_after?
        self._internal_api.save(tree_id='', parent_id='', metadata=metadata,
                                path=data_path, delete_after=True,
                                async_cb=async_cb,
                                async_err_cb=async_err_cb)

    @dbus.service.method(DBUS_INTERFACE_NATIVE_V1,
                         in_signature='a{sv}h', out_signature='ss',
                         async_callbacks=('async_cb', 'async_err_cb'),
                         byte_arrays=True)
    def create_fd(self, metadata, data_fd, async_cb, async_err_cb):
        """
        - add new entry, assign ids
        - the data is read from the passed file descriptor
        """
        self._save_from_fd('', '', metadata, data_fd, async_cb, async_err_cb)

    @dbus.service.method(DBUS_INTERFACE_NATIVE_V1,
                         in_signature='ssa{sv}s', out_signature='s',
                         async_callbacks=('async_cb', 'async_err_cb'),
                         byte_arrays=True)
    def add_version(self, tree_id, parent_id, metadata, data_path, async_cb,
                    async_err_cb):
        """
        - add new version to existing object
        - raises ValueError if tree_id or parent_id is empty
        """
        def success_cb(tree_id, child_id):
            # The D-Bus reply only carries the id of the new version.
            async_cb(child_id)

        if not tree_id:
            raise ValueError('No tree_id given')

        if not parent_id:
            raise ValueError('No parent_id given')

        self._internal_api.save(tree_id, parent_id, metadata, data_path,
                                delete_after=True,
                                async_cb=success_cb,
                                async_err_cb=async_err_cb)

    @dbus.service.method(DBUS_INTERFACE_NATIVE_V1,
                         in_signature='ssa{sv}h', out_signature='s',
                         async_callbacks=('async_cb', 'async_err_cb'),
                         byte_arrays=True)
    def add_version_fd(self, tree_id, parent_id, metadata, data_fd, async_cb,
                       async_err_cb):
        """
        - add new version to existing object
        - the data is read from the passed file descriptor
        - raises ValueError if tree_id or parent_id is empty
        """
        def success_cb(tree_id, child_id):
            # The D-Bus reply only carries the id of the new version.
            async_cb(child_id)

        if not tree_id:
            raise ValueError('No tree_id given')

        if not parent_id:
            raise ValueError('No parent_id given')

        self._save_from_fd(tree_id, parent_id, metadata, data_fd,
                           success_cb, async_err_cb)

    @dbus.service.method(DBUS_INTERFACE_NATIVE_V1,
                         in_signature='ssa{sv}', out_signature='',
                         byte_arrays=True)
    def change_metadata(self, tree_id, version_id, metadata):
        """
        - change the metadata of an existing version
        """
        object_id = (tree_id, version_id)
        self._internal_api.change_metadata(object_id, metadata)

    @dbus.service.method(DBUS_INTERFACE_NATIVE_V1,
                         in_signature='ss', out_signature='')
    def delete(self, tree_id, version_id):
        """
        - delete a single version
        """
        object_id = (tree_id, version_id)
        self._internal_api.delete(object_id)

    @dbus.service.method(DBUS_INTERFACE_NATIVE_V1,
                         in_signature='a{sv}a{sv}', out_signature='aa{sv}u',
                         byte_arrays=True)
    def find(self, query_dict, options):
        """
        - return the matching entries and the total number of matches
        """
        return self._internal_api.find(query_dict, options)

    @dbus.service.method(DBUS_INTERFACE_NATIVE_V1,
                         in_signature='ss', out_signature='s',
                         sender_keyword='sender')
    def get_data_path(self, tree_id, version_id, sender=None):
        """
        - return the path of a checkout of the data of the given version
        """
        object_id = (tree_id, version_id)
        return self._internal_api.get_data_path(object_id, sender=sender)

    @dbus.service.method(DBUS_INTERFACE_NATIVE_V1,
                         in_signature='ss', out_signature='h',
                         sender_keyword='sender')
    def get_data_fd(self, tree_id, version_id, sender=None):
        """
        - return a read-only file descriptor for the data of the given
          version
        """
        object_id = (tree_id, version_id)
        path = self._internal_api.get_data_path(object_id, sender=sender)
        fd = os.open(path, os.O_RDONLY)
        try:
            # UnixFd keeps its own duplicate of the descriptor, so ours
            # has to be closed again; returning the bare integer would
            # leak one descriptor per call.
            return dbus.types.UnixFd(fd)
        finally:
            os.close(fd)

    @dbus.service.method(DBUS_INTERFACE_NATIVE_V1,
                         in_signature='ss', out_signature='a{sv}')
    def get_metadata(self, tree_id, version_id):
        """
        - return the metadata of the given version
        """
        object_id = (tree_id, version_id)
        return self._internal_api.get_properties(object_id)

    @dbus.service.method(DBUS_INTERFACE_NATIVE_V1,
                         in_signature='a{sv}sa{sv}', out_signature='aa{sv}u',
                         byte_arrays=True)
    def text_search(self, query_dict, query_string, options):
        """
        - like find(), additionally restricted by a query string
        """
        return self._internal_api.find(query_dict, options, query_string)

    @dbus.service.method(DBUS_INTERFACE_NATIVE_V1,
                         in_signature='sssa{sv}s', out_signature='ss',
                         async_callbacks=('async_cb', 'async_err_cb'),
                         byte_arrays=True)
    def restore(self, tree_id, parent_id, version_id, metadata, data_path,
                async_cb, async_err_cb):
        """
        - add a new version with the given ids
        - there must be no existing entry with the same (tree_id, version_id)
        """
        if not tree_id:
            raise ValueError('No tree_id given')

        metadata['version_id'] = version_id
        self._internal_api.save(tree_id, parent_id, metadata, data_path,
                                delete_after=True, allow_new_parent=True,
                                async_cb=async_cb,
                                async_err_cb=async_err_cb)

    # The last argument is a file descriptor (it is consumed with take()),
    # so the signature has to end in 'h' like the other *_fd methods.
    @dbus.service.method(DBUS_INTERFACE_NATIVE_V1,
                         in_signature='sssa{sv}h', out_signature='ss',
                         async_callbacks=('async_cb', 'async_err_cb'),
                         byte_arrays=True)
    def restore_fd(self, tree_id, parent_id, version_id, metadata, data_fd,
                   async_cb, async_err_cb):
        """
        - add a new version with the given ids
        - the data is read from the passed file descriptor
        - there must be no existing entry with the same (tree_id, version_id)
        """
        if not tree_id:
            raise ValueError('No tree_id given')

        metadata['version_id'] = version_id
        self._save_from_fd(tree_id, parent_id, metadata, data_fd,
                           async_cb, async_err_cb, allow_new_parent=True)

    def _copy_fd_to_temp_file(self, data_fd):
        """Copy everything readable from data_fd into a new temporary file

        Return the path of the temporary file. Both the descriptor taken
        from data_fd and the temporary file are closed before returning;
        the temporary file is removed again if copying fails.
        """
        tmp_file = tempfile.NamedTemporaryFile(delete=False)
        try:
            with tmp_file:
                with os.fdopen(data_fd.take(), 'rb') as data_file:
                    shutil.copyfileobj(data_file, tmp_file)
        except Exception:
            os.remove(tmp_file.name)
            raise

        return tmp_file.name

    def _save_from_fd(self, tree_id, parent_id, metadata, data_fd,
                      async_cb, async_err_cb, **save_options):
        """Save an entry whose data is passed as a file descriptor

        Additional keyword arguments are passed on to save().
        """
        # FIXME: avoid unnecessary copy, instead change
        # InternalAPI.save() to take fd and adapt existing callers.
        tmp_path = self._copy_fd_to_temp_file(data_fd)
        try:
            self._internal_api.save(tree_id, parent_id, metadata, tmp_path,
                                    delete_after=True,
                                    async_cb=async_cb,
                                    async_err_cb=async_err_cb,
                                    **save_options)
        except Exception:
            # The temporary file is ours; don't leave it behind if saving
            # failed before it got removed.
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
            raise

    def __change_metadata_cb(self, object_id, metadata):
        tree_id, version_id = object_id
        self.ChangedMetadata(tree_id, version_id, metadata)

    def __delete_cb(self, object_id):
        tree_id, version_id = object_id
        self.Deleted(tree_id, version_id)

    def __save_cb(self, tree_id, child_id, parent_id, metadata):
        if parent_id:
            self.AddedNewVersion(tree_id, child_id, parent_id, metadata)
        else:
            self.Created(tree_id, child_id, metadata)
312
313
314
class DBusApiSugarV2(dbus.service.Object):
    """Compatibility layer for the Sugar 0.84+ data store D-Bus API

    This API has no notion of versions: a uid corresponds to a tree_id
    and all operations act on the first entry returned by _get_latest().
    """

    def __init__(self, internal_api):
        """Claim the bus name, export the object and hook up callbacks

        internal_api -- object providing add_callback(), save(),
                        change_metadata(), delete(), find(),
                        find_unique_values() and get_data_path()
        """
        self._internal_api = internal_api
        bus_name = dbus.service.BusName(DBUS_SERVICE_SUGAR_V2,
                                        bus=dbus.SessionBus(),
                                        replace_existing=False,
                                        allow_replacement=False,
                                        do_not_queue=True)
        dbus.service.Object.__init__(self, bus_name, DBUS_PATH_SUGAR_V2)
        self._internal_api.add_callback('change_metadata',
                                        self.__change_metadata_cb)
        self._internal_api.add_callback('delete', self.__delete_cb)
        self._internal_api.add_callback('save', self.__save_cb)

    @dbus.service.method(DBUS_INTERFACE_SUGAR_V2,
                         in_signature='a{sv}sb', out_signature='s',
                         async_callbacks=('async_cb', 'async_err_cb'),
                         byte_arrays=True)
    def create(self, props, file_path, transfer_ownership,
               async_cb, async_err_cb):
        """Store a new entry and reply with its uid"""
        def success_cb(tree_id, child_id):
            # Sugar clients only know about the uid (= tree_id).
            async_cb(tree_id)

        self._internal_api.save(tree_id='', parent_id='', metadata=props,
                                path=file_path,
                                delete_after=transfer_ownership,
                                async_cb=success_cb,
                                async_err_cb=async_err_cb)

    @dbus.service.signal(DBUS_INTERFACE_SUGAR_V2, signature='s')
    def Created(self, uid):
        """Signal: a new entry was created"""
        # pylint: disable-msg=C0103
        pass

    @dbus.service.method(DBUS_INTERFACE_SUGAR_V2,
                         in_signature='sa{sv}sb', out_signature='',
                         async_callbacks=('async_cb', 'async_err_cb'),
                         byte_arrays=True)
    def update(self, uid, props, file_path, transfer_ownership,
               async_cb, async_err_cb):
        """Update an existing entry

        If _check_identical() reports the data as unchanged, only the
        metadata of the latest version is changed; otherwise a new
        version is saved on top of the latest one.

        Raises ValueError if there is no entry with the given uid.
        """
        def success_cb(tree_id, child_id):
            async_cb()

        latest_versions = self._get_latest(uid)
        if not latest_versions:
            raise ValueError('Trying to update non-existant entry %s - wanted'
                ' to use create()?' % (uid, ))

        parent = latest_versions[0]
        object_id = parent['tree_id'], parent['version_id']
        if self._check_identical(parent, file_path):
            self._internal_api.change_metadata(object_id, props)
            return success_cb(uid, None)

        self._internal_api.save(tree_id=uid,
            parent_id=parent['version_id'], metadata=props,
            path=file_path, delete_after=transfer_ownership,
            async_cb=success_cb, async_err_cb=async_err_cb)

    @dbus.service.signal(DBUS_INTERFACE_SUGAR_V2, signature='s')
    def Updated(self, uid):
        """Signal: an existing entry was changed"""
        # pylint: disable-msg=C0103
        pass

    @dbus.service.method(DBUS_INTERFACE_SUGAR_V2,
                         in_signature='a{sv}as', out_signature='aa{sv}u')
    def find(self, query, properties):
        """Return the matching entries and the total number of matches

        'uid' is translated to 'tree_id' in the query and in the list of
        requested properties, and translated back in the results. The
        'offset', 'limit', 'order_by' and 'query' keys of the query are
        passed to the internal API as options / query string.
        """
        if 'uid' in properties:
            properties.append('tree_id')
            properties.remove('uid')

        options = {'metadata': properties}
        for name in ['offset', 'limit', 'order_by']:
            if name in query:
                options[name] = query.pop(name)

        if 'uid' in query:
            query['tree_id'] = query.pop('uid')

        results, count = self._internal_api.find(query, options,
            query.pop('query', None))

        if not properties or 'tree_id' in properties:
            for entry in results:
                entry['uid'] = entry.pop('tree_id')

        return results, count

    @dbus.service.method(DBUS_INTERFACE_SUGAR_V2,
                         in_signature='s', out_signature='s',
                         sender_keyword='sender')
    def get_filename(self, uid, sender=None):
        """Return the path of a checkout of the entry's data

        Raises ValueError if there is no entry with the given uid.
        """
        latest_versions = self._get_latest(uid)
        if not latest_versions:
            raise ValueError('Entry %s does not exist' % (uid, ))

        object_id = (uid, latest_versions[0]['version_id'])
        return self._internal_api.get_data_path(object_id, sender=sender)

    @dbus.service.method(DBUS_INTERFACE_SUGAR_V2,
                         in_signature='s', out_signature='a{sv}')
    def get_properties(self, uid):
        """Return the metadata of the entry

        'tree_id' is reported as 'uid' and 'version_id' is dropped.
        Raises ValueError if there is no entry with the given uid.
        """
        latest_versions = self._get_latest(uid)
        if not latest_versions:
            raise ValueError('Entry %s does not exist' % (uid, ))

        latest_versions[0]['uid'] = latest_versions[0].pop('tree_id')
        del latest_versions[0]['version_id']
        return latest_versions[0]

    @dbus.service.method(DBUS_INTERFACE_SUGAR_V2,
                         in_signature='sa{sv}', out_signature='as')
    def get_uniquevaluesfor(self, propertyname, query=None):
        """Return the distinct values of the given property"""
        return self._internal_api.find_unique_values(query, propertyname)

    @dbus.service.method(DBUS_INTERFACE_SUGAR_V2,
                         in_signature='s', out_signature='')
    def delete(self, uid):
        """Delete the latest version of the entry

        Raises ValueError if there is no entry with the given uid.
        """
        latest_versions = self._get_latest(uid)
        if not latest_versions:
            raise ValueError('Entry %s does not exist' % (uid, ))

        self._internal_api.delete((uid, latest_versions[0]['version_id']))

    @dbus.service.signal(DBUS_INTERFACE_SUGAR_V2, signature='s')
    def Deleted(self, uid):
        """Signal: the last remaining version of an entry was deleted"""
        # pylint: disable-msg=C0103
        pass

    @dbus.service.method(DBUS_INTERFACE_SUGAR_V2,
                         in_signature='', out_signature='aa{sv}')
    def mounts(self):
        """Return a fixed list containing a single mount with id 1"""
        return [{'id': 1}]

    @dbus.service.signal(DBUS_INTERFACE_SUGAR_V2, signature='a{sv}')
    def Mounted(self, descriptior):
        # pylint: disable-msg=C0103
        pass

    @dbus.service.signal(DBUS_INTERFACE_SUGAR_V2, signature='a{sv}')
    def Unmounted(self, descriptor):
        # pylint: disable-msg=C0103
        pass

    def _get_latest(self, uid):
        """Return a list with at most one entry for the given uid

        The list is empty if there is no entry with that uid.
        """
        return self._internal_api.find({'tree_id': uid},
            {'limit': 1, 'order_by': ['+timestamp']})[0]

    def _check_identical(self, parent, child_data_path):
        """Check whether the new version contains the same data as the parent

        If child_data_path is empty, that's interpreted as wanting to do
        a metadata-only update (emulating sugar-datastore behaviour).
        """
        # TODO: compare checksums?
        # Until the data is actually compared, a non-empty path always
        # counts as changed data. The parent's data is deliberately not
        # checked out here: it cannot influence the result and every
        # checkout creates a file in the checkouts directory.
        return not child_data_path

    def __change_metadata_cb(self, object_id, metadata):
        tree_id, version_id = object_id
        self.Updated(tree_id)

    def __delete_cb(self, object_id):
        tree_id, version_id = object_id
        # Only report the entry as deleted once no version is left.
        if self._get_latest(tree_id):
            self.Updated(tree_id)
        else:
            self.Deleted(tree_id)

    def __save_cb(self, tree_id, child_id, parent_id, metadata):
        if parent_id:
            self.Updated(tree_id)
        else:
            self.Created(tree_id)
496
497
498
class InternalApi(object):
499
500
    SIGNALS = ['change_metadata', 'delete', 'save']
501
502
    def __init__(self, base_dir):
        """Open the data store located below base_dir

        Creates the checkouts directory and the git repository if they
        don't exist yet, reads the maximum number of versions from GConf,
        opens the index and rebuilds it if necessary.
        """
        self._base_dir = base_dir
        self._callbacks = {}

        checkouts_dir = os.path.join(base_dir, 'checkouts')
        self._checkouts_dir = checkouts_dir
        if not os.path.exists(checkouts_dir):
            os.makedirs(checkouts_dir)

        self._git_dir = os.path.join(base_dir, 'git')
        self._git_env = {}

        settings = gconf.client_get_default()
        self._max_versions = settings.get_int(
            '/desktop/sugar/datastore/max_versions')
        logging.debug('max_versions=%r', self._max_versions)

        index_dir = os.path.join(self._base_dir, 'index')
        self._index = Index(index_dir)
        self._migrate()
        self._check_reindex()
        logging.info('ready')
518
519
    def add_callback(self, signal, callback):
520
        if signal not in InternalApi.SIGNALS:
521
            raise ValueError('Invalid signal %r' % (signal, ))
522
523
        self._callbacks.setdefault(signal, []).append(callback)
524
525
    def change_metadata(self, object_id, metadata):
        """Replace the metadata of an existing entry

        The passed dictionary is updated in place with the ids of the
        entry and, if not given, with the creation time already stored
        in the index. Invokes the 'change_metadata' callbacks afterwards.
        """
        logging.debug('change_metadata(%r, %r)', object_id, metadata)
        tree_id, version_id = object_id
        metadata['tree_id'] = tree_id
        metadata['version_id'] = version_id
        if 'creation_time' not in metadata:
            # The creation time never changes, so carry it over.
            previous = self._index.retrieve(object_id)
            metadata['creation_time'] = previous['creation_time']

        self._index.store(object_id, metadata)
        self._invoke_callbacks('change_metadata', object_id, metadata)
534
535
    def delete(self, object_id):
        """Remove an entry from the index and delete its git ref

        Invokes the 'delete' callbacks afterwards.
        """
        logging.debug('delete(%r)', object_id)
        self._index.delete(object_id)
        ref_name = _format_ref(*object_id)
        self._git_call('update-ref', ['-d', ref_name])
        self._invoke_callbacks('delete', object_id)
540
541
    def get_data_path(self, object_id, sender=None):
        """Check out the data of an entry and return the path of the checkout

        object_id -- (tree_id, version_id) tuple
        sender -- only used for logging

        If the tree of the entry consists of a single item called 'data',
        that blob is checked out into a file; otherwise the result of
        _checkout_dir() is returned.
        """
        # Unpack inside the body: tuple parameters in the signature are
        # not valid syntax on Python 3.
        tree_id, version_id = object_id
        logging.debug('get_data_path((%r, %r), %r)', tree_id, version_id,
                      sender)
        metadata = self._index.retrieve((tree_id, version_id))
        ref_name = _format_ref(tree_id, version_id)
        top_level_entries = self._git_call('ls-tree',
                                           [ref_name]).splitlines()
        if len(top_level_entries) == 1 and \
           top_level_entries[0].endswith('\tdata'):
            # Each line is split as "<mode> <type> <hash>\t<name>".
            blob_hash = top_level_entries[0].split('\t')[0].split(' ')[2]
            mime_type = metadata.get('mime_type', '')
            return self._checkout_file(blob_hash,
                                       suffix=_guess_extension(mime_type))

        return self._checkout_dir(ref_name)
556
557
    def find(self, query_dict, options, query_string=None):
        """Query the index

        Returns a tuple (entries, total_count). If options contains a
        non-empty 'metadata' list, all other properties are removed from
        the entries; the 'metadata' key is removed from options. Property
        values of type str are wrapped in dbus.ByteArray.
        """
        logging.debug('find(%r, %r, %r)', query_dict, options, query_string)
        entries, total_count = self._index.find(query_dict, query_string,
                                                options)
        wanted_names = options.pop('metadata', None)
        for entry in entries:
            # Iterate over a copy of the keys since entries get deleted.
            for name in list(entry.keys()):
                if wanted_names and name not in wanted_names:
                    del entry[name]
                    continue

                value = entry[name]
                if isinstance(value, str):
                    entry[name] = dbus.ByteArray(value)

        return entries, total_count
571
572
    def find_unique_values(self, query, name):
573
        logging.debug('find_unique_values(%r, %r)', query, name)
574
        if query:
575
            raise NotImplementedError('non-empty query not supported yet')
576
577
        return self._index.find_unique_values(name)
578
579
    def get_properties(self, object_id):
        """Return the metadata stored in the index for the given entry
        """
        properties = self._index.retrieve(object_id)
        return properties
581
582
    def save(self, tree_id, parent_id, metadata, path, delete_after, async_cb,
             async_err_cb, allow_new_parent=False):
        """Store a new entry or a new version of an existing entry

        tree_id -- id of the tree to add to; empty to start a new tree
        parent_id -- id of the parent version; empty for a version
                     without parent
        metadata -- dictionary of properties; updated in place with the
                    ids, timestamps and file size that get stored
        path -- file or directory containing the data; empty for none
        delete_after -- remove path once the entry has been stored
        async_cb -- called with (tree_id, child_id) on success
        async_err_cb -- not used by this method; failures are raised
        allow_new_parent -- permit a version without parent in a tree
                            that already exists

        Raises ValueError for unusable paths or inconsistent ids.
        Invokes the 'save' callbacks before async_cb.
        """
        logging.debug('save(%r, %r, %r, %r, %r)', tree_id, parent_id,
                      metadata, path, delete_after)

        if path:
            path = os.path.realpath(path)
            if not os.access(path, os.R_OK):
                raise ValueError('Invalid path given.')

            if delete_after and not os.access(os.path.dirname(path), os.W_OK):
                raise ValueError('Deletion requested for read-only directory')

        if (not tree_id) and parent_id:
            raise ValueError('tree_id is empty but parent_id is not')

        if tree_id and not parent_id and not allow_new_parent:
            if self.find({'tree_id': tree_id}, {'limit': 1})[1]:
                raise ValueError('No parent_id given but tree_id already '
                                 'exists')

        if not tree_id:
            tree_id = self._gen_uuid()

        child_id = metadata.get('version_id')
        if not child_id:
            child_id = self._gen_uuid()
        elif not tree_id:
            # NOTE(review): unreachable, an empty tree_id has already been
            # replaced above. Kept as is: moving the check in front of
            # the id generation would start rejecting callers that pass
            # a version_id when creating a new tree.
            raise ValueError('No tree_id given but metadata contains'
                             ' version_id')
        elif self._index.contains((tree_id, child_id)):
            raise ValueError('There is an existing entry with the same tree_id'
                             ' and version_id')

        if 'timestamp' not in metadata:
            metadata['timestamp'] = time.time()

        if 'creation_time' not in metadata:
            metadata['creation_time'] = metadata['timestamp']

        if os.path.isfile(path):
            metadata['filesize'] = str(os.stat(path).st_size)
        elif not path:
            metadata['filesize'] = '0'

        tree_id = str(tree_id)
        parent_id = str(parent_id)
        child_id = str(child_id)

        metadata['tree_id'] = tree_id
        metadata['version_id'] = child_id

        # TODO: check metadata for validity first (index?)
        self._log_store((tree_id, child_id))
        self._store_entry_in_git(tree_id, child_id, parent_id, path, metadata)
        self._index.store((tree_id, child_id), metadata)
        self._invoke_callbacks('save', tree_id, child_id, parent_id, metadata)

        if delete_after and path:
            # Directories can be stored as well, but os.remove() refuses
            # to delete them.
            if os.path.isdir(path):
                shutil.rmtree(path)
            else:
                os.remove(path)

        async_cb(tree_id, child_id)
644
645
    def stop(self):
        """Close the index
        """
        logging.debug('stop()')
        index = self._index
        index.close()
648
649
    def _add_to_git_index(self, index_path, path):
        """Add the file or directory at path to the given git index file

        A directory is added with all of its content; a regular file is
        added as a single entry called 'data'. Raises DataStoreError for
        anything else.
        """
        if os.path.isdir(path):
            self._git_call('add', ['-A'], work_dir=path, index_path=index_path)
            return

        if not os.path.isfile(path):
            raise DataStoreError('Refusing to store special object %r' % (path, ))

        blob_hash = self._git_call('hash-object', ['-w', path]).strip()
        file_mode = os.stat(path).st_mode
        cacheinfo = ['--cacheinfo', oct(file_mode), blob_hash, 'data']
        self._git_call('update-index', ['--add'] + cacheinfo,
                       index_path=index_path)
661
662
    def _check_max_versions(self, tree_id):
663
        if not self._max_versions:
664
            return
665
666
        options = {'all_versions': True, 'offset': self._max_versions,
667
                   'metadata': ['tree_id', 'version_id', 'timestamp'],
668
                   'order_by': ['+timestamp']}
669
        old_versions = self.find({'tree_id': tree_id}, options)[0]
670
        logging.info('Deleting old versions: %r', old_versions)
671
        for entry in old_versions:
672
            self.delete((entry['tree_id'], entry['version_id']))
673
674
    def _checkout_file(self, blob_hash, suffix=''):
675
        fd, file_name = tempfile.mkstemp(dir=self._checkouts_dir, suffix=suffix)
676
        try:
677
            self._git_call('cat-file', ['blob', blob_hash], stdout_fd=fd)
678
        finally:
679
            os.close(fd)
680
        return file_name
681
682
    def _checkout_dir(self, ref_name):
683
        # FIXME
684
        return ''
685
686
    def _create_repo(self):
        """Create the git directory and initialise a bare repository in it
        """
        git_dir = self._git_dir
        os.makedirs(git_dir)
        self._git_call('init', ['-q', '--bare'])
689
690
    def _migrate(self):
691
        if not os.path.exists(self._git_dir):
692
            return self._create_repo()
693
694
    def _check_reindex(self):
        """Recreate or update index if necessary

        The index counts as up to date if the last entry recorded in the
        log is contained in it.
        """
        last_object_id = self._get_last_object_id_from_log()
        # Non-existence of the log (i.e. last_object_id=None) does not
        # necessarily mean an empty data store: We could be upgrading
        # from a previous version that didn't write the file, a file
        # system corruption may have occurred or the user may have
        # deleted the log to force reindexing. This operation is cheap
        # enough on empty data stores that we don't care about the
        # performance impact on valid, empty data stores.
        if last_object_id and self._index.contains(last_object_id):
            return

        logging.info('Rebuilding index')
        self._reindex()
708
709
    def _reindex(self):
        """Recreate or update index from git repository

        Entries missing from the index are added with the metadata read
        from git. Log the last object after finishing the rebuild.
        """
        newest_seen = None
        for object_id in self._get_object_ids_from_git():
            newest_seen = object_id
            logging.debug('reindex(): checking entry %r', object_id)
            if not self._index.contains(object_id):
                logging.debug('reindex(): adding entry %r from git',
                              object_id)
                git_metadata = self._get_metadata_from_git(object_id)
                self._index.store(object_id, git_metadata)

        if newest_seen:
            self._log_store(newest_seen)
727
728
    def _format_commit_message(self, metadata):
        """Return the pretty-printed form of metadata after to_native()
        """
        native_metadata = to_native(metadata)
        return pprint.pformat(native_metadata)
730
731
    def _parse_commit_message(self, commit_message):
732
        try:
733
            return ast.literal_eval(commit_message)
734
        except ValueError:
735
            return self._parse_commit_message_dbus(commit_message)
736
737
    def _parse_commit_message_dbus(self, commit_message):
        """Parse a commit message containing reprs of dbus-python values
        """
        # Compatibility work-around to parse commit messages
        # written by previous versions and containing dbus.Int()
        # instead of plain integer literals.
        patterns = (_DBUS_METADATA_DICTIONARY_RE, _DBUS_METADATA_BASIC_RE)
        for pattern in patterns:
            # Repeat until nothing is left to unwrap for this pattern.
            while True:
                commit_message, num_subs = pattern.subn(r'\g<value>',
                                                        commit_message)
                if not num_subs:
                    break

        return ast.literal_eval(commit_message)
751
752
    def _gen_uuid(self):
753
        return str(uuid.uuid4())
754
755
    def _git_call(self, command, args=None, input=None, input_fd=None,
                  stdout_fd=None, work_dir=None, index_path=None):
        """Run a git command and return what it wrote to stdout

        command -- the git sub-command to run
        args -- optional sequence of additional command line arguments
        input -- optional data handed to the process via communicate()
        input_fd -- optional file (descriptor) to use as stdin instead
            of a pipe
        stdout_fd -- optional file (descriptor) to use as stdout
            instead of a pipe
        work_dir -- if set, exported to git as GIT_WORK_TREE
        index_path -- if set, exported to git as GIT_INDEX_FILE

        The command runs inside the git directory of the data store,
        with a copy of the configured git environment. Raises GitError
        (with exit status and stderr output) if git exits with a
        non-zero status.
        """
        env = dict(self._git_env)
        if work_dir:
            env['GIT_WORK_TREE'] = work_dir
        if index_path:
            env['GIT_INDEX_FILE'] = index_path

        argv = ['git', command] + list(args or [])
        logging.debug('calling git %s, env=%r', argv, env)

        # Compare against None rather than testing truthiness: file
        # descriptor 0 is a valid redirection target, but is falsy.
        stdin = PIPE if input_fd is None else input_fd
        stdout = PIPE if stdout_fd is None else stdout_fd
        pipe = Popen(argv, stdin=stdin, stdout=stdout, stderr=PIPE,
                     close_fds=True, cwd=self._git_dir, env=env)
        output, errors = pipe.communicate(input)
        if pipe.returncode:
            raise GitError(pipe.returncode, errors)
        return output
770
771
    def _invoke_callbacks(self, signal, *args):
772
        for callback in self._callbacks.get(signal, []):
773
            callback(*args)
774
775
    def _store_entry_in_git(self, tree_id, version_id, parent_id, path, metadata):
        """Commit an entry to git and point its ref at the commit

        A tree is written for path, committed with the formatted
        metadata as commit message, and the ref for
        (tree_id, version_id) is updated to the new commit.

        parent_id is accepted, but not used here.
        """
        message = self._format_commit_message(metadata)
        tree_hash = self._write_tree(path)
        commit_output = self._git_call('commit-tree', [tree_hash],
                                       input=message)
        commit_hash = commit_output.strip()
        ref_name = _format_ref(tree_id, version_id)
        self._git_call('update-ref', [ref_name, commit_hash])
782
783
    def _write_tree(self, path):
784
        if not path:
785
            return self._git_call('hash-object',
786
                                  ['-w', '-t', 'tree', '--stdin'],
787
                                  input='').strip()
788
789
        index_dir = tempfile.mkdtemp(prefix='gdatastore-')
790
        index_path = os.path.join(index_dir, 'index')
791
        try:
792
            self._add_to_git_index(index_path, path)
793
            return self._git_call('write-tree', index_path=index_path).strip()
794
        finally:
795
            shutil.rmtree(index_dir)
796
797
    def _get_object_ids_from_git(self):
798
        args = ['--sort=committerdate', '--format=%(refname)',
799
                'refs/gdatastore/*/*']
800
        return [tuple(line.rsplit('/', 2)[1:])
801
                for line in self._git_call('for-each-ref', args).split()]
802
803
    def _get_metadata_from_git(self, object_id):
        """Read the metadata of an object from its git commit

        The commit the ref of object_id points to is dumped using
        'git cat-file'. Everything after the first empty line of the
        output is treated as commit message and parsed as metadata.
        """
        ref_name = _format_ref(*object_id)
        raw_commit = self._git_call('cat-file', ['commit', ref_name])
        commit_message = raw_commit.split('\n\n', 1)[1]
        return self._parse_commit_message(commit_message)
807
808
    def _log_store(self, object_id):
809
        """Record the fact that we tried to store the given object
810
811
        Make sure we know on next start-up that the object with the
812
        given object_id was the last one to be processed. Used for
813
        checking the index on start-up and triggering a rebuild if
814
        necessary.
815
        """
816
        log_name = os.path.join(self._base_dir, 'last_object_id')
817
        tmp_name = log_name + '.tmp'
818
        with open(tmp_name, 'w') as f:
819
            f.write(repr(tuple(object_id)))
820
            f.flush()
821
            os.fsync(f.fileno())
822
823
        os.rename(tmp_name, log_name)
824
825
    def _get_last_object_id_from_log(self):
826
        """Return the object_id saved by _log_store()
827
828
        Return the object_id of the last object to be processed, as
829
        written by _log_store(). If no such log exists, return None.
830
        """
831
        log_name = os.path.join(self._base_dir, 'last_object_id')
832
        if not os.path.exists(log_name):
833
            return None
834
835
        return ast.literal_eval(open(log_name).read())
836
837
838
def calculate_checksum(path):
    """Return the SHA-1 checksum of a file as hexadecimal string

    path -- name of the file to calculate the checksum of

    The file is read in chunks of 64 KiB, so it doesn't need to fit
    into memory. Raises IOError / OSError if the file cannot be
    opened or read.
    """
    checksum = hashlib.sha1()
    # Binary mode: the checksum must cover the exact bytes on disk.
    # The with statement makes sure the file gets closed again.
    with open(path, 'rb') as f:
        for chunk in iter(lambda: f.read(65536), b''):
            checksum.update(chunk)

    return checksum.hexdigest()
847
848
849
def to_native(value):
    """Return a copy of value that consists of plain built-in types

    Lists and dictionaries are rebuilt recursively. Instances of
    unicode, str, int and float (checked in that order) are passed
    through the constructor of the respective built-in type;
    presumably this is meant to strip wrapper sub-classes like the
    dbus types. Anything else raises TypeError.
    """
    if isinstance(value, list):
        return [to_native(item) for item in value]

    if isinstance(value, dict):
        return dict((to_native(key), to_native(item))
                    for key, item in value.items())

    for native_type in (unicode, str, int, float):
        if isinstance(value, native_type):
            return native_type(value)

    raise TypeError('Unknown type: %s' % (type(value), ))
864
865
866
def _format_ref(tree_id, version_id):
867
    return 'refs/gdatastore/%s/%s' % (tree_id, version_id)
868
869
870
def _guess_extension(mime_type):
    """Return the file name extension to use for mime_type

    The extension (including the leading dot) is looked up using
    sugar.mime. An empty string is returned if sugar.mime is not
    available or doesn't provide an extension for mime_type.
    """
    extension = None
    if sugar_mime is not None:
        extension = sugar_mime.get_primary_extension(mime_type)

    if extension:
        return '.' + extension
    return ''