Initial commit: OCA Storage packages (17 packages)

This commit is contained in:
Ernad Husremovic 2025-08-29 15:43:06 +02:00
commit 7a380f05d3
659 changed files with 41828 additions and 0 deletions

View file

@ -0,0 +1,5 @@
from . import test_fs_attachment
from . import test_fs_attachment_file_like_adapter
from . import test_fs_attachment_internal_url
from . import test_fs_storage
from . import test_stream

View file

@ -0,0 +1,74 @@
# Copyright 2023 ACSONE SA/NV (http://acsone.eu).
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
import os
import shutil
import tempfile
from odoo.tests.common import TransactionCase
class TestFSAttachmentCommon(TransactionCase):
@classmethod
def setUpClass(cls):
super().setUpClass()
cls.env = cls.env(context=dict(cls.env.context, tracking_disable=True))
temp_dir = tempfile.mkdtemp()
cls.temp_backend = cls.env["fs.storage"].create(
{
"name": "Temp FS Storage",
"protocol": "file",
"code": "tmp_dir",
"directory_path": temp_dir,
}
)
cls.backend_optimized = cls.env["fs.storage"].create(
{
"name": "Temp Optimized FS Storage",
"protocol": "file",
"code": "tmp_opt",
"directory_path": temp_dir,
"optimizes_directory_path": True,
}
)
cls.temp_dir = temp_dir
cls.gc_file_model = cls.env["fs.file.gc"]
cls.ir_attachment_model = cls.env["ir.attachment"]
@cls.addClassCleanup
def cleanup_tempdir():
shutil.rmtree(temp_dir)
def setUp(self):
super().setUp()
# enforce temp_backend field since it seems that they are reset on
# savepoint rollback when managed by server_environment -> TO Be investigated
self.temp_backend.write(
{
"protocol": "file",
"code": "tmp_dir",
"directory_path": self.temp_dir,
}
)
self.backend_optimized.write(
{
"protocol": "file",
"code": "tmp_opt",
"directory_path": self.temp_dir,
"optimizes_directory_path": True,
}
)
def tearDown(self) -> None:
super().tearDown()
# empty the temp dir
for f in os.listdir(self.temp_dir):
full_path = os.path.join(self.temp_dir, f)
if os.path.isfile(full_path):
os.remove(full_path)
else: # using optimizes_directory_path, we'll have a directory
shutil.rmtree(full_path)
class MyException(Exception):
"""Exception to be raised into tests ensure that we trap only this
exception and not other exceptions raised by the test"""

View file

@ -0,0 +1,487 @@
# Copyright 2023 ACSONE SA/NV (http://acsone.eu).
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
import os
from unittest import mock
from odoo.tools import mute_logger
from .common import MyException, TestFSAttachmentCommon
class TestFSAttachment(TestFSAttachmentCommon):
def test_create_attachment_explicit_location(self):
content = b"This is a test attachment"
attachment = (
self.env["ir.attachment"]
.with_context(
storage_location=self.temp_backend.code,
force_storage_key="test.txt",
)
.create({"name": "test.txt", "raw": content})
)
self.assertEqual(os.listdir(self.temp_dir), [f"test-{attachment.id}-0.txt"])
self.assertEqual(attachment.raw, content)
self.assertFalse(attachment.db_datas)
self.assertEqual(attachment.mimetype, "text/plain")
with attachment.open("rb") as f:
self.assertEqual(f.read(), content)
with attachment.open("wb") as f:
f.write(b"new")
self.assertEqual(attachment.raw, b"new")
def test_create_attachment_with_meaningful_name(self):
"""In this test we use a backend with 'optimizes_directory_path',
which rewrites the filename to be a meaningful name.
We ensure that the rewritten path is consistently used,
meaning we can read the file after.
"""
content = b"This is a test attachment"
attachment = (
self.env["ir.attachment"]
.with_context(
storage_location=self.backend_optimized.code,
force_storage_key="test.txt",
)
.create({"name": "test.txt", "raw": content})
)
# the expected store_fname is made of the storage code,
# a random middle part, and the filename
# example: tmp_opt://te/st/test-198-0.txt
# The storage root is NOT part of the store_fname
self.assertFalse("tmp/" in attachment.store_fname)
# remove protocol and file name to keep the middle part
sub_path = os.path.dirname(attachment.store_fname.split("://")[1])
# the subpath is consistently 'te/st' because the file storage key is forced
# if it's arbitrary we might get a random name (3fbc5er....txt), in which case
# the middle part would also be 'random', in our example 3f/bc
self.assertEqual(sub_path, "te/st")
# we can read the file, so storage finds it correctly
with attachment.open("rb") as f:
self.assertEqual(f.read(), content)
new_content = b"new content"
with attachment.open("wb") as f:
f.write(new_content)
# the store fname should have changed, as its version number has increased
# e.g. tmp_opt://te/st/test-1766-0.txt to tmp_opt://te/st/test-1766-1.txt
# but the protocol and sub path should be the same
new_sub_path = os.path.dirname(attachment.store_fname.split("://")[1])
self.assertEqual(sub_path, new_sub_path)
with attachment.open("rb") as f:
self.assertEqual(f.read(), new_content)
def test_open_attachment_in_db(self):
self.env["ir.config_parameter"].sudo().set_param("ir_attachment.location", "db")
content = b"This is a test attachment in db"
attachment = self.ir_attachment_model.create(
{"name": "test.txt", "raw": content}
)
self.assertFalse(attachment.store_fname)
self.assertTrue(attachment.db_datas)
self.assertEqual(attachment.mimetype, "text/plain")
with attachment.open("rb") as f:
self.assertEqual(f.read(), content)
with attachment.open("wb") as f:
f.write(b"new")
self.assertEqual(attachment.raw, b"new")
def test_attachment_open_in_filestore(self):
self.env["ir.config_parameter"].sudo().set_param(
"ir_attachment.location", "file"
)
content = b"This is a test attachment in filestore"
attachment = self.ir_attachment_model.create(
{"name": "test.txt", "raw": content}
)
self.assertTrue(attachment.store_fname)
self.assertFalse(attachment.db_datas)
self.assertEqual(attachment.raw, content)
with attachment.open("rb") as f:
self.assertEqual(f.read(), content)
with attachment.open("wb") as f:
f.write(b"new")
self.assertEqual(attachment.raw, b"new")
def test_default_attachment_store_in_fs(self):
self.temp_backend.use_as_default_for_attachments = True
content = b"This is a test attachment in filestore tmp_dir"
attachment = self.ir_attachment_model.create(
{"name": "test.txt", "raw": content}
)
self.assertTrue(attachment.store_fname)
self.assertFalse(attachment.db_datas)
self.assertEqual(attachment.raw, content)
self.assertEqual(attachment.mimetype, "text/plain")
self.env.flush_all()
initial_filename = f"test-{attachment.id}-0.txt"
self.assertEqual(os.listdir(self.temp_dir), [initial_filename])
with attachment.open("rb") as f:
self.assertEqual(f.read(), content)
with open(os.path.join(self.temp_dir, initial_filename), "rb") as f:
self.assertEqual(f.read(), content)
# update the attachment
attachment.raw = b"new"
with attachment.open("rb") as f:
self.assertEqual(f.read(), b"new")
# a new file version is created
new_filename = f"test-{attachment.id}-1.txt"
with open(os.path.join(self.temp_dir, new_filename), "rb") as f:
self.assertEqual(f.read(), b"new")
self.assertEqual(attachment.raw, b"new")
self.assertEqual(attachment.store_fname, f"tmp_dir://{new_filename}")
self.assertEqual(attachment.mimetype, "text/plain")
# the original file is to to be deleted by the GC
self.assertEqual(
set(os.listdir(self.temp_dir)), {initial_filename, new_filename}
)
# run the GC
self.env.flush_all()
self.gc_file_model._gc_files_unsafe()
self.assertEqual(os.listdir(self.temp_dir), [new_filename])
attachment.unlink()
# concrete file deletion is done by the GC
self.env.flush_all()
self.assertEqual(os.listdir(self.temp_dir), [new_filename])
# run the GC
self.gc_file_model._gc_files_unsafe()
self.assertEqual(os.listdir(self.temp_dir), [])
def test_fs_update_transactionnal(self):
"""In this test we check that if a rollback is done on an update
The original content is preserved
"""
self.temp_backend.use_as_default_for_attachments = True
content = b"Transactional update"
attachment = self.ir_attachment_model.create(
{"name": "test.txt", "raw": content}
)
self.env.flush_all()
self.assertEqual(attachment.raw, content)
initial_filename = f"test-{attachment.id}-0.txt"
self.assertEqual(attachment.store_fname, f"tmp_dir://{initial_filename}")
self.assertEqual(attachment.fs_filename, initial_filename)
self.assertEqual(
os.listdir(self.temp_dir), [os.path.basename(initial_filename)]
)
orignal_store_fname = attachment.store_fname
try:
with self.env.cr.savepoint():
attachment.raw = b"updated"
new_filename = f"test-{attachment.id}-1.txt"
new_store_fname = f"tmp_dir://{new_filename}"
self.assertEqual(attachment.store_fname, new_store_fname)
self.assertEqual(attachment.fs_filename, new_filename)
# at this stage the original file and the new file are present
# in the list of files to GC
gc_files = self.gc_file_model.search([]).mapped("store_fname")
self.assertIn(orignal_store_fname, gc_files)
self.assertIn(orignal_store_fname, gc_files)
raise MyException("dummy exception")
except MyException:
...
self.assertEqual(attachment.store_fname, f"tmp_dir://{initial_filename}")
self.assertEqual(attachment.fs_filename, initial_filename)
self.assertEqual(attachment.raw, content)
self.assertEqual(attachment.mimetype, "text/plain")
self.assertEqual(
set(os.listdir(self.temp_dir)),
{os.path.basename(initial_filename), os.path.basename(new_filename)},
)
# in test mode, gc collector is not run into a separate transaction
# therefore it has been reset. We manually add our two store_fname
# to the list of files to GC
self.gc_file_model._mark_for_gc(orignal_store_fname)
self.gc_file_model._mark_for_gc(new_store_fname)
# run gc
self.gc_file_model._gc_files_unsafe()
self.assertEqual(
os.listdir(self.temp_dir), [os.path.basename(initial_filename)]
)
def test_fs_create_transactional(self):
"""In this test we check that if a rollback is done on a create
The file is removed
"""
self.temp_backend.use_as_default_for_attachments = True
content = b"Transactional create"
try:
with self.env.cr.savepoint():
attachment = self.ir_attachment_model.create(
{"name": "test.txt", "raw": content}
)
self.env.flush_all()
self.assertEqual(attachment.raw, content)
initial_filename = f"test-{attachment.id}-0.txt"
self.assertEqual(
attachment.store_fname, f"tmp_dir://{initial_filename}"
)
self.assertEqual(attachment.fs_filename, initial_filename)
self.assertEqual(
os.listdir(self.temp_dir), [os.path.basename(initial_filename)]
)
new_store_fname = attachment.store_fname
# at this stage the new file is into the list of files to GC
gc_files = self.gc_file_model.search([]).mapped("store_fname")
self.assertIn(new_store_fname, gc_files)
raise MyException("dummy exception")
except MyException:
...
self.env.flush_all()
# in test mode, gc collector is not run into a separate transaction
# therefore it has been reset. We manually add our new file to the
# list of files to GC
self.gc_file_model._mark_for_gc(new_store_fname)
# run gc
self.gc_file_model._gc_files_unsafe()
self.assertEqual(os.listdir(self.temp_dir), [])
def test_fs_no_delete_if_not_in_current_directory_path(self):
"""In this test we check that it's not possible to removes files
outside the current directory path even if they were created by the
current filesystem storage.
"""
# normal delete
self.temp_backend.use_as_default_for_attachments = True
content = b"Transactional create"
attachment = self.ir_attachment_model.create(
{"name": "test.txt", "raw": content}
)
self.env.flush_all()
initial_filename = f"test-{attachment.id}-0.txt"
self.assertEqual(
os.listdir(self.temp_dir), [os.path.basename(initial_filename)]
)
attachment.unlink()
self.gc_file_model._gc_files_unsafe()
self.assertEqual(os.listdir(self.temp_dir), [])
# delete outside the current directory path
attachment = self.ir_attachment_model.create(
{"name": "test.txt", "raw": content}
)
self.env.flush_all()
initial_filename = f"test-{attachment.id}-0.txt"
self.assertEqual(
os.listdir(self.temp_dir), [os.path.basename(initial_filename)]
)
self.temp_backend.directory_path = "/dummy"
attachment.unlink()
self.gc_file_model._gc_files_unsafe()
# unlink is not physically done since the file is outside the current
self.assertEqual(
os.listdir(self.temp_dir), [os.path.basename(initial_filename)]
)
def test_no_gc_if_disabled_on_storage(self):
store_fname = "tmp_dir://dummy-0-0.txt"
self.gc_file_model._mark_for_gc(store_fname)
self.temp_backend.autovacuum_gc = False
self.gc_file_model._gc_files_unsafe()
self.assertIn(store_fname, self.gc_file_model.search([]).mapped("store_fname"))
self.temp_backend.autovacuum_gc = False
self.gc_file_model._gc_files_unsafe()
self.assertIn(store_fname, self.gc_file_model.search([]).mapped("store_fname"))
self.temp_backend.autovacuum_gc = True
self.gc_file_model._gc_files_unsafe()
self.assertNotIn(
store_fname, self.gc_file_model.search([]).mapped("store_fname")
)
def test_attachment_fs_url(self):
self.temp_backend.base_url = "https://acsone.eu/media"
self.temp_backend.use_as_default_for_attachments = True
content = b"Transactional update"
attachment = self.ir_attachment_model.create(
{"name": "test.txt", "raw": content}
)
self.env.flush_all()
attachment_path = f"/test-{attachment.id}-0.txt"
self.assertEqual(attachment.fs_url, f"https://acsone.eu/media{attachment_path}")
self.assertEqual(attachment.fs_url_path, attachment_path)
self.temp_backend.is_directory_path_in_url = True
self.temp_backend.recompute_urls()
attachment_path = f"{self.temp_dir}/test-{attachment.id}-0.txt"
self.assertEqual(attachment.fs_url, f"https://acsone.eu/media{attachment_path}")
self.assertEqual(attachment.fs_url_path, attachment_path)
def test_force_attachment_in_db_rules(self):
self.temp_backend.use_as_default_for_attachments = True
# force storage in db for text/plain
self.temp_backend.force_db_for_default_attachment_rules = '{"text/plain": 0}'
attachment = self.ir_attachment_model.create(
{"name": "test.txt", "raw": b"content"}
)
self.env.flush_all()
self.assertFalse(attachment.store_fname)
self.assertEqual(attachment.db_datas, b"content")
self.assertEqual(attachment.mimetype, "text/plain")
def test_force_storage_to_db(self):
self.temp_backend.use_as_default_for_attachments = True
attachment = self.ir_attachment_model.create(
{"name": "test.txt", "raw": b"content"}
)
self.env.flush_all()
self.assertTrue(attachment.store_fname)
self.assertFalse(attachment.db_datas)
store_fname = attachment.store_fname
# we change the rules to force the storage in db for text/plain
self.temp_backend.force_db_for_default_attachment_rules = '{"text/plain": 0}'
attachment.force_storage_to_db_for_special_fields()
self.assertFalse(attachment.store_fname)
self.assertEqual(attachment.db_datas, b"content")
# we check that the file is marked for GC
gc_files = self.gc_file_model.search([]).mapped("store_fname")
self.assertIn(store_fname, gc_files)
@mute_logger("odoo.addons.fs_attachment.models.ir_attachment")
def test_force_storage_to_fs(self):
attachment = self.ir_attachment_model.create(
{"name": "test.txt", "raw": b"content"}
)
self.env.flush_all()
fs_path = self.ir_attachment_model._filestore() + "/" + attachment.store_fname
self.assertTrue(os.path.exists(fs_path))
self.assertEqual(os.listdir(self.temp_dir), [])
# we decide to force the storage in the filestore
self.temp_backend.use_as_default_for_attachments = True
with mock.patch.object(self.env.cr, "commit"), mock.patch(
"odoo.addons.fs_attachment.models.ir_attachment.clean_fs"
) as clean_fs:
self.ir_attachment_model.force_storage()
clean_fs.assert_called_once()
# files into the filestore must be moved to our filesystem storage
filename = f"test-{attachment.id}-0.txt"
self.assertEqual(attachment.store_fname, f"tmp_dir://{filename}")
self.assertIn(filename, os.listdir(self.temp_dir))
def test_storage_use_filename_obfuscation(self):
self.temp_backend.base_url = "https://acsone.eu/media"
self.temp_backend.use_as_default_for_attachments = True
self.temp_backend.use_filename_obfuscation = True
attachment = self.ir_attachment_model.create(
{"name": "test.txt", "raw": b"content"}
)
self.env.flush_all()
self.assertTrue(attachment.store_fname)
self.assertEqual(attachment.name, "test.txt")
self.assertEqual(attachment.checksum, attachment.store_fname.split("/")[-1])
self.assertEqual(attachment.checksum, attachment.fs_url.split("/")[-1])
self.assertEqual(attachment.mimetype, "text/plain")
def test_create_attachments_basic_user(self):
demo_user = self.env.ref("base.user_demo")
demo_partner = self.env.ref("base.partner_demo")
self.temp_backend.use_as_default_for_attachments = True
# Ensure basic access
group_user = self.env.ref("base.group_user")
group_partner_manager = self.env.ref("base.group_partner_manager")
demo_user.write(
{"groups_id": [(6, 0, [group_user.id, group_partner_manager.id])]}
)
# Create basic attachment
self.ir_attachment_model.with_user(demo_user).create(
{"name": "test.txt", "raw": b"content"}
)
# Create attachment related to model
self.ir_attachment_model.with_user(demo_user).create(
{
"name": "test.txt",
"raw": b"content",
"res_model": "res.partner",
"res_id": demo_partner.id,
}
)
# Create attachment related to field
partner_image_field = self.env["ir.model.fields"].search(
[("model", "=", "res.partner"), ("name", "=", "image1920")]
)
self.ir_attachment_model.with_user(demo_user).create(
{
"name": "test.txt",
"raw": b"content",
"res_model": "res.partner",
"res_id": demo_partner.id,
"res_field": partner_image_field.name,
}
)
def test_update_png_to_svg(self):
b64_data_png = (
b"iVBORw0KGgoAAAANSUhEUgAAADMAAAAhCAIAAAD73QTtAAAAA3NCSVQICAjb4U/gAA"
b"AAP0lEQVRYhe3OMQGAMBAAsVL/nh8FDDfxQ6Igz8ycle7fgU9mnVln1pl1Zp1ZZ9aZd"
b"WadWWfWmXVmnVln1u2dvfL/Az+TRcv4AAAAAElFTkSuQmCC"
)
attachment = self.ir_attachment_model.create(
{
"name": "test.png",
"datas": b64_data_png,
}
)
self.assertEqual(attachment.mimetype, "image/png")
b64_data_svg = (
b"PD94bWwgdmVyc2lvbj0iMS4wIiBzdGFuZGFsb25lPSJubyI/Pgo8IURPQ1RZUEU"
b"gc3ZnIFBVQkxJQyAiLS8vVzNDLy9EVEQgU1ZHIDIwMDEwOTA0Ly9FTiIKICJodH"
b"RwOi8vd3d3LnczLm9yZy9UUi8yMDAxL1JFQy1TVkctMjAwMTA5MDQvRFREL3N2Zz"
b"EwLmR0ZCI+CjxzdmcgdmVyc2lvbj0iMS4wIiB4bWxucz0iaHR0cDovL3d3dy53My5"
b"vcmcvMjAwMC9zdmciCiB3aWR0aD0iNTEuMDAwMDAwcHQiIGhlaWdodD0iMzMuMDAw"
b"MDAwcHQiIHZpZXdCb3g9IjAgMCA1MS4wMDAwMDAgMzMuMDAwMDAwIgogcHJlc2Vydm"
b"VBc3BlY3RSYXRpbz0ieE1pZFlNaWQgbWVldCI+Cgo8ZyB0cmFuc2Zvcm09InRyYW5z"
b"bGF0ZSgwLjAwMDAwMCwzMy4wMDAwMDApIHNjYWxlKDAuMTAwMDAwLC0wLjEwMDAwMCk"
b"iCmZpbGw9IiMwMDAwMDAiIHN0cm9rZT0ibm9uZSI+CjwvZz4KPC9zdmc+Cg=="
)
attachment.write(
{
"datas": b64_data_svg,
}
)
self.assertEqual(attachment.mimetype, "image/svg+xml")
def test_write_name(self):
self.temp_backend.use_as_default_for_attachments = True
attachment = self.ir_attachment_model.create(
{"name": "file.bin", "datas": b"aGVsbG8gd29ybGQK"}
)
self.assertTrue(attachment.fs_filename.startswith("file-"))
self.assertTrue(attachment.fs_filename.endswith(".bin"))
attachment.write({"name": "file2.txt"})
self.assertTrue(attachment.fs_filename.startswith("file2-"))
self.assertTrue(attachment.fs_filename.endswith(".txt"))
def test_store_in_db_instead_of_object_storage_domain(self):
IrAttachment = self.env["ir.attachment"]
self.patch(
type(IrAttachment),
"_get_storage_force_db_config",
lambda self: {"text/plain": 0, "image/png": 100},
)
self.assertEqual(
self.env["ir.attachment"]._store_in_db_instead_of_object_storage_domain(),
[
"|",
("mimetype", "=like", "text/plain%"),
"&",
("mimetype", "=like", "image/png%"),
("file_size", "<=", 100),
],
)

View file

@ -0,0 +1,233 @@
# Copyright 2023 ACSONE SA/NV (http://acsone.eu).
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
from ..models.ir_attachment import AttachmentFileLikeAdapter
from .common import MyException, TestFSAttachmentCommon
class TestFSAttachmentFileLikeAdapterMixin:
@classmethod
def _create_attachment(cls):
raise NotImplementedError
@classmethod
def prepareClass(cls):
cls.initial_content = b"This is a test attachment"
cls.new_content = b"This is a new test attachment"
def prepare(self):
self.attachment = self._create_attachment()
def open(self, attachment=None, mode="rb", new_version=False, **kwargs):
return AttachmentFileLikeAdapter(
attachment or self.attachment,
mode=mode,
new_version=new_version,
**kwargs,
)
def test_read(self):
with self.open(mode="rb") as f:
self.assertEqual(f.read(), self.initial_content)
def test_write(self):
with self.open(mode="wb") as f:
f.write(self.new_content)
self.assertEqual(self.new_content, self.attachment.raw)
def test_write_append(self):
self.assertEqual(self.initial_content, self.attachment.raw)
with self.open(mode="ab") as f:
f.write(self.new_content)
self.assertEqual(self.initial_content + self.new_content, self.attachment.raw)
def test_write_new_version(self):
initial_fname = self.attachment.store_fname
with self.open(mode="wb", new_version=True) as f:
f.write(self.new_content)
self.assertEqual(self.new_content, self.attachment.raw)
if initial_fname:
self.assertNotEqual(self.attachment.store_fname, initial_fname)
def test_write_append_new_version(self):
initial_fname = self.attachment.store_fname
with self.open(mode="ab", new_version=True) as f:
f.write(self.new_content)
self.assertEqual(self.initial_content + self.new_content, self.attachment.raw)
if initial_fname:
self.assertNotEqual(self.attachment.store_fname, initial_fname)
def test_write_transactional_new_version_only(self):
try:
initial_fname = self.attachment.store_fname
with self.env.cr.savepoint():
with self.open(mode="wb", new_version=True) as f:
f.write(self.new_content)
self.assertEqual(self.new_content, self.attachment.raw)
if initial_fname:
self.assertNotEqual(self.attachment.store_fname, initial_fname)
raise MyException("Test")
except MyException:
...
self.assertEqual(self.initial_content, self.attachment.raw)
if initial_fname:
self.assertEqual(self.attachment.store_fname, initial_fname)
class TestAttachmentInFileSystemFileLikeAdapter(
TestFSAttachmentCommon, TestFSAttachmentFileLikeAdapterMixin
):
@classmethod
def setUpClass(cls):
super().setUpClass()
cls.prepareClass()
def setUp(self):
super().setUp()
self.prepare()
@classmethod
def _create_attachment(cls):
return (
cls.env["ir.attachment"]
.with_context(
storage_location=cls.temp_backend.code,
storage_file_path="test.txt",
)
.create({"name": "test.txt", "raw": cls.initial_content})
)
class TestAttachmentInDBFileLikeAdapter(
TestFSAttachmentCommon, TestFSAttachmentFileLikeAdapterMixin
):
@classmethod
def setUpClass(cls):
super().setUpClass()
cls.prepareClass()
def setUp(self):
super().setUp()
self.env["ir.config_parameter"].sudo().set_param("ir_attachment.location", "db")
self.prepare()
def tearDown(self) -> None:
self.attachment.unlink()
super().tearDown()
@classmethod
def _create_attachment(cls):
return cls.env["ir.attachment"].create(
{"name": "test.txt", "raw": cls.initial_content}
)
class TestAttachmentInFileFileLikeAdapter(
TestFSAttachmentCommon, TestFSAttachmentFileLikeAdapterMixin
):
@classmethod
def setUpClass(cls):
super().setUpClass()
cls.prepareClass()
def setUp(self):
super().setUp()
self.env["ir.config_parameter"].sudo().set_param(
"ir_attachment.location", "file"
)
self.prepare()
def tearDown(self) -> None:
self.attachment.unlink()
self.attachment._gc_file_store_unsafe()
super().tearDown()
@classmethod
def _create_attachment(cls):
return cls.env["ir.attachment"].create(
{"name": "test.txt", "raw": cls.initial_content}
)
class TestAttachmentInFileSystemDependingModelFileLikeAdapter(
TestFSAttachmentCommon, TestFSAttachmentFileLikeAdapterMixin
):
"""
Configure the temp backend to store only attachments linked to
res.partner model.
Check that opening/updating the file does not change the storage type.
"""
@classmethod
def setUpClass(cls):
res = super().setUpClass()
cls.temp_backend.model_xmlids = "base.model_res_partner"
cls.prepareClass()
return res
def setUp(self):
super().setUp()
super().prepare()
@classmethod
def _create_attachment(cls):
return (
cls.env["ir.attachment"]
.with_context(
storage_file_path="test.txt",
)
.create(
{
"name": "test.txt",
"raw": cls.initial_content,
"res_model": "res.partner",
}
)
)
def test_storage_location(self):
self.assertEqual(self.attachment.fs_storage_id, self.temp_backend)
class TestAttachmentInFileSystemDependingFieldFileLikeAdapter(
TestFSAttachmentCommon, TestFSAttachmentFileLikeAdapterMixin
):
"""
Configure the temp backend to store only attachments linked to
res.country ID field.
Check that opening/updating the file does not change the storage type.
"""
@classmethod
def setUpClass(cls):
res = super().setUpClass()
cls.temp_backend.field_xmlids = "base.field_res_country__id"
cls.prepareClass()
return res
def setUp(self):
super().setUp()
super().prepare()
@classmethod
def _create_attachment(cls):
return (
cls.env["ir.attachment"]
.with_context(
storage_file_path="test.txt",
)
.create(
{
"name": "test.txt",
"raw": cls.initial_content,
"res_model": "res.country",
"res_field": "id",
}
)
)
def test_storage_location(self):
self.assertEqual(self.attachment.fs_storage_id, self.temp_backend)

View file

@ -0,0 +1,108 @@
# Copyright 2023 ACSONE SA/NV (http://acsone.eu).
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
import os
import shutil
import tempfile
from unittest.mock import patch
from odoo.tests.common import HttpCase
from odoo.tools import config
class TestFsAttachmentInternalUrl(HttpCase):
@classmethod
def setUpClass(cls):
super().setUpClass()
cls.env = cls.env(context=dict(cls.env.context, tracking_disable=True))
temp_dir = tempfile.mkdtemp()
cls.temp_backend = cls.env["fs.storage"].create(
{
"name": "Temp FS Storage",
"protocol": "file",
"code": "tmp_dir",
"directory_path": temp_dir,
"base_url": "http://my.public.files/",
}
)
cls.temp_dir = temp_dir
cls.gc_file_model = cls.env["fs.file.gc"]
cls.content = b"This is a test attachment"
cls.attachment = (
cls.env["ir.attachment"]
.with_context(
storage_location=cls.temp_backend.code,
storage_file_path="test.txt",
)
.create({"name": "test.txt", "raw": cls.content})
)
@cls.addClassCleanup
def cleanup_tempdir():
shutil.rmtree(temp_dir)
def setUp(self):
super().setUp()
# enforce temp_backend field since it seems that they are reset on
# savepoint rollback when managed by server_environment -> TO Be investigated
self.temp_backend.write(
{
"protocol": "file",
"code": "tmp_dir",
"directory_path": self.temp_dir,
"base_url": "http://my.public.files/",
}
)
@classmethod
def tearDownClass(cls):
super().tearDownClass()
for f in os.listdir(cls.temp_dir):
os.remove(os.path.join(cls.temp_dir, f))
def assertDownload(
self, url, headers, assert_status_code, assert_headers, assert_content=None
):
res = self.url_open(url, headers=headers)
res.raise_for_status()
self.assertEqual(res.status_code, assert_status_code)
for header_name, header_value in assert_headers.items():
self.assertEqual(
res.headers.get(header_name),
header_value,
f"Wrong value for header {header_name}",
)
if assert_content:
self.assertEqual(res.content, assert_content, "Wong content")
return res
def test_fs_attachment_internal_url(self):
self.authenticate("admin", "admin")
self.assertDownload(
self.attachment.internal_url,
headers={},
assert_status_code=200,
assert_headers={
"Content-Type": "text/plain; charset=utf-8",
"Content-Disposition": "inline; filename=test.txt",
},
assert_content=self.content,
)
def test_fs_attachment_internal_url_x_sendfile(self):
self.authenticate("admin", "admin")
self.temp_backend.write({"use_x_sendfile_to_serve_internal_url": True})
with patch.object(config, "options", {**config.options, "x_sendfile": True}):
x_accel_redirect = f"/tmp_dir/test-{self.attachment.id}-0.txt"
self.assertDownload(
self.attachment.internal_url,
headers={},
assert_status_code=200,
assert_headers={
"Content-Type": "text/plain; charset=utf-8",
"Content-Disposition": "inline; filename=test.txt",
"X-Accel-Redirect": x_accel_redirect,
"Content-Length": "0",
"X-Sendfile": x_accel_redirect,
},
assert_content=None,
)

View file

@ -0,0 +1,414 @@
# Copyright 2023 ACSONE SA/NV (http://acsone.eu).
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
import base64
import os
from odoo.exceptions import ValidationError
from .common import TestFSAttachmentCommon
class TestFsStorage(TestFSAttachmentCommon):
@classmethod
def setUpClass(cls):
res = super().setUpClass()
cls.default_backend = cls.env.ref("fs_storage.default_fs_storage")
return res
def test_compute_model_ids(self):
"""
Give a list of model xmlids and check that the o2m field model_ids
is correctly fulfilled.
"""
self.temp_backend.model_xmlids = (
"base.model_res_partner,base.model_ir_attachment"
)
model_ids = self.temp_backend.model_ids
self.assertEqual(len(model_ids), 2)
model_names = model_ids.mapped("model")
self.assertEqual(set(model_names), {"res.partner", "ir.attachment"})
def test_inverse_model_ids(self):
"""
Modify backend model_ids and check the char field model_xmlids
is correctly updated
"""
model_1 = self.env["ir.model"].search([("model", "=", "res.partner")])
model_2 = self.env["ir.model"].search([("model", "=", "ir.attachment")])
self.temp_backend.model_ids = [(6, 0, [model_1.id, model_2.id])]
self.assertEqual(
self.temp_backend.model_xmlids,
"base.model_res_partner,base.model_ir_attachment",
)
def test_compute_field_ids(self):
"""
Give a list of field xmlids and check that the o2m field field_ids
is correctly fulfilled.
"""
self.temp_backend.field_xmlids = (
"base.field_res_partner__id,base.field_res_partner__create_date"
)
field_ids = self.temp_backend.field_ids
self.assertEqual(len(field_ids), 2)
field_names = field_ids.mapped("name")
self.assertEqual(set(field_names), {"id", "create_date"})
field_models = field_ids.mapped("model")
self.assertEqual(set(field_models), {"res.partner"})
def test_inverse_field_ids(self):
"""
Modify backend field_ids and check the char field field_xmlids
is correctly updated
"""
field_1 = self.env["ir.model.fields"].search(
[("model", "=", "res.partner"), ("name", "=", "id")]
)
field_2 = self.env["ir.model.fields"].search(
[("model", "=", "res.partner"), ("name", "=", "create_date")]
)
self.temp_backend.field_ids = [(6, 0, [field_1.id, field_2.id])]
self.assertEqual(
self.temp_backend.field_xmlids,
"base.field_res_partner__id,base.field_res_partner__create_date",
)
def test_constraint_unique_storage_model(self):
"""
A given model can be linked to a unique storage
"""
self.temp_backend.model_xmlids = (
"base.model_res_partner,base.model_ir_attachment"
)
self.env.ref("fs_storage.default_fs_storage")
with self.assertRaises(ValidationError):
self.default_backend.model_xmlids = "base.model_res_partner"
def test_constraint_unique_storage_field(self):
"""
A given field can be linked to a unique storage
"""
self.temp_backend.field_xmlids = (
"base.field_res_partner__id,base.field_res_partner__name"
)
with self.assertRaises(ValidationError):
self.default_backend.field_xmlids = "base.field_res_partner__name"
def test_force_model_create_attachment(self):
"""
Force 'res.partner' model to temp_backend
Use odoofs as default for attachments
* Check that only attachments linked to res.partner model are stored
in the first FS.
* Check that updating this first attachment does not change the storage
"""
self.default_backend.use_as_default_for_attachments = True
self.temp_backend.model_xmlids = "base.model_res_partner"
# 1a. First attachment linked to res.partner model
content = b"This is a test attachment linked to res.partner model"
attachment = self.ir_attachment_model.create(
{"name": "test.txt", "raw": content, "res_model": "res.partner"}
)
self.assertTrue(attachment.store_fname)
self.assertFalse(attachment.db_datas)
self.assertEqual(attachment.raw, content)
self.assertEqual(attachment.mimetype, "text/plain")
self.env.flush_all()
initial_filename = f"test-{attachment.id}-0.txt"
self.assertEqual(attachment.fs_storage_code, self.temp_backend.code)
self.assertEqual(os.listdir(self.temp_dir), [initial_filename])
with attachment.open("rb") as f:
self.assertEqual(f.read(), content)
with open(os.path.join(self.temp_dir, initial_filename), "rb") as f:
self.assertEqual(f.read(), content)
# 1b. Update the attachment
new_content = b"Update the test attachment"
attachment.raw = new_content
with attachment.open("rb") as f:
self.assertEqual(f.read(), new_content)
# a new file version is created
new_filename = f"test-{attachment.id}-1.txt"
with open(os.path.join(self.temp_dir, new_filename), "rb") as f:
self.assertEqual(f.read(), new_content)
self.assertEqual(attachment.raw, new_content)
self.assertEqual(attachment.store_fname, f"tmp_dir://{new_filename}")
# 2. Second attachment linked to res.country model
content = b"This is a test attachment linked to res.country model"
attachment = self.ir_attachment_model.create(
{"name": "test.txt", "raw": content, "res_model": "res.country"}
)
self.assertTrue(attachment.store_fname)
self.assertFalse(attachment.db_datas)
self.assertEqual(attachment.raw, content)
self.assertEqual(attachment.mimetype, "text/plain")
self.env.flush_all()
self.assertEqual(attachment.fs_storage_code, self.default_backend.code)
def test_force_field_create_attachment(self):
"""
Force 'base.field_res.partner__name' field to temp_backend
Use odoofs as default for attachments
* Check that only attachments linked to res.partner name field are stored
in the first FS.
* Check that updating this first attachment does not change the storage
"""
self.default_backend.use_as_default_for_attachments = True
self.temp_backend.field_xmlids = "base.field_res_partner__name"
# 1a. First attachment linked to res.partner name field
content = b"This is a test attachment linked to res.partner name field"
attachment = self.ir_attachment_model.create(
{
"name": "test.txt",
"raw": content,
"res_model": "res.partner",
"res_field": "name",
}
)
self.assertTrue(attachment.store_fname)
self.assertFalse(attachment.db_datas)
self.assertEqual(attachment.raw, content)
self.assertEqual(attachment.mimetype, "text/plain")
self.env.flush_all()
initial_filename = f"test-{attachment.id}-0.txt"
self.assertEqual(attachment.fs_storage_code, self.temp_backend.code)
self.assertEqual(os.listdir(self.temp_dir), [initial_filename])
with attachment.open("rb") as f:
self.assertEqual(f.read(), content)
with open(os.path.join(self.temp_dir, initial_filename), "rb") as f:
self.assertEqual(f.read(), content)
# 1b. Update the attachment
new_content = b"Update the test attachment"
attachment.raw = new_content
with attachment.open("rb") as f:
self.assertEqual(f.read(), new_content)
# a new file version is created
new_filename = f"test-{attachment.id}-1.txt"
with open(os.path.join(self.temp_dir, new_filename), "rb") as f:
self.assertEqual(f.read(), new_content)
self.assertEqual(attachment.raw, new_content)
self.assertEqual(attachment.store_fname, f"tmp_dir://{new_filename}")
# 2. Second attachment linked to res.partner but other field (website)
content = b"This is a test attachment linked to res.partner website field"
attachment = self.ir_attachment_model.create(
{
"name": "test.txt",
"raw": content,
"res_model": "res.partner",
"res_field": "website",
}
)
self.assertTrue(attachment.store_fname)
self.assertFalse(attachment.db_datas)
self.assertEqual(attachment.raw, content)
self.assertEqual(attachment.mimetype, "text/plain")
self.env.flush_all()
self.assertEqual(attachment.fs_storage_code, self.default_backend.code)
# 3. Third attachment linked to res.partner but no specific field
content = b"This is a test attachment linked to res.partner model"
attachment = self.ir_attachment_model.create(
{"name": "test.txt", "raw": content, "res_model": "res.partner"}
)
self.assertTrue(attachment.store_fname)
self.assertFalse(attachment.db_datas)
self.assertEqual(attachment.raw, content)
self.assertEqual(attachment.mimetype, "text/plain")
self.env.flush_all()
self.assertEqual(attachment.fs_storage_code, self.default_backend.code)
def test_force_field_and_model_create_attachment(self):
"""
Force res.partner model to default_backend.
But force specific res.partner name field to temp_backend.
* Check that attachments linked to res.partner name field are
stored in temp_backend, and other attachments linked to other
fields of res.partner are stored in default_backend
* Check that updating this first attachment does not change the storage
"""
self.default_backend.model_xmlids = "base.model_res_partner"
self.temp_backend.field_xmlids = "base.field_res_partner__name"
# 1a. First attachment linked to res.partner name field
content = b"This is a test attachment linked to res.partner name field"
attachment = self.ir_attachment_model.create(
{
"name": "test.txt",
"raw": content,
"res_model": "res.partner",
"res_field": "name",
}
)
self.assertTrue(attachment.store_fname)
self.assertFalse(attachment.db_datas)
self.assertEqual(attachment.raw, content)
self.assertEqual(attachment.mimetype, "text/plain")
self.env.flush_all()
initial_filename = f"test-{attachment.id}-0.txt"
self.assertEqual(attachment.fs_storage_code, self.temp_backend.code)
self.assertEqual(os.listdir(self.temp_dir), [initial_filename])
with attachment.open("rb") as f:
self.assertEqual(f.read(), content)
with open(os.path.join(self.temp_dir, initial_filename), "rb") as f:
self.assertEqual(f.read(), content)
# 1b. Update the attachment
new_content = b"Update the test attachment"
attachment.raw = new_content
with attachment.open("rb") as f:
self.assertEqual(f.read(), new_content)
# a new file version is created
new_filename = f"test-{attachment.id}-1.txt"
with open(os.path.join(self.temp_dir, new_filename), "rb") as f:
self.assertEqual(f.read(), new_content)
self.assertEqual(attachment.raw, new_content)
self.assertEqual(attachment.store_fname, f"tmp_dir://{new_filename}")
# 2. Second attachment linked to res.partner but other field (website)
content = b"This is a test attachment linked to res.partner website field"
attachment = self.ir_attachment_model.create(
{
"name": "test.txt",
"raw": content,
"res_model": "res.partner",
"res_field": "website",
}
)
self.assertTrue(attachment.store_fname)
self.assertFalse(attachment.db_datas)
self.assertEqual(attachment.raw, content)
self.assertEqual(attachment.mimetype, "text/plain")
self.env.flush_all()
self.assertEqual(attachment.fs_storage_code, self.default_backend.code)
# 3. Third attachment linked to res.partner but no specific field
content = b"This is a test attachment linked to res.partner model"
attachment = self.ir_attachment_model.create(
{"name": "test.txt", "raw": content, "res_model": "res.partner"}
)
self.assertTrue(attachment.store_fname)
self.assertFalse(attachment.db_datas)
self.assertEqual(attachment.raw, content)
self.assertEqual(attachment.mimetype, "text/plain")
self.env.flush_all()
self.assertEqual(attachment.fs_storage_code, self.default_backend.code)
# Fourth attachment linked to res.country: no storage because
# no default FS storage
content = b"This is a test attachment linked to res.country model"
attachment = self.ir_attachment_model.create(
{"name": "test.txt", "raw": content, "res_model": "res.country"}
)
self.assertTrue(attachment.store_fname)
self.assertFalse(attachment.db_datas)
self.assertEqual(attachment.raw, content)
self.assertEqual(attachment.mimetype, "text/plain")
self.env.flush_all()
self.assertFalse(attachment.fs_storage_code)
def test_recompute_urls(self):
"""
Mark temp_backend as default and set its base_url.
Create one attachment in temp_backend that is linked to a field and one that is not.
* Check that after updating the base_url for the backend, executing recompute_urls
updates fs_url for both attachments, whether they are linked to a field or not
"""
self.temp_backend.base_url = "https://acsone.eu/media"
self.temp_backend.use_as_default_for_attachments = True
self.ir_attachment_model.create(
{
"name": "field.txt",
"raw": "Attachment linked to a field",
"res_model": "res.partner",
"res_field": "name",
}
)
self.ir_attachment_model.create(
{
"name": "no_field.txt",
"raw": "Attachment not linked to a field",
}
)
self.env.flush_all()
self.env.cr.execute(
f"""
SELECT COUNT(*)
FROM ir_attachment
WHERE fs_storage_id = {self.temp_backend.id}
AND fs_url LIKE '{self.temp_backend.base_url}%'
"""
)
self.assertEqual(self.env.cr.dictfetchall()[0].get("count"), 2)
self.temp_backend.base_url = "https://forgeflow.com/media"
self.temp_backend.recompute_urls()
self.env.flush_all()
self.env.cr.execute(
f"""
SELECT COUNT(*)
FROM ir_attachment
WHERE fs_storage_id = {self.temp_backend.id}
AND fs_url LIKE '{self.temp_backend.base_url}%'
"""
)
self.assertEqual(self.env.cr.dictfetchall()[0].get("count"), 2)
def test_url_for_image_dir_optimized_and_not_obfuscated(self):
# Create a base64 encoded mock image (1x1 pixel transparent PNG)
image_data = base64.b64encode(
b"\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01\x08"
b"\x06\x00\x00\x00\x1f\x15\xc4\x89\x00\x00\x00\nIDAT\x08\xd7c\xf8\x0f\x00"
b"\x01\x01\x01\x00\xd1\x8d\xcd\xbf\x00\x00\x00\x00IEND\xaeB`\x82"
)
# Create a mock image filestore
fs_storage = self.env["fs.storage"].create(
{
"name": "FS Product Image Backend",
"code": "file",
"base_url": "https://localhost/images",
"optimizes_directory_path": True,
"use_filename_obfuscation": False,
}
)
# Create a mock image attachment
attachment = self.env["ir.attachment"].create(
{"name": "test_image.png", "datas": image_data, "mimetype": "image/png"}
)
# Get the url from the model
fs_url_1 = fs_storage._get_url_for_attachment(attachment)
# Generate the url that should be accessed
base_url = fs_storage.base_url_for_files
fs_filename = attachment.fs_filename
checksum = attachment.checksum
parts = [base_url, checksum[:2], checksum[2:4], fs_filename]
fs_url_2 = fs_storage._normalize_url("/".join(parts))
# Make some checks and asset if the two urls are equal
self.assertTrue(parts)
self.assertTrue(checksum)
self.assertEqual(fs_url_1, fs_url_2)

View file

@ -0,0 +1,238 @@
# Copyright 2023 ACSONE SA/NV (http://acsone.eu).
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html).
import base64
import io
import os
import shutil
import tempfile
from PIL import Image
from odoo.tests.common import HttpCase
class TestStream(HttpCase):
@classmethod
def setUpClass(cls):
super().setUpClass()
cls.env = cls.env(context=dict(cls.env.context, tracking_disable=True))
temp_dir = tempfile.mkdtemp()
cls.temp_backend = cls.env["fs.storage"].create(
{
"name": "Temp FS Storage",
"protocol": "file",
"code": "tmp_dir",
"directory_path": temp_dir,
"base_url": "http://my.public.files/",
}
)
cls.temp_dir = temp_dir
cls.content = b"This is a test attachment"
cls.attachment_binary = (
cls.env["ir.attachment"]
.with_context(
storage_location=cls.temp_backend.code,
storage_file_path="test.txt",
)
.create({"name": "test.txt", "raw": cls.content})
)
cls.image = cls._create_image(128, 128)
cls.attachment_image = (
cls.env["ir.attachment"]
.with_context(
storage_location=cls.temp_backend.code,
storage_file_path="test.png",
)
.create({"name": "test.png", "raw": cls.image})
)
@cls.addClassCleanup
def cleanup_tempdir():
shutil.rmtree(temp_dir)
assert cls.attachment_binary.fs_filename
assert cls.attachment_image.fs_filename
def setUp(self):
super().setUp()
# enforce temp_backend field since it seems that they are reset on
# savepoint rollback when managed by server_environment -> TO Be investigated
self.temp_backend.write(
{
"protocol": "file",
"code": "tmp_dir",
"directory_path": self.temp_dir,
"base_url": "http://my.public.files/",
}
)
@classmethod
def tearDownClass(cls):
super().tearDownClass()
for f in os.listdir(cls.temp_dir):
os.remove(os.path.join(cls.temp_dir, f))
@classmethod
def _create_image(cls, width, height, color="#4169E1", img_format="PNG"):
f = io.BytesIO()
Image.new("RGB", (width, height), color).save(f, img_format)
f.seek(0)
return f.read()
def assertDownload(
self, url, headers, assert_status_code, assert_headers, assert_content=None
):
res = self.url_open(url, headers=headers)
res.raise_for_status()
self.assertEqual(res.status_code, assert_status_code)
for header_name, header_value in assert_headers.items():
self.assertEqual(
res.headers.get(header_name),
header_value,
f"Wrong value for header {header_name}",
)
if assert_content:
self.assertEqual(res.content, assert_content, "Wong content")
return res
def test_content_url(self):
self.authenticate("admin", "admin")
url = f"/web/content/{self.attachment_binary.id}"
self.assertDownload(
url,
headers={},
assert_status_code=200,
assert_headers={
"Content-Type": "text/plain; charset=utf-8",
"Content-Disposition": "inline; filename=test.txt",
},
assert_content=self.content,
)
url = f"/web/content/{self.attachment_binary.id}/?filename=test2.txt&mimetype=text/csv"
self.assertDownload(
url,
headers={},
assert_status_code=200,
assert_headers={
"Content-Type": "text/csv; charset=utf-8",
"Content-Disposition": "inline; filename=test2.txt",
},
assert_content=self.content,
)
def test_image_url(self):
self.authenticate("admin", "admin")
url = f"/web/image/{self.attachment_image.id}"
self.assertDownload(
url,
headers={},
assert_status_code=200,
assert_headers={
"Content-Type": "image/png",
"Content-Disposition": "inline; filename=test.png",
},
assert_content=self.image,
)
def test_image_url_with_size(self):
self.authenticate("admin", "admin")
url = f"/web/image/{self.attachment_image.id}?width=64&height=64"
res = self.assertDownload(
url,
headers={},
assert_status_code=200,
assert_headers={
"Content-Type": "image/png",
"Content-Disposition": "inline; filename=test.png",
},
)
self.assertEqual(Image.open(io.BytesIO(res.content)).size, (64, 64))
def test_response_csp_header(self):
self.authenticate("admin", "admin")
url = f"/web/content/{self.attachment_binary.id}"
self.assertDownload(
url,
headers={},
assert_status_code=200,
assert_headers={
"X-Content-Type-Options": "nosniff",
"Content-Security-Policy": "default-src 'none'",
},
)
def test_serving_field_image(self):
self.authenticate("admin", "admin")
demo_partner = self.env.ref("base.partner_demo")
demo_partner.with_context(
storage_location=self.temp_backend.code,
).write({"image_128": base64.encodebytes(self._create_image(128, 128))})
url = f"/web/image/{demo_partner._name}/{demo_partner.id}/image_128"
res = self.assertDownload(
url,
headers={},
assert_status_code=200,
assert_headers={
"Content-Type": "image/png",
},
)
self.assertEqual(Image.open(io.BytesIO(res.content)).size, (128, 128))
url = f"/web/image/{demo_partner._name}/{demo_partner.id}/avatar_128"
avatar_res = self.assertDownload(
url,
headers={},
assert_status_code=200,
assert_headers={
"Content-Type": "image/png",
},
)
self.assertEqual(Image.open(io.BytesIO(avatar_res.content)).size, (128, 128))
def test_image_url_name_with_newline(self):
"""Test downloading a file with a newline in the name.
This test simulates the scenario that causes the Werkzeug error:
`ValueError: Detected newline in header value`.
It verifies that:
1. An `ir.attachment` record is created with a newline character
(`\n`) explicitly included in its `name` field ("tes\nt.png").
2. Accessing this attachment via the `/web/image/{attachment_id}` URL
succeeds with an HTTP 200 status code.
3. Crucially, the `Content-Disposition` header returned in the response
contains a *sanitized* filename ("tes_t.png"). The newline character
has been replaced (typically with an underscore by `secure_filename`).
This confirms that the filename sanitization implemented (likely in the
streaming logic, e.g., `FsStream.get_response` using `secure_filename`)
correctly processes the unsafe filename before passing it to Werkzeug,
thus preventing the original `ValueError` and ensuring safe header values.
"""
attachment_image = (
self.env["ir.attachment"]
.with_context(
storage_location=self.temp_backend.code,
storage_file_path="test.png",
)
.create(
{"name": "tes\nt.png", "raw": self.image}
) # newline in the filename
)
# Ensure the name IS stored with the newline before sanitization happens on download
self.assertIn("\n", attachment_image.name)
self.authenticate("admin", "admin")
url = f"/web/image/{attachment_image.id}"
self.assertDownload(
url,
headers={},
assert_status_code=200,
assert_headers={
"Content-Type": "image/png",
# Assert that the filename in the header IS sanitized
"Content-Disposition": "inline; filename=tes_t.png",
},
assert_content=self.image,
)