1# SPDX-License-Identifier:      GPL-2.0+
2# Copyright (c) 2018, Linaro Limited
3# Author: Takahiro Akashi <takahiro.akashi@linaro.org>
4
5import os
6import os.path
7import pytest
8import re
9from subprocess import call, check_call, check_output, CalledProcessError
10from fstest_defs import *
11import u_boot_utils as util
12
13supported_fs_basic = ['fat16', 'fat32', 'ext4']
14supported_fs_ext = ['fat16', 'fat32']
15supported_fs_mkdir = ['fat16', 'fat32']
16supported_fs_unlink = ['fat16', 'fat32']
17supported_fs_symlink = ['ext4']
18
19#
20# Filesystem test specific setup
21#
22def pytest_addoption(parser):
23    """Enable --fs-type option.
24
25    See pytest_configure() about how it works.
26
27    Args:
28        parser: Pytest command-line parser.
29
30    Returns:
31        Nothing.
32    """
33    parser.addoption('--fs-type', action='append', default=None,
34        help='Targeting Filesystem Types')
35
36def pytest_configure(config):
37    """Restrict a file system(s) to be tested.
38
39    A file system explicitly named with --fs-type option is selected
40    if it belongs to a default supported_fs_xxx list.
41    Multiple options can be specified.
42
43    Args:
44        config: Pytest configuration.
45
46    Returns:
47        Nothing.
48    """
49    global supported_fs_basic
50    global supported_fs_ext
51    global supported_fs_mkdir
52    global supported_fs_unlink
53    global supported_fs_symlink
54
55    def intersect(listA, listB):
56        return  [x for x in listA if x in listB]
57
58    supported_fs = config.getoption('fs_type')
59    if supported_fs:
60        print('*** FS TYPE modified: %s' % supported_fs)
61        supported_fs_basic =  intersect(supported_fs, supported_fs_basic)
62        supported_fs_ext =  intersect(supported_fs, supported_fs_ext)
63        supported_fs_mkdir =  intersect(supported_fs, supported_fs_mkdir)
64        supported_fs_unlink =  intersect(supported_fs, supported_fs_unlink)
65        supported_fs_symlink =  intersect(supported_fs, supported_fs_symlink)
66
67def pytest_generate_tests(metafunc):
68    """Parametrize fixtures, fs_obj_xxx
69
70    Each fixture will be parametrized with a corresponding support_fs_xxx
71    list.
72
73    Args:
74        metafunc: Pytest test function.
75
76    Returns:
77        Nothing.
78    """
79    if 'fs_obj_basic' in metafunc.fixturenames:
80        metafunc.parametrize('fs_obj_basic', supported_fs_basic,
81            indirect=True, scope='module')
82    if 'fs_obj_ext' in metafunc.fixturenames:
83        metafunc.parametrize('fs_obj_ext', supported_fs_ext,
84            indirect=True, scope='module')
85    if 'fs_obj_mkdir' in metafunc.fixturenames:
86        metafunc.parametrize('fs_obj_mkdir', supported_fs_mkdir,
87            indirect=True, scope='module')
88    if 'fs_obj_unlink' in metafunc.fixturenames:
89        metafunc.parametrize('fs_obj_unlink', supported_fs_unlink,
90            indirect=True, scope='module')
91    if 'fs_obj_symlink' in metafunc.fixturenames:
92        metafunc.parametrize('fs_obj_symlink', supported_fs_symlink,
93            indirect=True, scope='module')
94
95#
96# Helper functions
97#
98def fstype_to_ubname(fs_type):
99    """Convert a file system type to an U-boot specific string
100
101    A generated string can be used as part of file system related commands
102    or a config name in u-boot. Currently fat16 and fat32 are handled
103    specifically.
104
105    Args:
106        fs_type: File system type.
107
108    Return:
109        A corresponding string for file system type.
110    """
111    if re.match('fat', fs_type):
112        return 'fat'
113    else:
114        return fs_type
115
116def check_ubconfig(config, fs_type):
117    """Check whether a file system is enabled in u-boot configuration.
118
119    This function is assumed to be called in a fixture function so that
120    the whole test cases will be skipped if a given file system is not
121    enabled.
122
123    Args:
124        fs_type: File system type.
125
126    Return:
127        Nothing.
128    """
129    if not config.buildconfig.get('config_cmd_%s' % fs_type, None):
130        pytest.skip('.config feature "CMD_%s" not enabled' % fs_type.upper())
131    if not config.buildconfig.get('config_%s_write' % fs_type, None):
132        pytest.skip('.config feature "%s_WRITE" not enabled'
133        % fs_type.upper())
134
135def mk_fs(config, fs_type, size, id):
136    """Create a file system volume.
137
138    Args:
139        fs_type: File system type.
140        size: Size of file system in MiB.
141        id: Prefix string of volume's file name.
142
143    Return:
144        Nothing.
145    """
146    fs_img = '%s.%s.img' % (id, fs_type)
147    fs_img = config.persistent_data_dir + '/' + fs_img
148
149    if fs_type == 'fat16':
150        mkfs_opt = '-F 16'
151    elif fs_type == 'fat32':
152        mkfs_opt = '-F 32'
153    else:
154        mkfs_opt = ''
155
156    if re.match('fat', fs_type):
157        fs_lnxtype = 'vfat'
158    else:
159        fs_lnxtype = fs_type
160
161    count = (size + 1048576 - 1) / 1048576
162
163    # Some distributions do not add /sbin to the default PATH, where mkfs lives
164    if '/sbin' not in os.environ["PATH"].split(os.pathsep):
165        os.environ["PATH"] += os.pathsep + '/sbin'
166
167    try:
168        check_call('rm -f %s' % fs_img, shell=True)
169        check_call('dd if=/dev/zero of=%s bs=1M count=%d'
170            % (fs_img, count), shell=True)
171        check_call('mkfs.%s %s %s'
172            % (fs_lnxtype, mkfs_opt, fs_img), shell=True)
173        if fs_type == 'ext4':
174            sb_content = check_output('tune2fs -l %s' % fs_img, shell=True).decode()
175            if 'metadata_csum' in sb_content:
176                check_call('tune2fs -O ^metadata_csum %s' % fs_img, shell=True)
177        return fs_img
178    except CalledProcessError:
179        call('rm -f %s' % fs_img, shell=True)
180        raise
181
182# from test/py/conftest.py
183def tool_is_in_path(tool):
184    """Check whether a given command is available on host.
185
186    Args:
187        tool: Command name.
188
189    Return:
190        True if available, False if not.
191    """
192    for path in os.environ['PATH'].split(os.pathsep):
193        fn = os.path.join(path, tool)
194        if os.path.isfile(fn) and os.access(fn, os.X_OK):
195            return True
196    return False
197
198fuse_mounted = False
199
200def mount_fs(fs_type, device, mount_point):
201    """Mount a volume.
202
203    Args:
204        fs_type: File system type.
205        device: Volume's file name.
206        mount_point: Mount point.
207
208    Return:
209        Nothing.
210    """
211    global fuse_mounted
212
213    try:
214        check_call('guestmount --pid-file guestmount.pid -a %s -m /dev/sda %s'
215            % (device, mount_point), shell=True)
216        fuse_mounted = True
217        return
218    except CalledProcessError:
219        fuse_mounted = False
220
221    mount_opt = 'loop,rw'
222    if re.match('fat', fs_type):
223        mount_opt += ',umask=0000'
224
225    check_call('sudo mount -o %s %s %s'
226        % (mount_opt, device, mount_point), shell=True)
227
228    # may not be effective for some file systems
229    check_call('sudo chmod a+rw %s' % mount_point, shell=True)
230
231def umount_fs(mount_point):
232    """Unmount a volume.
233
234    Args:
235        mount_point: Mount point.
236
237    Return:
238        Nothing.
239    """
240    if fuse_mounted:
241        call('sync')
242        call('guestunmount %s' % mount_point, shell=True)
243
244        try:
245            with open("guestmount.pid", "r") as pidfile:
246                pid = int(pidfile.read())
247            util.waitpid(pid, kill=True)
248            os.remove("guestmount.pid")
249
250        except FileNotFoundError:
251            pass
252
253    else:
254        call('sudo umount %s' % mount_point, shell=True)
255
256#
257# Fixture for basic fs test
258#     derived from test/fs/fs-test.sh
259#
260@pytest.fixture()
261def fs_obj_basic(request, u_boot_config):
262    """Set up a file system to be used in basic fs test.
263
264    Args:
265        request: Pytest request object.
266	u_boot_config: U-boot configuration.
267
268    Return:
269        A fixture for basic fs test, i.e. a triplet of file system type,
270        volume file name and  a list of MD5 hashes.
271    """
272    fs_type = request.param
273    fs_img = ''
274
275    fs_ubtype = fstype_to_ubname(fs_type)
276    check_ubconfig(u_boot_config, fs_ubtype)
277
278    mount_dir = u_boot_config.persistent_data_dir + '/mnt'
279
280    small_file = mount_dir + '/' + SMALL_FILE
281    big_file = mount_dir + '/' + BIG_FILE
282
283    try:
284
285        # 3GiB volume
286        fs_img = mk_fs(u_boot_config, fs_type, 0xc0000000, '3GB')
287    except CalledProcessError as err:
288        pytest.skip('Creating failed for filesystem: ' + fs_type + '. {}'.format(err))
289        return
290
291    try:
292        check_call('mkdir -p %s' % mount_dir, shell=True)
293    except CalledProcessError as err:
294        pytest.skip('Preparing mount folder failed for filesystem: ' + fs_type + '. {}'.format(err))
295        call('rm -f %s' % fs_img, shell=True)
296        return
297
298    try:
299        # Mount the image so we can populate it.
300        mount_fs(fs_type, fs_img, mount_dir)
301    except CalledProcessError as err:
302        pytest.skip('Mounting to folder failed for filesystem: ' + fs_type + '. {}'.format(err))
303        call('rmdir %s' % mount_dir, shell=True)
304        call('rm -f %s' % fs_img, shell=True)
305        return
306
307    try:
308        # Create a subdirectory.
309        check_call('mkdir %s/SUBDIR' % mount_dir, shell=True)
310
311        # Create big file in this image.
312        # Note that we work only on the start 1MB, couple MBs in the 2GB range
313        # and the last 1 MB of the huge 2.5GB file.
314        # So, just put random values only in those areas.
315        check_call('dd if=/dev/urandom of=%s bs=1M count=1'
316	    % big_file, shell=True)
317        check_call('dd if=/dev/urandom of=%s bs=1M count=2 seek=2047'
318            % big_file, shell=True)
319        check_call('dd if=/dev/urandom of=%s bs=1M count=1 seek=2499'
320            % big_file, shell=True)
321
322        # Create a small file in this image.
323        check_call('dd if=/dev/urandom of=%s bs=1M count=1'
324	    % small_file, shell=True)
325
326        # Delete the small file copies which possibly are written as part of a
327        # previous test.
328        # check_call('rm -f "%s.w"' % MB1, shell=True)
329        # check_call('rm -f "%s.w2"' % MB1, shell=True)
330
331        # Generate the md5sums of reads that we will test against small file
332        out = check_output(
333            'dd if=%s bs=1M skip=0 count=1 2> /dev/null | md5sum'
334	    % small_file, shell=True).decode()
335        md5val = [ out.split()[0] ]
336
337        # Generate the md5sums of reads that we will test against big file
338        # One from beginning of file.
339        out = check_output(
340            'dd if=%s bs=1M skip=0 count=1 2> /dev/null | md5sum'
341	    % big_file, shell=True).decode()
342        md5val.append(out.split()[0])
343
344        # One from end of file.
345        out = check_output(
346            'dd if=%s bs=1M skip=2499 count=1 2> /dev/null | md5sum'
347	    % big_file, shell=True).decode()
348        md5val.append(out.split()[0])
349
350        # One from the last 1MB chunk of 2GB
351        out = check_output(
352            'dd if=%s bs=1M skip=2047 count=1 2> /dev/null | md5sum'
353	    % big_file, shell=True).decode()
354        md5val.append(out.split()[0])
355
356        # One from the start 1MB chunk from 2GB
357        out = check_output(
358            'dd if=%s bs=1M skip=2048 count=1 2> /dev/null | md5sum'
359	    % big_file, shell=True).decode()
360        md5val.append(out.split()[0])
361
362        # One 1MB chunk crossing the 2GB boundary
363        out = check_output(
364            'dd if=%s bs=512K skip=4095 count=2 2> /dev/null | md5sum'
365	    % big_file, shell=True).decode()
366        md5val.append(out.split()[0])
367
368    except CalledProcessError as err:
369        pytest.skip('Setup failed for filesystem: ' + fs_type + '. {}'.format(err))
370        umount_fs(mount_dir)
371        return
372    else:
373        umount_fs(mount_dir)
374        yield [fs_ubtype, fs_img, md5val]
375    finally:
376        call('rmdir %s' % mount_dir, shell=True)
377        call('rm -f %s' % fs_img, shell=True)
378
379#
380# Fixture for extended fs test
381#
382@pytest.fixture()
383def fs_obj_ext(request, u_boot_config):
384    """Set up a file system to be used in extended fs test.
385
386    Args:
387        request: Pytest request object.
388	u_boot_config: U-boot configuration.
389
390    Return:
391        A fixture for extended fs test, i.e. a triplet of file system type,
392        volume file name and  a list of MD5 hashes.
393    """
394    fs_type = request.param
395    fs_img = ''
396
397    fs_ubtype = fstype_to_ubname(fs_type)
398    check_ubconfig(u_boot_config, fs_ubtype)
399
400    mount_dir = u_boot_config.persistent_data_dir + '/mnt'
401
402    min_file = mount_dir + '/' + MIN_FILE
403    tmp_file = mount_dir + '/tmpfile'
404
405    try:
406
407        # 128MiB volume
408        fs_img = mk_fs(u_boot_config, fs_type, 0x8000000, '128MB')
409    except CalledProcessError as err:
410        pytest.skip('Creating failed for filesystem: ' + fs_type + '. {}'.format(err))
411        return
412
413    try:
414        check_call('mkdir -p %s' % mount_dir, shell=True)
415    except CalledProcessError as err:
416        pytest.skip('Preparing mount folder failed for filesystem: ' + fs_type + '. {}'.format(err))
417        call('rm -f %s' % fs_img, shell=True)
418        return
419
420    try:
421        # Mount the image so we can populate it.
422        mount_fs(fs_type, fs_img, mount_dir)
423    except CalledProcessError as err:
424        pytest.skip('Mounting to folder failed for filesystem: ' + fs_type + '. {}'.format(err))
425        call('rmdir %s' % mount_dir, shell=True)
426        call('rm -f %s' % fs_img, shell=True)
427        return
428
429    try:
430        # Create a test directory
431        check_call('mkdir %s/dir1' % mount_dir, shell=True)
432
433        # Create a small file and calculate md5
434        check_call('dd if=/dev/urandom of=%s bs=1K count=20'
435            % min_file, shell=True)
436        out = check_output(
437            'dd if=%s bs=1K 2> /dev/null | md5sum'
438            % min_file, shell=True).decode()
439        md5val = [ out.split()[0] ]
440
441        # Calculate md5sum of Test Case 4
442        check_call('dd if=%s of=%s bs=1K count=20'
443            % (min_file, tmp_file), shell=True)
444        check_call('dd if=%s of=%s bs=1K seek=5 count=20'
445            % (min_file, tmp_file), shell=True)
446        out = check_output('dd if=%s bs=1K 2> /dev/null | md5sum'
447            % tmp_file, shell=True).decode()
448        md5val.append(out.split()[0])
449
450        # Calculate md5sum of Test Case 5
451        check_call('dd if=%s of=%s bs=1K count=20'
452            % (min_file, tmp_file), shell=True)
453        check_call('dd if=%s of=%s bs=1K seek=5 count=5'
454            % (min_file, tmp_file), shell=True)
455        out = check_output('dd if=%s bs=1K 2> /dev/null | md5sum'
456            % tmp_file, shell=True).decode()
457        md5val.append(out.split()[0])
458
459        # Calculate md5sum of Test Case 7
460        check_call('dd if=%s of=%s bs=1K count=20'
461            % (min_file, tmp_file), shell=True)
462        check_call('dd if=%s of=%s bs=1K seek=20 count=20'
463            % (min_file, tmp_file), shell=True)
464        out = check_output('dd if=%s bs=1K 2> /dev/null | md5sum'
465            % tmp_file, shell=True).decode()
466        md5val.append(out.split()[0])
467
468        check_call('rm %s' % tmp_file, shell=True)
469    except CalledProcessError:
470        pytest.skip('Setup failed for filesystem: ' + fs_type)
471        umount_fs(mount_dir)
472        return
473    else:
474        umount_fs(mount_dir)
475        yield [fs_ubtype, fs_img, md5val]
476    finally:
477        call('rmdir %s' % mount_dir, shell=True)
478        call('rm -f %s' % fs_img, shell=True)
479
480#
481# Fixture for mkdir test
482#
483@pytest.fixture()
484def fs_obj_mkdir(request, u_boot_config):
485    """Set up a file system to be used in mkdir test.
486
487    Args:
488        request: Pytest request object.
489	u_boot_config: U-boot configuration.
490
491    Return:
492        A fixture for mkdir test, i.e. a duplet of file system type and
493        volume file name.
494    """
495    fs_type = request.param
496    fs_img = ''
497
498    fs_ubtype = fstype_to_ubname(fs_type)
499    check_ubconfig(u_boot_config, fs_ubtype)
500
501    try:
502        # 128MiB volume
503        fs_img = mk_fs(u_boot_config, fs_type, 0x8000000, '128MB')
504    except:
505        pytest.skip('Setup failed for filesystem: ' + fs_type)
506        return
507    else:
508        yield [fs_ubtype, fs_img]
509    call('rm -f %s' % fs_img, shell=True)
510
511#
512# Fixture for unlink test
513#
514@pytest.fixture()
515def fs_obj_unlink(request, u_boot_config):
516    """Set up a file system to be used in unlink test.
517
518    Args:
519        request: Pytest request object.
520	u_boot_config: U-boot configuration.
521
522    Return:
523        A fixture for unlink test, i.e. a duplet of file system type and
524        volume file name.
525    """
526    fs_type = request.param
527    fs_img = ''
528
529    fs_ubtype = fstype_to_ubname(fs_type)
530    check_ubconfig(u_boot_config, fs_ubtype)
531
532    mount_dir = u_boot_config.persistent_data_dir + '/mnt'
533
534    try:
535
536        # 128MiB volume
537        fs_img = mk_fs(u_boot_config, fs_type, 0x8000000, '128MB')
538    except CalledProcessError as err:
539        pytest.skip('Creating failed for filesystem: ' + fs_type + '. {}'.format(err))
540        return
541
542    try:
543        check_call('mkdir -p %s' % mount_dir, shell=True)
544    except CalledProcessError as err:
545        pytest.skip('Preparing mount folder failed for filesystem: ' + fs_type + '. {}'.format(err))
546        call('rm -f %s' % fs_img, shell=True)
547        return
548
549    try:
550        # Mount the image so we can populate it.
551        mount_fs(fs_type, fs_img, mount_dir)
552    except CalledProcessError as err:
553        pytest.skip('Mounting to folder failed for filesystem: ' + fs_type + '. {}'.format(err))
554        call('rmdir %s' % mount_dir, shell=True)
555        call('rm -f %s' % fs_img, shell=True)
556        return
557
558    try:
559        # Test Case 1 & 3
560        check_call('mkdir %s/dir1' % mount_dir, shell=True)
561        check_call('dd if=/dev/urandom of=%s/dir1/file1 bs=1K count=1'
562                                    % mount_dir, shell=True)
563        check_call('dd if=/dev/urandom of=%s/dir1/file2 bs=1K count=1'
564                                    % mount_dir, shell=True)
565
566        # Test Case 2
567        check_call('mkdir %s/dir2' % mount_dir, shell=True)
568        for i in range(0, 20):
569            check_call('mkdir %s/dir2/0123456789abcdef%02x'
570                                    % (mount_dir, i), shell=True)
571
572        # Test Case 4
573        check_call('mkdir %s/dir4' % mount_dir, shell=True)
574
575        # Test Case 5, 6 & 7
576        check_call('mkdir %s/dir5' % mount_dir, shell=True)
577        check_call('dd if=/dev/urandom of=%s/dir5/file1 bs=1K count=1'
578                                    % mount_dir, shell=True)
579
580    except CalledProcessError:
581        pytest.skip('Setup failed for filesystem: ' + fs_type)
582        umount_fs(mount_dir)
583        return
584    else:
585        umount_fs(mount_dir)
586        yield [fs_ubtype, fs_img]
587    finally:
588        call('rmdir %s' % mount_dir, shell=True)
589        call('rm -f %s' % fs_img, shell=True)
590
591#
592# Fixture for symlink fs test
593#
594@pytest.fixture()
595def fs_obj_symlink(request, u_boot_config):
596    """Set up a file system to be used in symlink fs test.
597
598    Args:
599        request: Pytest request object.
600        u_boot_config: U-boot configuration.
601
602    Return:
603        A fixture for basic fs test, i.e. a triplet of file system type,
604        volume file name and  a list of MD5 hashes.
605    """
606    fs_type = request.param
607    fs_img = ''
608
609    fs_ubtype = fstype_to_ubname(fs_type)
610    check_ubconfig(u_boot_config, fs_ubtype)
611
612    mount_dir = u_boot_config.persistent_data_dir + '/mnt'
613
614    small_file = mount_dir + '/' + SMALL_FILE
615    medium_file = mount_dir + '/' + MEDIUM_FILE
616
617    try:
618
619        # 1GiB volume
620        fs_img = mk_fs(u_boot_config, fs_type, 0x40000000, '1GB')
621    except CalledProcessError as err:
622        pytest.skip('Creating failed for filesystem: ' + fs_type + '. {}'.format(err))
623        return
624
625    try:
626        check_call('mkdir -p %s' % mount_dir, shell=True)
627    except CalledProcessError as err:
628        pytest.skip('Preparing mount folder failed for filesystem: ' + fs_type + '. {}'.format(err))
629        call('rm -f %s' % fs_img, shell=True)
630        return
631
632    try:
633        # Mount the image so we can populate it.
634        mount_fs(fs_type, fs_img, mount_dir)
635    except CalledProcessError as err:
636        pytest.skip('Mounting to folder failed for filesystem: ' + fs_type + '. {}'.format(err))
637        call('rmdir %s' % mount_dir, shell=True)
638        call('rm -f %s' % fs_img, shell=True)
639        return
640
641    try:
642        # Create a subdirectory.
643        check_call('mkdir %s/SUBDIR' % mount_dir, shell=True)
644
645        # Create a small file in this image.
646        check_call('dd if=/dev/urandom of=%s bs=1M count=1'
647                   % small_file, shell=True)
648
649        # Create a medium file in this image.
650        check_call('dd if=/dev/urandom of=%s bs=10M count=1'
651                   % medium_file, shell=True)
652
653        # Generate the md5sums of reads that we will test against small file
654        out = check_output(
655            'dd if=%s bs=1M skip=0 count=1 2> /dev/null | md5sum'
656            % small_file, shell=True).decode()
657        md5val = [out.split()[0]]
658        out = check_output(
659            'dd if=%s bs=10M skip=0 count=1 2> /dev/null | md5sum'
660            % medium_file, shell=True).decode()
661        md5val.extend([out.split()[0]])
662
663    except CalledProcessError:
664        pytest.skip('Setup failed for filesystem: ' + fs_type)
665        umount_fs(mount_dir)
666        return
667    else:
668        umount_fs(mount_dir)
669        yield [fs_ubtype, fs_img, md5val]
670    finally:
671        call('rmdir %s' % mount_dir, shell=True)
672        call('rm -f %s' % fs_img, shell=True)
673