[go: up one dir, main page]

File: files.py

package info (click to toggle)
cozy 1.3.0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 3,140 kB
  • sloc: python: 8,014; xml: 392; makefile: 2
file content (114 lines) | stat: -rw-r--r-- 4,118 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import logging
import os
import urllib
from pathlib import Path

from gi.repository import Gio

from cozy.architecture.event_sender import EventSender
import inject
from cozy.media.importer import Importer
from cozy.model.settings import Settings
from cozy.report import reporter
from cozy.ui.toaster import ToastNotifier

# Module-level logger; all records from this module go to the "files" logger.
log = logging.getLogger("files")


class Files(EventSender):
    """Copy user-selected files and folders into the default storage location.

    A copy run announces itself with a ``"start-copy"`` event, reports
    ``"copy-progress"`` events (a float) while files are transferred and
    finally asks the importer to rescan. Events are sent through
    ``emit_event_main_thread`` (expected to be provided by ``EventSender``).
    """

    # Collaborators resolved through the ``inject`` container.
    _settings = inject.attr(Settings)
    _importer = inject.attr(Importer)
    _toast: ToastNotifier = inject.attr(ToastNotifier)

    # Number of files the current copy run is expected to process.
    _file_count = 0
    # Number of files already processed in the current copy run.
    # NOTE: the attribute name is misspelled ("progess"); it is kept as-is
    # because code outside this class may reference it.
    _file_progess = 0

    def __init__(self):
        super().__init__()

    def copy(self, selection):
        """Copy every entry of *selection* into the default storage location.

        Args:
            selection: Iterable of objects exposing ``get_path()``
                (presumably ``Gio.File`` instances -- confirm against caller).
        """
        log.info("Start of copying files")
        self.emit_event_main_thread("start-copy", None)

        paths = []
        for f in selection:
            path = f.get_path()
            if path is None:
                # Entries without a local path cannot be handled by the
                # os.path based logic below; skip them instead of crashing.
                log.warning("Skipping selected entry without a local path")
                continue
            paths.append(path)
        storage_location = self._settings.default_location.path

        # Reset the counters so progress always starts at zero for this run.
        self._file_count = 0
        self._file_progess = 0

        self._count_all_files(paths)
        self._copy_all(paths, storage_location)

        log.info("Copying of files finished")
        self._importer.scan()

    def _copy_all(self, sources, destination: str):
        """Copy each path in *sources* (file or directory) into *destination*."""
        for path in sources:
            if os.path.isdir(path):
                self._copy_directory(path, destination)
            else:
                filename = os.path.basename(path)
                file_copy_destination = os.path.join(destination, filename)
                self._copy_file(path, file_copy_destination)

    def _copy_file(self, source_path: str, dest_path: str):
        """Copy a single file, overwriting an existing destination file.

        Failures never propagate: known I/O error codes are shown to the user
        as a toast, a cancelled copy is ignored, and anything else is handed
        to the reporter. The processed-files counter is advanced in every
        case so that overall progress keeps moving.
        """
        log.info("Copy file %s to %s", source_path, dest_path)

        source = Gio.File.new_for_path(source_path)
        destination = Gio.File.new_for_path(dest_path)
        flags = Gio.FileCopyFlags.OVERWRITE
        # A fresh cancellable per file, stored on the instance so the running
        # copy can be cancelled from outside this method.
        self.filecopy_cancel = Gio.Cancellable()
        try:
            source.copy(destination, flags, self.filecopy_cancel, self._update_copy_status, None)
        except Exception as e:
            # Not every exception carries a ``code`` attribute; reading it
            # unconditionally would raise AttributeError inside this handler
            # and hide the original error. Unknown errors fall through to the
            # reporter branch below.
            code = getattr(e, "code", None)
            if code == Gio.IOErrorEnum.CANCELLED:
                pass
            elif code == Gio.IOErrorEnum.READ_ONLY:
                self._toast.show(_("Cannot copy: Audiobook directory is read only"))
            elif code == Gio.IOErrorEnum.NO_SPACE:
                self._toast.show(_("Cannot copy: Disk is full"))
            elif code == Gio.IOErrorEnum.PERMISSION_DENIED:
                self._toast.show(_("Cannot copy: Permission denied"))
            else:
                reporter.exception("files", e)

            log.error("Failed to copy file: %s", e)
        self._file_progess += 1

    def _copy_directory(self, path, destination):
        """Recursively copy the directory *path* into *destination*.

        The directory itself is recreated below *destination* (its last path
        component is kept). The whole directory copy is aborted when a target
        directory cannot be created because of missing permissions.
        """
        # Paths are made relative to the parent of ``path`` so that the
        # directory's own name becomes part of the destination path.
        main_source_path = os.path.split(path)[0]
        for dirpath, _dirnames, filenames in os.walk(path):
            dirname = os.path.relpath(dirpath, main_source_path)
            destination_dir = os.path.join(destination, dirname)
            try:
                Path(destination_dir).mkdir(parents=True, exist_ok=True)
            except PermissionError as e:
                log.error(e)
                self._toast.show(_("Cannot copy: Permission denied"))
                return

            for file in filenames:
                source = os.path.join(dirpath, file)
                file_copy_destination = os.path.join(destination_dir, file)
                self._copy_file(source, file_copy_destination)

    def _count_all_files(self, paths: list[str]) -> None:
        """Add the number of files reachable from *paths* to ``_file_count``."""
        for path in paths:
            if os.path.isdir(path):
                self._file_count += self._count_files_in_folder(path)
            else:
                self._file_count += 1

    def _update_copy_status(self, current_num_bytes, total_num_bytes, _):
        """Progress callback of the file copy; emits a ``"copy-progress"`` event.

        The emitted value is the share of already processed files plus the
        share contributed by the bytes copied so far of the current file.
        """
        # Guard against a division by zero for empty files.
        if total_num_bytes == 0:
            total_num_bytes = 1

        if self._file_count == 0:
            progress = 1.0
        else:
            progress = (self._file_progess / self._file_count) + (
                    (current_num_bytes / total_num_bytes) / self._file_count)
        self.emit_event_main_thread("copy-progress", progress)

    def _count_files_in_folder(self, path: str) -> int:
        """Return the number of files found below *path*, walking recursively."""
        return sum(len(files) for _root, _dirs, files in os.walk(path))