comparison env/lib/python3.9/site-packages/rdflib/plugins/sleepycat.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 import logging
2 from threading import Thread
3 from os.path import exists, abspath
4 from os import mkdir
5 from rdflib.store import Store, VALID_STORE, NO_STORE
6 from rdflib.term import URIRef
7 from six import b
8 from six.moves.urllib.request import pathname2url
9
10
def bb(u):
    """Return the UTF-8 encoding of the text ``u`` as bytes.

    Used to turn ``%``-formatted key strings into Berkeley DB byte keys.
    """
    utf8_bytes = u.encode('utf-8')
    return utf8_bytes
13
14
# Prefer the legacy stdlib ``bsddb`` module and fall back to ``bsddb3``;
# ``has_bsddb`` records whether either one provided ``db``.
try:
    from bsddb import db
except ImportError:
    try:
        from bsddb3 import db
    except ImportError:
        has_bsddb = False
    else:
        has_bsddb = True
else:
    has_bsddb = True
24
25
if has_bsddb:
    # Settings handed to Berkeley DB when the environment and the individual
    # databases are created.

    # Flags for db.DBEnv.set_flags.
    ENVSETFLAGS = db.DB_CDB_ALLDB
    # Flags for db.DBEnv.open.
    ENVFLAGS = db.DB_THREAD | db.DB_INIT_CDB | db.DB_INIT_MPOOL
    # Cache size in bytes (50 MiB) for db.DBEnv.set_cachesize.
    CACHESIZE = 50 * 1024 * 1024

    # Flags for db.DB.open().
    DBOPENFLAGS = db.DB_THREAD

logger = logging.getLogger(__name__)

__all__ = ['Sleepycat']
41
42
class Sleepycat(Store):
    """rdflib ``Store`` kept in a Berkeley DB (bsddb / bsddb3) environment.

    Databases created inside the environment directory by ``open``:

    * three B-tree triple indices named ``c^s^p^o^``, ``c^p^o^s^`` and
      ``c^o^s^p^``.  Their keys are ``^``-joined term ids: the context
      first, then the triple rotated as in ``to_key_func``.  Per-context
      entries store an empty value.  Entries with an empty context part form
      the "conjunctive" index; their value is the ``^``-joined ids of every
      non-quoted context that contains the triple;
    * ``contexts`` (B-tree): one key per known context id;
    * ``namespace`` / ``prefix`` (B-tree): the prefix -> namespace and
      namespace -> prefix bindings, as UTF-8 bytes;
    * ``k2i`` (hash) / ``i2k`` (recno): map terms serialized with
      ``node_pickler`` to the decimal record ids used inside keys, and back.

    A daemon thread flushes the databases to disk some time after writes
    (see ``__sync_run``).
    """

    context_aware = True
    formula_aware = True
    transaction_aware = False
    graph_aware = True
    db_env = None

    def __init__(self, configuration=None, identifier=None):
        """Create the store; ``configuration`` goes to the ``Store`` base.

        Raises ImportError when neither bsddb nor bsddb3 could be imported.
        """
        if not has_bsddb:
            raise ImportError(
                "Unable to import bsddb/bsddb3, store is unusable.")
        self.__open = False
        self.__identifier = identifier
        super(Sleepycat, self).__init__(configuration)
        self._loads = self.node_pickler.loads
        self._dumps = self.node_pickler.dumps

    def __get_identifier(self):
        return self.__identifier
    identifier = property(__get_identifier)

    def _init_db_environment(self, homeDir, create=True):
        """Open the Berkeley DB environment living in ``homeDir``.

        A missing directory is created (and ``self.create`` called) when
        ``create`` is true; otherwise ``NO_STORE`` is returned.  Returns the
        opened ``db.DBEnv`` on success.
        """
        if not exists(homeDir):
            if create is True:
                mkdir(homeDir)
                # TODO: implement create method and refactor this to it
                self.create(homeDir)
            else:
                return NO_STORE
        db_env = db.DBEnv()
        # 0 gigabytes + CACHESIZE bytes of cache.
        db_env.set_cachesize(0, CACHESIZE)
        db_env.set_flags(ENVSETFLAGS, 1)
        db_env.open(homeDir, ENVFLAGS | db.DB_CREATE)
        return db_env

    def is_open(self):
        """Return True once ``open`` has succeeded and until ``close``."""
        return self.__open

    def open(self, path, create=True):
        """Open the store kept in the directory ``path``.

        When ``create`` is true, the directory and any missing databases are
        created.  Returns ``VALID_STORE`` on success.  Returns ``NO_STORE``
        when bsddb is unavailable, or when ``path`` does not exist and
        ``create`` is false.  On success, starts the background sync thread.
        """
        if not has_bsddb:
            return NO_STORE
        homeDir = path

        if self.__identifier is None:
            self.__identifier = URIRef(pathname2url(abspath(homeDir)))

        db_env = self._init_db_environment(homeDir, create)
        if db_env == NO_STORE:
            return NO_STORE
        self.db_env = db_env

        dbname = None
        dbtype = db.DB_BTREE
        dbopenflags = DBOPENFLAGS
        if self.transaction_aware is True:
            # auto-commit ensures that the open-call commits when
            # transactions are enabled
            dbopenflags |= db.DB_AUTO_COMMIT
        if create:
            dbopenflags |= db.DB_CREATE
        dbmode = 0o660
        dbsetflags = 0

        def open_db(filename, kind=dbtype):
            # Open one database of the environment with the shared settings.
            handle = db.DB(db_env)
            handle.set_flags(dbsetflags)
            handle.open(filename, dbname, kind, dbopenflags, dbmode)
            return handle

        def get_prefix_func(start, end):
            # Build the key-prefix generator for an index scan.  It yields
            # the context ("" for the conjunctive index), then the bound
            # terms in the index's rotation order, then "" so that the
            # joined prefix ends with "^".
            def get_prefix(triple, context):
                yield "" if context is None else context
                for position in range(start, end):
                    yield triple[position % 3]
                yield ""
            return get_prefix

        # create and open the three rotated triple indices
        self.__indicies = []
        self.__indicies_info = []
        for i in range(3):
            index_name = to_key_func(
                i)((b("s"), b("p"), b("o")), b("c")).decode()
            index = open_db(index_name)
            self.__indicies.append(index)
            self.__indicies_info.append(
                (index, to_key_func(i), from_key_func(i)))

        # For every mask of bound terms (1 = subject, 2 = predicate,
        # 4 = object), pick the index whose key order starts with the
        # longest run of bound terms, so a lookup becomes one prefix scan.
        # Ties go to the lowest-numbered index.
        lookup = {}
        for mask in range(8):
            candidates = []
            for start in range(3):
                score = 1
                length = 0
                for j in range(start, start + 3):
                    if mask & (1 << (j % 3)):
                        score = score << 1
                        length += 1
                    else:
                        break
                tie_break = 2 - start
                candidates.append(((score, tie_break), start, length))
            _, start, length = max(candidates)
            lookup[mask] = (
                self.__indicies[start],
                get_prefix_func(start, start + length),
                from_key_func(start),
                results_from_key_func(start, self._from_string))
        self.__lookup_dict = lookup

        self.__contexts = open_db("contexts")
        self.__namespace = open_db("namespace")
        self.__prefix = open_db("prefix")
        self.__k2i = open_db("k2i", db.DB_HASH)
        self.__i2k = open_db("i2k", db.DB_RECNO)

        self.__needs_sync = False
        # Mark the store open only once every database handle exists.  The
        # sync thread started below runs for as long as this flag is true.
        self.__open = True
        t = Thread(target=self.__sync_run)
        t.daemon = True  # setDaemon() is deprecated since Python 3.10
        t.start()
        self.__sync_thread = t
        return VALID_STORE

    def __sync_run(self):
        """Background loop that flushes the databases after writes.

        After a write sets ``__needs_sync``, the loop waits until no further
        write has been flagged for ``min_seconds``, or until ``max_seconds``
        have passed since the first pending write, and then calls
        ``sync()``.  It runs until the store is closed.  Any exception is
        logged and ends the thread.
        """
        from time import sleep, time
        try:
            min_seconds, max_seconds = 10, 300
            while self.__open:
                if self.__needs_sync:
                    t0 = t1 = time()
                    self.__needs_sync = False
                    while self.__open:
                        sleep(.1)
                        if self.__needs_sync:
                            t1 = time()
                            self.__needs_sync = False
                        if time() - t1 > min_seconds \
                                or time() - t0 > max_seconds:
                            self.__needs_sync = False
                            logger.debug("sync")
                            self.sync()
                            break
                else:
                    sleep(1)
        except Exception as e:
            logger.exception(e)

    def sync(self):
        """Flush every database of an open store to disk."""
        if self.__open:
            for i in self.__indicies:
                i.sync()
            self.__contexts.sync()
            self.__namespace.sync()
            self.__prefix.sync()
            self.__i2k.sync()
            self.__k2i.sync()

    def close(self, commit_pending_transaction=False):
        """Stop the sync thread, then close every database and the
        environment.

        ``commit_pending_transaction`` is accepted for the ``Store``
        interface and is not used.
        """
        self.__open = False
        self.__sync_thread.join()
        for i in self.__indicies:
            i.close()
        self.__contexts.close()
        self.__namespace.close()
        self.__prefix.close()
        self.__i2k.close()
        self.__k2i.close()
        self.db_env.close()

    def add(self, triple, context, quoted=False, txn=None):
        """Add ``triple`` to ``context``.

        A ``quoted`` triple is written only to the per-context entries and
        is kept out of the conjunctive index.
        """
        (subject, predicate, object) = triple
        assert self.__open, "The Store must be open."
        assert context != self, "Can not add triple directly to store"
        Store.add(self, (subject, predicate, object), context, quoted)

        _to_string = self._to_string

        s = _to_string(subject, txn=txn)
        p = _to_string(predicate, txn=txn)
        o = _to_string(object, txn=txn)
        c = _to_string(context, txn=txn)

        cspo, cpos, cosp = self.__indicies

        value = cspo.get(bb("%s^%s^%s^%s^" % (c, s, p, o)), txn=txn)
        if value is None:
            # Values are written as bytes, like everywhere else in the file.
            self.__contexts.put(bb(c), b(""), txn=txn)

            # Add c to the list of contexts kept on the conjunctive entry.
            contexts_value = cspo.get(
                bb("%s^%s^%s^%s^" % ("", s, p, o)), txn=txn) or b("")
            contexts = set(contexts_value.split(b("^")))
            contexts.add(bb(c))
            contexts_value = b("^").join(contexts)

            cspo.put(bb("%s^%s^%s^%s^" % (c, s, p, o)), b(""), txn=txn)
            cpos.put(bb("%s^%s^%s^%s^" % (c, p, o, s)), b(""), txn=txn)
            cosp.put(bb("%s^%s^%s^%s^" % (c, o, s, p)), b(""), txn=txn)
            if not quoted:
                cspo.put(bb(
                    "%s^%s^%s^%s^" % ("", s, p, o)), contexts_value, txn=txn)
                cpos.put(bb(
                    "%s^%s^%s^%s^" % ("", p, o, s)), contexts_value, txn=txn)
                cosp.put(bb(
                    "%s^%s^%s^%s^" % ("", o, s, p)), contexts_value, txn=txn)

            self.__needs_sync = True

    def __remove(self, spo, c, quoted=False, txn=None):
        """Delete the triple ``spo`` from context ``c``.

        ``spo`` and ``c`` are term ids given as bytes.  The triple's key for
        ``c`` is dropped from all three indices.  Unless ``quoted``, ``c`` is
        also removed from the conjunctive entry's context list, and the
        conjunctive entries are deleted once no context is left.
        """
        s, p, o = spo
        cspo = self.__indicies[0]
        contexts_value = cspo.get(
            b("^").join([b(""), s, p, o, b("")]), txn=txn) or b("")
        contexts = set(contexts_value.split(b("^")))
        contexts.discard(c)
        contexts_value = b("^").join(contexts)
        for i, _to_key, _from_key in self.__indicies_info:
            i.delete(_to_key((s, p, o), c), txn=txn)
        if not quoted:
            if contexts_value:
                for i, _to_key, _from_key in self.__indicies_info:
                    i.put(_to_key((s, p, o), b("")), contexts_value, txn=txn)
            else:
                for i, _to_key, _from_key in self.__indicies_info:
                    try:
                        i.delete(_to_key((s, p, o), b("")), txn=txn)
                    except db.DBNotFoundError:
                        pass  # TODO: is it okay to ignore these?

    def remove(self, spo, context, txn=None):
        """Remove the triples matching the pattern ``spo`` from ``context``.

        ``None`` in ``spo`` is a wildcard.  A ``context`` of ``None`` (or the
        store itself) removes matches from every context.  Removing
        ``(None, None, None)`` from a specific context also deletes that
        context from the ``contexts`` database.
        """
        subject, predicate, object = spo
        assert self.__open, "The Store must be open."
        Store.remove(self, (subject, predicate, object), context)
        _to_string = self._to_string

        if context is not None:
            if context == self:
                context = None

        if subject is not None \
                and predicate is not None \
                and object is not None \
                and context is not None:
            s = _to_string(subject, txn=txn)
            p = _to_string(predicate, txn=txn)
            o = _to_string(object, txn=txn)
            c = _to_string(context, txn=txn)
            value = self.__indicies[0].get(bb("%s^%s^%s^%s^" %
                                              (c, s, p, o)), txn=txn)
            if value is not None:
                self.__remove((bb(s), bb(p), bb(o)), bb(c), txn=txn)
                self.__needs_sync = True
        else:
            index, prefix, from_key, _ = self.__lookup(
                (subject, predicate, object), context, txn=txn)

            cursor = index.cursor(txn=txn)
            try:
                current = cursor.set_range(prefix)
                needs_sync = True
            except db.DBNotFoundError:
                current = None
                needs_sync = False
            cursor.close()
            while current:
                key, value = current
                # Fetch the following entry before modifying this one.  A
                # fresh cursor is used because deletes invalidate cursors.
                cursor = index.cursor(txn=txn)
                try:
                    cursor.set_range(key)
                    # Hack to stop 2to3 converting this to next(cursor)
                    current = getattr(cursor, 'next')()
                except db.DBNotFoundError:
                    current = None
                cursor.close()
                if key.startswith(prefix):
                    c, s, p, o = from_key(key)
                    if context is None:
                        contexts_value = index.get(key, txn=txn) or b("")
                        # remove triple from all non quoted contexts
                        contexts = set(contexts_value.split(b("^")))
                        # and from the conjunctive index
                        contexts.add(b(""))
                        for c in contexts:
                            for i, _to_key, _ in self.__indicies_info:
                                i.delete(_to_key((s, p, o), c), txn=txn)
                    else:
                        self.__remove((s, p, o), c, txn=txn)
                else:
                    break

            if context is not None:
                if subject is None and predicate is None and object is None:
                    # TODO: also if context becomes empty and not just on
                    # remove((None, None, None), c)
                    try:
                        self.__contexts.delete(
                            bb(_to_string(context, txn=txn)), txn=txn)
                        needs_sync = True
                    except db.DBNotFoundError:
                        pass

            # Only ever raise the flag.  Assigning False here would discard
            # a sync request left pending by an earlier write.
            if needs_sync:
                self.__needs_sync = True

    def triples(self, spo, context=None, txn=None):
        """Yield every triple matching the pattern ``spo`` in ``context``.

        ``None`` in ``spo`` is a wildcard.  A ``context`` of ``None`` (or the
        store itself) searches the conjunctive index.  Each item is
        ``((s, p, o), contexts)``.  ``contexts`` is a generator over the
        context terms decoded from the matched entry's value: the containing
        non-quoted contexts for conjunctive matches, and nothing for
        per-context matches.
        """
        assert self.__open, "The Store must be open."

        subject, predicate, object = spo

        if context is not None:
            if context == self:
                context = None

        index, prefix, from_key, results_from_key = self.__lookup(
            (subject, predicate, object), context, txn=txn)

        cursor = index.cursor(txn=txn)
        try:
            current = cursor.set_range(prefix)
        except db.DBNotFoundError:
            current = None
        cursor.close()
        while current:
            key, value = current
            # Look ahead before yielding, because the consumer may modify
            # the store between items.
            cursor = index.cursor(txn=txn)
            try:
                cursor.set_range(key)
                # Cheap hack so 2to3 doesn't convert to next(cursor)
                current = getattr(cursor, 'next')()
            except db.DBNotFoundError:
                current = None
            cursor.close()
            if key and key.startswith(prefix):
                contexts_value = index.get(key, txn=txn)
                yield results_from_key(
                    key, subject, predicate, object, contexts_value)
            else:
                break

    def __len__(self, context=None):
        """Count the triples in ``context``.

        When ``context`` is ``None`` (or the store itself), count the
        entries of the conjunctive index instead, i.e. the triples asserted
        in at least one non-quoted context.
        """
        assert self.__open, "The Store must be open."
        if context is not None:
            if context == self:
                context = None

        if context is None:
            prefix = b("^")
        else:
            prefix = bb("%s^" % self._to_string(context))

        index = self.__indicies[0]
        cursor = index.cursor()
        count = 0
        try:
            # Treat "no such key" the same way the other cursor scans do.
            try:
                current = cursor.set_range(prefix)
            except db.DBNotFoundError:
                current = None
            while current:
                key, value = current
                if not key.startswith(prefix):
                    break
                count += 1
                try:
                    # Hack to stop 2to3 converting this to next(cursor)
                    current = getattr(cursor, 'next')()
                except db.DBNotFoundError:
                    current = None
        finally:
            cursor.close()
        return count

    def bind(self, prefix, namespace):
        """Bind ``prefix`` to ``namespace``.

        Any earlier binding of the prefix or of the namespace is dropped, so
        the prefix and namespace tables remain inverses of each other.
        """
        prefix = prefix.encode("utf-8")
        namespace = namespace.encode("utf-8")
        # The namespace was bound to another prefix: forget that prefix.
        # Use ``is not None`` so that an empty prefix is handled too.
        old_prefix = self.__prefix.get(namespace)
        if old_prefix is not None \
                and self.__namespace.get(old_prefix) == namespace:
            self.__namespace.delete(old_prefix)
        # The prefix was bound to another namespace: forget that reverse
        # entry, otherwise prefix(old_namespace) would keep returning it.
        old_namespace = self.__namespace.get(prefix)
        if old_namespace is not None \
                and self.__prefix.get(old_namespace) == prefix:
            self.__prefix.delete(old_namespace)
        self.__prefix[namespace] = prefix
        self.__namespace[prefix] = namespace

    def namespace(self, prefix):
        """Return the namespace bound to ``prefix`` as a URIRef, or None."""
        prefix = prefix.encode("utf-8")
        ns = self.__namespace.get(prefix, None)
        if ns is not None:
            return URIRef(ns.decode('utf-8'))
        return None

    def prefix(self, namespace):
        """Return the prefix bound to ``namespace``, or None."""
        namespace = namespace.encode("utf-8")
        prefix = self.__prefix.get(namespace, None)
        if prefix is not None:
            return prefix.decode('utf-8')
        return None

    def namespaces(self):
        """Yield ``(prefix, URIRef(namespace))`` for every stored binding.

        All bindings are read before the first item is yielded, so the
        cursor is already closed when the caller resumes.
        """
        cursor = self.__namespace.cursor()
        results = []
        try:
            try:
                current = cursor.first()
            except db.DBNotFoundError:
                current = None
            while current:
                prefix, namespace = current
                results.append(
                    (prefix.decode('utf-8'), namespace.decode('utf-8')))
                try:
                    # Hack to stop 2to3 converting this to next(cursor)
                    current = getattr(cursor, 'next')()
                except db.DBNotFoundError:
                    current = None
        finally:
            cursor.close()
        for prefix, namespace in results:
            yield prefix, URIRef(namespace)

    def contexts(self, triple=None):
        """Yield contexts: those containing ``triple``, or all known ones.

        With a ``triple``, the contexts are read from its conjunctive entry
        (non-quoted contexts only).  Without one, every key of the
        ``contexts`` database is yielded.
        """
        _from_string = self._from_string
        _to_string = self._to_string

        if triple:
            s, p, o = triple
            s = _to_string(s)
            p = _to_string(p)
            o = _to_string(o)
            contexts = self.__indicies[0].get(bb(
                "%s^%s^%s^%s^" % ("", s, p, o)))
            if contexts:
                for c in contexts.split(b("^")):
                    if c:
                        yield _from_string(c)
        else:
            index = self.__contexts
            cursor = index.cursor()
            try:
                current = cursor.first()
            except db.DBNotFoundError:
                current = None
            cursor.close()
            while current:
                key, value = current
                # Find the next key BEFORE yielding.  If the caller deletes
                # ``key`` (e.g. remove_graph), set_range(key) afterwards
                # would land on its successor and next() would skip it.
                cursor = index.cursor()
                try:
                    cursor.set_range(key)
                    # Hack to stop 2to3 converting this to next(cursor)
                    current = getattr(cursor, 'next')()
                except db.DBNotFoundError:
                    current = None
                cursor.close()
                yield _from_string(key)

    def add_graph(self, graph):
        """Record ``graph`` in the ``contexts`` database."""
        self.__contexts.put(bb(self._to_string(graph)), b(""))

    def remove_graph(self, graph):
        """Remove every triple of ``graph`` and forget the context."""
        self.remove((None, None, None), graph)

    def _from_string(self, i):
        """Return the term stored under id ``i`` (decimal text or bytes)."""
        k = self.__i2k.get(int(i))
        return self._loads(k)

    def _to_string(self, term, txn=None):
        """Return the decimal id (as str) of ``term``.

        An unseen term is assigned a new id: it is appended to ``i2k`` and
        the reverse mapping is recorded in ``k2i``.
        """
        k = self._dumps(term)
        i = self.__k2i.get(k, txn=txn)
        if i is None:
            # weird behaviour from bsddb not taking a txn as a keyword
            # argument for append
            if self.transaction_aware:
                i = "%s" % self.__i2k.append(k, txn)
            else:
                i = "%s" % self.__i2k.append(k)
            # Store the id as bytes, matching the bytes read back below.
            self.__k2i.put(k, i.encode(), txn=txn)
        else:
            i = i.decode()
        return i

    def __lookup(self, spo, context, txn=None):
        """Choose the index for the pattern ``spo`` and build its key prefix.

        Returns ``(index, prefix, from_key, results_from_key)``: the index
        whose key order starts with the bound terms, the ``^``-joined byte
        prefix of their ids, and that index's key decoders.  Unknown terms
        are assigned new ids by ``_to_string``.
        """
        subject, predicate, object = spo
        _to_string = self._to_string
        if context is not None:
            context = _to_string(context, txn=txn)
        i = 0
        if subject is not None:
            i += 1
            subject = _to_string(subject, txn=txn)
        if predicate is not None:
            i += 2
            predicate = _to_string(predicate, txn=txn)
        if object is not None:
            i += 4
            object = _to_string(object, txn=txn)
        index, prefix_func, from_key, results_from_key = self.__lookup_dict[i]
        prefix = bb(
            "^".join(prefix_func((subject, predicate, object), context)))
        return index, prefix, from_key, results_from_key
551
552
def to_key_func(i):
    """Return the key builder for index number ``i``.

    The returned ``to_key(triple, context)`` joins, with ``^``, the context
    followed by the three terms of ``triple`` rotated to start at position
    ``i % 3``, plus an empty last part that produces the trailing ``^``.
    All parts are bytes.
    """
    def to_key(triple, context):
        """Build the byte key for ``triple`` in ``context``."""
        rotated = [triple[(i + shift) % 3] for shift in range(3)]
        return b"^".join([context] + rotated + [b""])
    return to_key
562
563
def from_key_func(i):
    """Return the inverse of ``to_key_func(i)``.

    The returned ``from_key(key)`` splits a ``^``-joined index key and
    returns ``(context, subject, predicate, object)``, undoing the rotation
    applied for index ``i``.
    """
    def from_key(key):
        """Split ``key`` into (context, subject, predicate, object)."""
        fields = key.split(b"^")
        context = fields[0]
        subject, predicate, obj = (
            fields[(3 - i + n) % 3 + 1] for n in range(3))
        return context, subject, predicate, obj
    return from_key
574
575
def results_from_key_func(i, from_string):
    """Return a decoder that turns index-``i`` keys into ``triples`` results.

    The returned ``from_key(key, subject, predicate, object, contexts_value)``
    fills each term left as ``None`` by decoding its id from ``key`` with
    ``from_string``; terms that were given are passed through unchanged.  It
    returns ``((s, p, o), contexts)``, where ``contexts`` lazily decodes each
    non-empty ``^``-separated id in ``contexts_value``.
    """
    def from_key(key, subject, predicate, object, contexts_value):
        """Build the ``((s, p, o), contexts)`` pair for one matched key."""
        fields = key.split(b"^")

        def resolve(position, given):
            # Keep a term the caller already bound; otherwise decode it.
            if given is not None:
                return given
            return from_string(fields[(3 - i + position) % 3 + 1])

        s = resolve(0, subject)
        p = resolve(1, predicate)
        o = resolve(2, object)
        contexts = (from_string(c) for c in contexts_value.split(b"^") if c)
        return (s, p, o), contexts
    return from_key
597
598
def readable_index(i):
    """Describe which triple positions the bit mask ``i`` marks as bound.

    Bit 0 is the subject, bit 1 the predicate and bit 2 the object.  A bound
    position is shown by its letter and an unbound one by ``?``; for
    example ``readable_index(5)`` returns ``"s,?,o"``.
    """
    letters = [letter if i & mask else "?"
               for letter, mask in (("s", 1), ("p", 2), ("o", 4))]
    return ",".join(letters)