2020-12-30 06:48:59 +00:00
|
|
|
from typing import Union
|
|
|
|
from enum import Enum, auto
|
|
|
|
import dbm
|
|
|
|
|
2020-12-31 03:25:05 +00:00
|
|
|
from .securestring import generate_key_file, protect_string, unprotect_string
|
|
|
|
|
2020-12-30 06:48:59 +00:00
|
|
|
|
|
|
|
|
|
|
|
class SafeDB:
|
2020-12-31 03:25:05 +00:00
|
|
|
"""Wrapper around dbm to optionally encrypt db values."""
|
|
|
|
|
|
|
|
def get(self, key: Union[str, bytes, bytearray]) -> bytes:
|
|
|
|
if self.protected:
|
|
|
|
return self.db_conn[key]
|
|
|
|
return unprotect_string(self.db_conn[key])
|
2020-12-30 06:48:59 +00:00
|
|
|
|
2020-12-31 03:25:05 +00:00
|
|
|
def put(
|
|
|
|
self, key: [str, bytes, bytearray], value: [bytes, bytearray]):
|
|
|
|
if self.protected:
|
|
|
|
self.db_conn[key] = protect_string(value)
|
|
|
|
else:
|
|
|
|
self.db_conn[key] = value
|
2020-12-30 06:48:59 +00:00
|
|
|
|
2020-12-31 03:25:05 +00:00
|
|
|
def close(self):
|
|
|
|
self.db_conn.close()
|
2020-12-30 06:48:59 +00:00
|
|
|
|
2020-12-31 03:25:05 +00:00
|
|
|
def __init__(self, db_path: str, protected=True):
|
2020-12-30 06:48:59 +00:00
|
|
|
self.db_path = db_path
|
2020-12-31 03:25:05 +00:00
|
|
|
self.db_conn = dbm.open(db_path, "c")
|
|
|
|
|
|
|
|
try:
|
|
|
|
existing_protected_mode = self.db_conn['enc']
|
|
|
|
if protected and existing_protected_mode != b'1':
|
|
|
|
raise ValueError(
|
|
|
|
"Cannot open unencrypted database with protected=True")
|
|
|
|
elif not protected and existing_protected_mode != b'0':
|
|
|
|
raise ValueError(
|
|
|
|
"Cannot open encrypted database in protected=False")
|
|
|
|
except KeyError:
|
|
|
|
if protected:
|
|
|
|
self.db_conn['enc'] = b'1'
|
|
|
|
else:
|
|
|
|
self.db_conn['enc'] = b'0'
|
|
|
|
try:
|
|
|
|
generate_key_file()
|
|
|
|
except FileExistsError:
|
|
|
|
pass
|
|
|
|
|
|
|
|
self.protected = protected
|
2020-12-30 06:48:59 +00:00
|
|
|
|
|
|
|
|