# Copyright (c) 2020-2021 Trim21 <i@trim21.me>
# Copyright (c) 2008-2014 Erik Svensson <erik.public@gmail.com>
# Licensed under the MIT license.
# pylint: disable=C0103
import datetime
from typing import TYPE_CHECKING, Any, Dict, List, Union, Optional
from transmission_rpc.utils import format_timedelta
from transmission_rpc.constants import PRIORITY, IDLE_LIMIT, RATIO_LIMIT
from transmission_rpc.lib_types import File, Field, _Timeout
if TYPE_CHECKING:
from transmission_rpc.client import Client
def get_status_old(code: int) -> str:
"""Get the torrent status using old status codes"""
mapping = {
(1 << 0): "check pending",
(1 << 1): "checking",
(1 << 2): "downloading",
(1 << 3): "seeding",
(1 << 4): "stopped",
}
return mapping[code]
_STATUS_NEW_MAPPING = {
0: "stopped",
1: "check pending",
2: "checking",
3: "download pending",
4: "downloading",
5: "seed pending",
6: "seeding",
}
def get_status_new(code: int) -> str:
"""Get the torrent status using new status codes"""
return _STATUS_NEW_MAPPING[code]
[docs]class Status(str):
"""A wrapped ``str`` for torrent status.
returned by :py:attr:`.Torrent.status`
"""
stopped: bool
check_pending: bool
checking: bool
download_pending: bool
downloading: bool
seed_pending: bool
seeding: bool
[docs] def __new__(cls, raw: str) -> "Status":
obj = super().__new__(cls, raw)
for status in _STATUS_NEW_MAPPING.values():
setattr(obj, status.replace(" ", "_"), raw == status)
return obj
[docs]class Torrent:
"""
Torrent is a class holding the data received from Transmission regarding a bittorrent transfer.
All fetched torrent fields are accessible through this class using attributes.
This class has a few convenience properties using the torrent data.
"""
[docs] def __init__(self, client: "Client", fields: Dict[str, Any]):
if "id" not in fields:
raise ValueError("Torrent requires an id")
self._fields: Dict[str, Field] = {}
self._update_fields(fields)
self._incoming_pending = False
self._outgoing_pending = False
self._client = client
@property
def id(self) -> int:
"""Returns the id for this torrent"""
return self._fields["id"].value
def _get_name_string(self) -> Optional[str]:
"""Get the name"""
name = None
# try to find name
if "name" in self._fields:
name = self._fields["name"].value
return name
def __repr__(self) -> str:
tid = self._fields["id"].value
name = self._get_name_string()
if name is not None:
return f'<Torrent {tid} "{name}">'
return f"<Torrent {tid}>"
def __str__(self) -> str:
name = self._get_name_string()
if name is not None:
return f'Torrent "{name}"'
return "Torrent"
def __copy__(self) -> "Torrent":
return Torrent(self._client, self._fields)
def __getattr__(self, name: str) -> Any:
try:
return self._fields[name].value
except KeyError:
raise AttributeError(f"No attribute {name}") from None
def _rpc_version(self) -> int:
"""Get the Transmission RPC API version."""
if self._client:
return self._client.rpc_version
return 2
def _dirty_fields(self) -> List[str]:
"""Enumerate changed fields"""
outgoing_keys = [
"bandwidthPriority",
"downloadLimit",
"downloadLimited",
"peer_limit",
"queuePosition",
"seedIdleLimit",
"seedIdleMode",
"seedRatioLimit",
"seedRatioMode",
"uploadLimit",
"uploadLimited",
]
fields = []
for key in outgoing_keys:
if key in self._fields and self._fields[key].dirty:
fields.append(key)
return fields
def _push(self) -> None:
"""Push changed fields to the server"""
dirty = self._dirty_fields()
args = {}
for key in dirty:
args[key] = self._fields[key].value
self._fields[key] = self._fields[key]._replace(dirty=False)
if len(args) > 0:
self._client.change_torrent(self.id, **args)
def _update_fields(self, other: Union["Torrent", Dict[str, Any]]) -> None:
"""
Update the torrent data from a Transmission JSON-RPC arguments dictionary
"""
if isinstance(other, dict):
for key, value in other.items():
self._fields[key.replace("-", "_")] = Field(value, False)
elif isinstance(other, Torrent):
for key in list(other._fields.keys()):
self._fields[key] = Field(other._fields[key].value, False)
else:
raise ValueError("Cannot update with supplied data")
self._incoming_pending = False
def _status(self) -> str:
"""Get the torrent status"""
code = self._fields["status"].value
if self._rpc_version() >= 14:
return get_status_new(code)
return get_status_old(code)
[docs] def files(self) -> List[File]:
"""
Get list of files for this torrent.
.. note ::
The order of the files is guaranteed. The index of file object is the id of the file
when calling :py:meth:`transmission_rpc.client.Client.set_files`.
.. code-block:: python
from transmission_rpc import Client
torrent = Client().get_torrent(0)
for file_id, file in enumerate(torrent.files()):
print(file_id, file)
"""
result: List[File] = []
if "files" in self._fields:
files = self._fields["files"].value
indices = range(len(files))
priorities = self._fields["priorities"].value
wanted = self._fields["wanted"].value
for item in zip(indices, files, priorities, wanted):
selected = bool(item[3])
priority = PRIORITY[item[2]]
result.append(
File(
selected=selected,
priority=priority,
size=item[1]["length"],
name=item[1]["name"],
completed=item[1]["bytesCompleted"],
)
)
return result
@property
def name(self) -> str:
"""Returns the name of this torrent.
Raise AttributeError if server don't return this field
"""
return self.__getattr__("name")
@property
def status(self) -> Status:
"""
:rtype: Status
Returns the torrent status. Is either one of 'check pending', 'checking',
'downloading', 'download pending', 'seeding', 'seed pending' or 'stopped'.
The first two is related to verification.
Examples:
.. code-block:: python
torrent = Torrent()
torrent.status.downloading
torrent.status == 'downloading'
"""
return Status(self._status())
@property
def rateDownload(self) -> int:
"""
Returns download rate in B/s
:rtype: int
"""
return self._fields["rateDownload"].value
@property
def rateUpload(self) -> int:
"""
Returns upload rate in B/s
:rtype: int
"""
return self._fields["rateUpload"].value
@property
def hashString(self) -> str:
"""Returns the info hash of this torrent.
:raise: AttributeError -- if server don't return this field
:rtype: int
"""
return self.__getattr__("hashString")
@property
def progress(self) -> float:
"""
download progress in percent.
:rtype: float
"""
try:
# https://gist.github.com/jackiekazil/6201722#gistcomment-2788556
return round((100.0 * self._fields["percentDone"].value), 2)
except KeyError:
try:
size = self._fields["sizeWhenDone"].value
left = self._fields["leftUntilDone"].value
return round((100.0 * (size - left) / float(size)), 2)
except ZeroDivisionError:
return 0.0
@property
def ratio(self) -> float:
"""
upload/download ratio.
:rtype: float
"""
return float(self._fields["uploadRatio"].value)
@property
def eta(self) -> datetime.timedelta:
"""
the "eta" as datetime.timedelta.
:rtype: datetime.timedelta
"""
eta = self._fields["eta"].value
if eta >= 0:
return datetime.timedelta(seconds=eta)
raise ValueError("eta not valid")
@property
def date_active(self) -> datetime.datetime:
"""the attribute ``activityDate`` as ``datetime.datetime`` in **UTC timezone**.
.. note::
raw ``activityDate`` value could be ``0`` for never activated torrent,
therefore it can't always be converted to local timezone.
:rtype: datetime.datetime
"""
return datetime.datetime.fromtimestamp(
self._fields["activityDate"].value, datetime.timezone.utc
)
@property
def date_added(self) -> datetime.datetime:
"""raw field ``addedDate`` as ``datetime.datetime`` in **local timezone**.
:rtype: datetime.datetime
"""
return datetime.datetime.fromtimestamp(
self._fields["addedDate"].value
).astimezone()
@property
def date_started(self) -> datetime.datetime:
"""raw field ``startDate`` as ``datetime.datetime`` in **local timezone**.
:rtype: datetime.datetime
"""
return datetime.datetime.fromtimestamp(
self._fields["startDate"].value
).astimezone()
@property
def date_done(self) -> Optional[datetime.datetime]:
"""the attribute "doneDate" as datetime.datetime. returns None if "doneDate" is invalid."""
done_date = self._fields["doneDate"].value
# Transmission might forget to set doneDate which is initialized to zero,
# so if doneDate is zero return None
if done_date == 0:
return None
return datetime.datetime.fromtimestamp(done_date).astimezone()
@property
def download_dir(self) -> Optional[str]:
"""The download directory.
:available: transmission version 1.5.
:available: RPC version 4.
"""
return self._fields["downloadDir"].value
@property
def download_limit(self) -> Optional[int]:
"""The download limit.
Can be a number or None.
"""
if self._fields["downloadLimited"].value:
return self._fields["downloadLimit"].value
return None
@download_limit.setter
def download_limit(self, limit: int) -> None:
"""Download limit in Kbps or None."""
if isinstance(limit, int):
self._fields["downloadLimited"] = Field(True, True)
self._fields["downloadLimit"] = Field(limit, True)
self._push()
elif limit is None:
self._fields["downloadLimited"] = Field(False, True)
self._push()
else:
raise ValueError("Not a valid limit")
@property
def peer_limit(self) -> int:
"""the peer limit."""
return self._fields["peer_limit"].value
@peer_limit.setter
def peer_limit(self, limit: int) -> None:
"""
Set the peer limit.
"""
if isinstance(limit, int):
self._fields["peer_limit"] = Field(limit, True)
self._push()
else:
raise ValueError("Not a valid limit")
@property
def priority(self) -> str:
"""
Bandwidth priority as string.
Can be one of 'low', 'normal', 'high'. This is a mutator.
"""
return PRIORITY[self._fields["bandwidthPriority"].value]
@priority.setter
def priority(self, priority: str) -> None:
if isinstance(priority, str):
self._fields["bandwidthPriority"] = Field(PRIORITY[priority], True)
self._push()
@property
def seed_idle_limit(self) -> int:
"""
seed idle limit in minutes.
"""
return self._fields["seedIdleLimit"].value
@seed_idle_limit.setter
def seed_idle_limit(self, limit: int) -> None:
"""
Set the seed idle limit in minutes.
"""
if isinstance(limit, int):
self._fields["seedIdleLimit"] = Field(limit, True)
self._push()
else:
raise ValueError("Not a valid limit")
@property
def is_finished(self) -> bool:
"""Returns true if the torrent is finished (available from rpc version 2.0)"""
return self._fields["isFinished"].value
@property
def is_stalled(self) -> bool:
"""Returns true if the torrent is stalled (available from rpc version 2.4)"""
return self._fields["isStalled"].value
@property
def size_when_done(self) -> int:
"""Size in bytes when the torrent is done"""
return self._fields["sizeWhenDone"].value
@property
def total_size(self) -> int:
"""Total size in bytes"""
return self._fields["totalSize"].value
@property
def left_until_done(self) -> int:
"""Bytes left until done"""
return self._fields["leftUntilDone"].value
@property
def desired_available(self) -> int:
"""Bytes that are left to download and available"""
return self._fields["desiredAvailable"].value
@property
def available(self) -> float:
"""Availability in percent"""
bytes_all = self.total_size
bytes_done = sum(
map(lambda x: x["bytesCompleted"], self._fields["fileStats"].value)
)
bytes_avail = self.desired_available + bytes_done
return (bytes_avail / bytes_all) * 100 if bytes_all else 0
@property
def seed_idle_mode(self) -> str:
"""
Seed idle mode as string. Can be one of 'global', 'single' or 'unlimited'.
* global, use session seed idle limit.
* single, use torrent seed idle limit. See seed_idle_limit.
* unlimited, no seed idle limit.
"""
return IDLE_LIMIT[self._fields["seedIdleMode"].value]
@seed_idle_mode.setter
def seed_idle_mode(self, mode: Union[str, int]) -> None:
"""
Set the seed ratio mode.
Can be one of ``global``, ``single`` or ``unlimited``, or ``0``, ``1``, ``2``.
"""
if isinstance(mode, str):
self._fields["seedIdleMode"] = Field(IDLE_LIMIT[mode], True)
self._push()
elif isinstance(mode, int):
self._fields["seedIdleMode"] = Field(IDLE_LIMIT[mode], True)
self._push()
else:
raise ValueError("Not a valid limit")
@property
def seed_ratio_limit(self) -> float:
"""
Torrent seed ratio limit as float. Also see seed_ratio_mode.
This is a mutator.
:rtype: float
"""
return float(self._fields["seedRatioLimit"].value)
@seed_ratio_limit.setter
def seed_ratio_limit(self, limit: Union[int, float]) -> None:
"""
Set the seed ratio limit as float.
"""
if isinstance(limit, (int, float)) and limit >= 0.0:
self._fields["seedRatioLimit"] = Field(float(limit), True)
self._push()
else:
raise ValueError("Not a valid limit")
@property
def seed_ratio_mode(self) -> str:
"""
Seed ratio mode as string. Can be one of 'global', 'single' or 'unlimited'.
* global, use session seed ratio limit.
* single, use torrent seed ratio limit. See seed_ratio_limit.
* unlimited, no seed ratio limit.
This is a mutator.
"""
return RATIO_LIMIT[self._fields["seedRatioMode"].value]
@seed_ratio_mode.setter
def seed_ratio_mode(self, mode: Union[str, int]) -> None:
"""
Set the seed ratio mode.
Can be one of ``'global'``, ``'single'`` or ``'unlimited'``, or ``0``, ``1``, ``2``.
"""
if isinstance(mode, str):
self._fields["seedRatioMode"] = Field(RATIO_LIMIT[mode], True)
self._push()
elif isinstance(mode, int):
self._fields["seedRatioMode"] = Field(mode, True)
self._push()
else:
raise ValueError("Not a valid limit")
@property
def upload_limit(self) -> Optional[int]:
"""
upload limit.
Can be a number or None.
"""
if self._fields["uploadLimited"].value:
return self._fields["uploadLimit"].value
return None
@upload_limit.setter
def upload_limit(self, limit: Optional[int]) -> None:
"""Upload limit in Kbps or None."""
if isinstance(limit, int):
self._fields["uploadLimited"] = Field(True, True)
self._fields["uploadLimit"] = Field(limit, True)
self._push()
elif limit is None:
self._fields["uploadLimited"] = Field(False, True)
self._push()
else:
raise ValueError("Not a valid limit")
@property
def queue_position(self) -> int:
"""queue position for this torrent."""
if self._rpc_version() >= 14:
return self._fields["queuePosition"].value
return 0
@queue_position.setter
def queue_position(self, position: str) -> None:
"""Queue position"""
if self._rpc_version() >= 14:
if isinstance(position, int):
self._fields["queuePosition"] = Field(position, True)
self._push()
else:
raise ValueError("Not a valid position")
else:
pass
[docs] def update(self, timeout: _Timeout = None) -> None:
"""Update the torrent information."""
self._push()
torrent = self._client.get_torrent(self.id, timeout=timeout)
self._update_fields(torrent)
[docs] def start(self, bypass_queue: bool = False, timeout: _Timeout = None) -> None:
"""
Start the torrent.
"""
self._incoming_pending = True
self._client.start_torrent(self.id, bypass_queue=bypass_queue, timeout=timeout)
[docs] def stop(self, timeout: _Timeout = None) -> None:
"""Stop the torrent."""
self._incoming_pending = True
self._client.stop_torrent(self.id, timeout=timeout)
[docs] def move_data(self, location: str, timeout: _Timeout = None) -> None:
"""Move torrent data to location."""
self._incoming_pending = True
self._client.move_torrent_data(self.id, location, timeout=timeout)
[docs] def locate_data(self, location: str, timeout: _Timeout = None) -> None:
"""Locate torrent data at location."""
self._incoming_pending = True
self._client.locate_torrent_data(self.id, location, timeout=timeout)