18.0 vanilla

This commit is contained in:
Ernad Husremovic 2025-10-03 18:06:50 +02:00
parent d72e748793
commit 0a7ae8db93
337 changed files with 399651 additions and 232598 deletions

View file

@ -5,9 +5,10 @@ import optparse
import logging
from collections import defaultdict
from psycopg2 import sql
from . import Command
from odoo.modules.registry import Registry
from odoo.tools import SQL
_logger = logging.getLogger(__name__)
@ -46,11 +47,11 @@ class Obfuscate(Command):
@_ensure_cr
def check_pwd(self, pwd):
"""If password is set, check if it's valid"""
uncypher_pwd = self.uncypher_string(sql.Identifier('value'))
uncypher_pwd = self.uncypher_string(SQL.identifier('value'), pwd)
try:
qry = sql.SQL("SELECT {uncypher_pwd} FROM ir_config_parameter WHERE key='odoo_cyph_pwd'").format(uncypher_pwd=uncypher_pwd)
self.cr.execute(qry, {'pwd': pwd})
query = SQL("SELECT %s FROM ir_config_parameter WHERE key='odoo_cyph_pwd'", uncypher_pwd)
self.cr.execute(query)
if self.cr.rowcount == 0 or (self.cr.rowcount == 1 and self.cr.fetchone()[0] == pwd):
return True
except Exception as e: # noqa: BLE001
@ -62,12 +63,12 @@ class Obfuscate(Command):
"""Unset password to cypher/uncypher datas"""
self.cr.execute("DELETE FROM ir_config_parameter WHERE key='odoo_cyph_pwd' ")
def cypher_string(self, sql_field):
def cypher_string(self, sql_field: SQL, password):
# don't double cypher fields
return sql.SQL("""CASE WHEN starts_with({field_name}, 'odoo_cyph_') THEN {field_name} ELSE 'odoo_cyph_'||encode(pgp_sym_encrypt({field_name}, %(pwd)s), 'base64') END""").format(field_name=sql_field)
return SQL("""CASE WHEN starts_with(%(field_name)s, 'odoo_cyph_') THEN %(field_name)s ELSE 'odoo_cyph_'||encode(pgp_sym_encrypt(%(field_name)s, %(pwd)s), 'base64') END""", field_name=sql_field, pwd=password)
def uncypher_string(self, sql_field):
return sql.SQL("""CASE WHEN starts_with({field_name}, 'odoo_cyph_') THEN pgp_sym_decrypt(decode(substring({field_name}, 11)::text, 'base64'), %(pwd)s) ELSE {field_name} END""").format(field_name=sql_field)
def uncypher_string(self, sql_field: SQL, password):
return SQL("""CASE WHEN starts_with(%(field_name)s, 'odoo_cyph_') THEN pgp_sym_decrypt(decode(substring(%(field_name)s, 11)::text, 'base64'), %(pwd)s) ELSE %(field_name)s END""", field_name=sql_field, pwd=password)
def check_field(self, table, field):
qry = "SELECT udt_name FROM information_schema.columns WHERE table_name=%s AND column_name=%s"
@ -92,25 +93,29 @@ class Obfuscate(Command):
for field in fields:
field_type = self.check_field(table, field)
sql_field = SQL.identifier(field)
if field_type == 'string':
cypher_query = cyph_fct(sql.Identifier(field))
cypherings.append(sql.SQL('{field}={cypher}').format(field=sql.Identifier(field), cypher=cypher_query))
cypher_query = cyph_fct(sql_field, pwd)
cypherings.append(SQL('%s=%s', SQL.identifier(field), cypher_query))
elif field_type == 'json':
# List every key
# Loop on keys
# Nest the jsonb_set calls to update all values at once
# Do not create the key in json if doesn't esist
new_field_value = sql.Identifier(field)
self.cr.execute(sql.SQL('select distinct jsonb_object_keys({field}) as key from {table}').format(field=sql.Identifier(field), table=sql.Identifier(table)))
new_field_value = sql_field
self.cr.execute(SQL('select distinct jsonb_object_keys(%s) as key from %s', sql_field, SQL.identifier(table)))
keys = [k[0] for k in self.cr.fetchall()]
for key in keys:
cypher_query = cyph_fct(sql.SQL("{field}->>{key}").format(field=sql.Identifier(field), key=sql.Literal(key)))
new_field_value = sql.SQL("""jsonb_set({new_field_value}, array[{key}], to_jsonb({cypher_query})::jsonb, FALSE) """).format(new_field_value=new_field_value, key=sql.Literal(key), cypher_query=cypher_query)
cypherings.append(sql.SQL('{field}={cypher}').format(field=sql.Identifier(field), cypher=new_field_value))
cypher_query = cyph_fct(SQL("%s->>%s", sql_field, key), pwd)
new_field_value = SQL(
"""jsonb_set(%s, array[%s], to_jsonb(%s)::jsonb, FALSE)""",
new_field_value, key, cypher_query
)
cypherings.append(SQL('%s=%s', sql_field, new_field_value))
if cypherings:
query = sql.SQL("UPDATE {table} SET {fields}").format(table=sql.Identifier(table), fields=sql.SQL(',').join(cypherings))
self.cr.execute(query, {"pwd": pwd})
query = SQL("UPDATE %s SET %s", SQL.identifier(table), SQL(',').join(cypherings))
self.cr.execute(query)
if with_commit:
self.commit()
self.begin()
@ -147,7 +152,7 @@ class Obfuscate(Command):
sys.exit(parser.print_help())
try:
opt = odoo.tools.config.parse_config(cmdargs)
opt = odoo.tools.config.parse_config(cmdargs, setup_logging=True)
if not opt.pwd:
_logger.error("--pwd is required")
sys.exit("ERROR: --pwd is required")
@ -155,7 +160,7 @@ class Obfuscate(Command):
_logger.error("--allfields can only be used in unobfuscate mode")
sys.exit("ERROR: --allfields can only be used in unobfuscate mode")
self.dbname = odoo.tools.config['db_name']
self.registry = odoo.registry(self.dbname)
self.registry = Registry(self.dbname)
with self.registry.cursor() as cr:
self.cr = cr
self.begin()
@ -233,7 +238,7 @@ class Obfuscate(Command):
_logger.info("Vacuuming obfuscated tables")
for table in tables:
_logger.debug("Vacuuming table %s", table)
self.cr.execute(sql.SQL("""VACUUM FULL {table}""").format(table=sql.Identifier(table)))
self.cr.execute(SQL("VACUUM FULL %s", SQL.identifier(table)))
self.clear_pwd()
else:
_logger.info("Obfuscating datas")