/* For pread()/pwrite()/utimensat() */
#define _XOPEN_SOURCE 700
-/* For get_current_dir_name */
-#define _GNU_SOURCE
-
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <stddef.h>
+#include <assert.h>
+
+#ifdef __APPLE__
+ #include <osxfuse/fuse.h>
+#else
+ #include <fuse.h>
+#endif
-#include <fuse.h>
#include <sys/xattr.h>
+#include <sys/param.h>
#include "fuse_xattrs_config.h"
return -ENOENT;
}
- if (get_namespace(name) != USER) {
+ if (xattrs_config.enable_namespaces == 1 && get_namespace(name) != USER) {
debug_print("Only user namespace is supported. name=%s\n", name);
return -ENOTSUP;
}
return -ENOENT;
}
- if (get_namespace(name) != USER) {
+ if (xattrs_config.enable_namespaces == 1 && get_namespace(name) != USER) {
debug_print("Only user namespace is supported. name=%s\n", name);
return -ENOTSUP;
}
return rtval;
}
+#ifdef __APPLE__
+ static int xmp_setxattr_apple(const char *path, const char *name, const char *value, size_t size, int flags, u_int32_t position) {
+ assert(position == 0);
+ return xmp_setxattr(path, name, value, size, flags);
+ }
+
+ static int xmp_getxattr_apple(const char *path, const char *name, char *value, size_t size, u_int32_t position) {
+ assert(position == 0);
+ return xmp_getxattr(path, name, value, size);
+ }
+#endif
+
static int xmp_listxattr(const char *path, char *list, size_t size)
{
if (xattrs_config.show_sidecar == 0 && filename_is_sidecar(path) == 1) {
return -ENOENT;
}
- if (get_namespace(name) != USER) {
+ if (xattrs_config.enable_namespaces == 1 && get_namespace(name) != USER) {
debug_print("Only user namespace is supported. name=%s\n", name);
return -ENOTSUP;
}
#ifdef HAVE_POSIX_FALLOCATE
.fallocate = xmp_fallocate,
#endif
+#ifdef __APPLE__
+ .setxattr = xmp_setxattr_apple,
+ .getxattr = xmp_getxattr_apple,
+#else
.setxattr = xmp_setxattr,
.getxattr = xmp_getxattr,
+#endif
.listxattr = xmp_listxattr,
.removexattr = xmp_removexattr,
};
return absolute_path;
}
- char *pwd = get_current_dir_name();
+ static char cwd[MAXPATHLEN];
+ char *pwd = getcwd(cwd, sizeof(cwd));
size_t len = strlen(pwd) + 1 + strlen(path) + 1;
int has_trailing_backslash = (path[strlen(path)-1] == '/');
if (!has_trailing_backslash)
#define FUSE_XATTRS_OPT(t, p, v) { t, offsetof(struct xattrs_config, p), v }
static struct fuse_opt xattrs_opts[] = {
- FUSE_XATTRS_OPT("show_sidecar", show_sidecar, 1),
+ FUSE_XATTRS_OPT("show_sidecar", show_sidecar, 1),
+ FUSE_XATTRS_OPT("enable_namespaces", enable_namespaces, 1),
FUSE_OPT_KEY("-V", KEY_VERSION),
FUSE_OPT_KEY("--version", KEY_VERSION),
"\n"
"FUSE XATTRS options:\n"
" -o show_sidecar don't hide sidecar files\n"
+ " -o enable_namespaces enable namespaces checks\n"
"\n", outargs->argv[0]);
fuse_opt_add_arg(outargs, "-ho");
import xattr
from pathlib import Path
import os
+import errno
+import sys
if xattr.__version__ != '0.9.1':
print("WARNING, only tested with xattr version 0.9.1")
+# Linux / BSD Compatibility
+ENOATTR = errno.ENODATA
+if hasattr(errno, "ENOATTR"):
+ ENOATTR = errno.ENOATTR
+
# TODO
# - listxattr: list too long
# - sidecar file permissions
# - corrupt metadata files
+skipNamespaceTests = False
+
-class TestXAttrs(unittest.TestCase):
+class TestXAttrBase(unittest.TestCase):
def setUp(self):
self.sourceDir = "./source/"
self.mountDir = "./mount/"
if os.path.isfile(self.randomFileSidecar):
os.remove(self.randomFileSidecar)
- def test_xattr_set(self):
- xattr.setxattr(self.randomFile, "user.foo", bytes("bar", "utf-8"))
-
- def test_xattr_set_name_max_length(self):
- enc = "utf-8"
- key = "user." + "x" * 250
- value = "x"
- self.assertEqual(len(key), 255)
- xattr.setxattr(self.randomFile, key, bytes(value, enc))
-
- key = "user." + "x" * 251
- self.assertEqual(len(key), 256)
- with self.assertRaises(OSError) as ex:
- xattr.setxattr(self.randomFile, key, bytes(value, enc))
- self.assertEqual(ex.exception.errno, 34)
- self.assertEqual(ex.exception.strerror, "Numerical result out of range")
-
- @unittest.expectedFailure
- def test_xattr_set_value_max_size(self):
- enc = "utf-8"
- key = "user.foo"
- value = "x" * (64 * 1024 + 1) # we want max 64KiB of data
- with self.assertRaises(OSError) as ex:
- xattr.setxattr(self.randomFile, key, bytes(value, enc))
-
- # on btrfs we get "no space left on device"
- self.assertEqual(ex.exception.errno, 28)
- self.assertEqual(ex.exception.strerror, "No space left on device")
- # on fuse_xattr we get "Argument list too long"
- # the error is thrown by fuse, not by fuse_xattr code
+##########################################################################################
+# The following tests are generic for any FS that supports extended attributes
+class TestGenericXAttrs(TestXAttrBase):
- def test_xattr_set_namespaces(self):
- with self.assertRaises(OSError) as ex:
- xattr.setxattr(self.randomFile, "system.foo", bytes("bar", "utf-8"))
- self.assertEqual(ex.exception.errno, 95)
- self.assertEqual(ex.exception.strerror, "Operation not supported")
-
- with self.assertRaises(OSError) as ex:
- xattr.setxattr(self.randomFile, "trust.foo", bytes("bar", "utf-8"))
- self.assertEqual(ex.exception.errno, 95)
- self.assertEqual(ex.exception.strerror, "Operation not supported")
-
- with self.assertRaises(OSError) as ex:
- xattr.setxattr(self.randomFile, "foo.foo", bytes("bar", "utf-8"))
- self.assertEqual(ex.exception.errno, 95)
- self.assertEqual(ex.exception.strerror, "Operation not supported")
-
- with self.assertRaises(PermissionError) as ex:
- xattr.setxattr(self.randomFile, "security.foo", bytes("bar", "utf-8"))
- self.assertEqual(ex.exception.errno, 1)
- self.assertEqual(ex.exception.strerror, "Operation not permitted")
+ def test_xattr_set(self):
+ xattr.setxattr(self.randomFile, "user.foo", bytes("bar", "utf-8"))
def test_xattr_get_non_existent(self):
key = "user.foo"
with self.assertRaises(OSError) as ex:
xattr.getxattr(self.randomFile, key)
- self.assertEqual(ex.exception.errno, 61)
- self.assertEqual(ex.exception.strerror, "No data available")
+ self.assertEqual(ex.exception.errno, ENOATTR)
def test_xattr_get(self):
enc = "utf-8"
read_value = xattr.getxattr(self.randomFile, key)
self.assertEqual(value, read_value.decode(enc))
+ # FIXME: On OSX fuse is replacing the return value 0 with -1
def test_xattr_set_empty(self):
enc = "utf-8"
key = "user.foo"
value = "bar"
with self.assertRaises(OSError) as ex:
xattr.setxattr(self.randomFile, key, bytes(value, enc), xattr.XATTR_REPLACE)
- self.assertEqual(ex.exception.errno, 61)
- self.assertEqual(ex.exception.strerror, "No data available")
+ self.assertEqual(ex.exception.errno, ENOATTR)
with self.assertRaises(OSError) as ex:
xattr.getxattr(self.randomFile, key)
- self.assertEqual(ex.exception.errno, 61)
- self.assertEqual(ex.exception.strerror, "No data available")
+ self.assertEqual(ex.exception.errno, ENOATTR)
def test_xattr_list_empty(self):
attrs = xattr.listxattr(self.randomFile)
# should fail when trying to read it
with self.assertRaises(OSError) as ex:
xattr.getxattr(self.randomFile, key)
- self.assertEqual(ex.exception.errno, 61)
- self.assertEqual(ex.exception.strerror, "No data available")
+ self.assertEqual(ex.exception.errno, ENOATTR)
# removing twice should fail
with self.assertRaises(OSError) as ex:
xattr.getxattr(self.randomFile, key)
- self.assertEqual(ex.exception.errno, 61)
- self.assertEqual(ex.exception.strerror, "No data available")
+ self.assertEqual(ex.exception.errno, ENOATTR)
+
+ def test_xattr_set_name_max_length(self):
+ max_len = 255
+ if sys.platform != 'linux':
+ max_len = 127 # OSX VFS only support names up to 127 bytes
+
+ enc = "utf-8"
+ key = "user." + "x" * (max_len - 5)
+ value = "x"
+ self.assertEqual(len(key), max_len)
+ xattr.setxattr(self.randomFile, key, bytes(value, enc)) # NOTE: WILL FAIL IF RUNNING ON HFS+
+
+ key = "user." + "x" * (max_len - 5 + 1)
+ self.assertEqual(len(key), max_len + 1)
+ with self.assertRaises(OSError) as ex:
+ xattr.setxattr(self.randomFile, key, bytes(value, enc))
+
+ if sys.platform == 'linux':
+ self.assertEqual(ex.exception.errno, errno.ERANGE) # This is the behavior of BTRFS
+ else:
+ self.assertEqual(ex.exception.errno, errno.ENAMETOOLONG)
+
+ #########################################################
+ # On Linux: The test fails as the max value is detected before reaching the filesystem
+ # On OSX: Works on fuse_xattr but fails on HFS+ (doesn't has limit)
+ @unittest.skipIf(sys.platform == "linux", "Skipping test on Linux")
+ def test_xattr_set_value_max_size(self):
+ enc = "utf-8"
+ key = "user.foo"
+ value = "x" * (64 * 1024 + 1) # we want max 64KiB of data
+ with self.assertRaises(OSError) as ex:
+ xattr.setxattr(self.randomFile, key, bytes(value, enc)) # NOTE: This test fail on HFS+ (doesn't have limit)
+
+ self.assertEqual(ex.exception.errno, errno.ENOSPC)
+
+ # on fuse_xattr we get "Argument list too long"
+ # the error is thrown by fuse, not by fuse_xattr code
+
+ #########################################################
+ # Those tests are going to pass on Linux but fail on BSD
+ # BSD doesn't support namespaces.
+ @unittest.skipIf(skipNamespaceTests, "Namespace tests disabled")
+ def test_xattr_set_namespaces(self):
+ with self.assertRaises(OSError) as ex:
+ xattr.setxattr(self.randomFile, "system.foo", bytes("bar", "utf-8"))
+ self.assertEqual(ex.exception.errno, 95)
+ self.assertEqual(ex.exception.strerror, "Operation not supported")
+
+ with self.assertRaises(OSError) as ex:
+ xattr.setxattr(self.randomFile, "trust.foo", bytes("bar", "utf-8"))
+ self.assertEqual(ex.exception.errno, 95)
+ self.assertEqual(ex.exception.strerror, "Operation not supported")
+
+ with self.assertRaises(OSError) as ex:
+ xattr.setxattr(self.randomFile, "foo.foo", bytes("bar", "utf-8"))
+ self.assertEqual(ex.exception.errno, 95)
+ self.assertEqual(ex.exception.strerror, "Operation not supported")
+
+ with self.assertRaises(PermissionError) as ex:
+ xattr.setxattr(self.randomFile, "security.foo", bytes("bar", "utf-8"))
+ self.assertEqual(ex.exception.errno, 1)
+ self.assertEqual(ex.exception.strerror, "Operation not permitted")
+
+
+######################################################################
+# The following tests should fail if they are running on a normal FS
+class TestFuseXATTR(TestXAttrBase):
def test_hide_sidecar(self):
xattr.setxattr(self.randomFile, "user.foo", bytes("bar", "utf-8"))
self.assertTrue(self.randomFilename in files_source)
self.assertTrue(sidecarFilename in files_source)
- def test_create_new_file(self):
+ def test_fuse_xattr_create_new_file(self):
test_filename = "test_create_new_file"
self.assertFalse(os.path.isfile(self.sourceDir + test_filename))
self.assertFalse(os.path.isfile(self.mountDir + test_filename))
# FIXME: if one assert fails, the file isn't going to be deleted
os.remove(self.mountDir + test_filename)
- def test_remove_file_with_sidecar(self):
+ def test_fuse_xattr_remove_file_with_sidecar(self):
xattr.setxattr(self.randomFile, "user.foo", bytes("bar", "utf-8"))
self.assertTrue(os.path.isfile(self.randomFile))
self.assertTrue(os.path.isfile(self.randomSourceFile))
self.assertFalse(os.path.isfile(self.randomSourceFile))
self.assertFalse(os.path.isfile(self.randomSourceFileSidecar))
- def test_remove_file_without_sidecar(self):
+ def test_fuse_xattr_remove_file_without_sidecar(self):
self.assertTrue(os.path.isfile(self.randomFile))
self.assertTrue(os.path.isfile(self.randomSourceFile))
self.assertFalse(os.path.isfile(self.randomSourceFileSidecar))