Compare commits

...

3 Commits

Author SHA1 Message Date
Hazel ac6c513d56 draft: post process song
ci/woodpecker/push/woodpecker Pipeline was successful Details
2024-05-15 12:30:54 +02:00
Hazel cc14253239 draft: streaming the audio 2024-05-15 12:18:08 +02:00
Hazel 14f986a497 draft: rewrote sources 2024-05-15 11:44:39 +02:00
7 changed files with 133 additions and 60 deletions

View File

@ -10,12 +10,12 @@ from ..objects import Target
LOGGER = logging_settings["codex_logger"]
def correct_codec(target: Target, bitrate_kb: int = main_settings["bitrate"], audio_format: str = main_settings["audio_format"], interval_list: List[Tuple[float, float]] = None):
def correct_codec(target: Target, bitrate_kb: int = main_settings["bitrate"], audio_format: str = main_settings["audio_format"], skip_intervals: List[Tuple[float, float]] = None):
if not target.exists:
LOGGER.warning(f"Target doesn't exist: {target.file_path}")
return
interval_list = interval_list or []
skip_intervals = skip_intervals or []
bitrate_b = int(bitrate_kb / 1024)
@ -29,7 +29,7 @@ def correct_codec(target: Target, bitrate_kb: int = main_settings["bitrate"], au
start = 0
next_start = 0
for end, next_start in interval_list:
for end, next_start in skip_intervals:
aselect_list.append(f"between(t,{start},{end})")
start = next_start
aselect_list.append(f"gte(t,{next_start})")

View File

@ -178,8 +178,6 @@ class Downloader:
page_count = 0
for option in self.current_results.formatted_generator():
if isinstance(option, Option):
_downloadable = self.pages.is_downloadable(option.music_object)
r = f"{BColors.GREY.value}{option.index:0{self.option_digits}}{BColors.ENDC.value} {option.music_object.option_string}"
print(r)
else:

View File

@ -16,6 +16,7 @@ from ..objects import (
Artist,
Label,
)
from ..audio import write_metadata_to_target, correct_codec
from ..utils.string_processing import fit_to_file_system
from ..utils.config import youtube_settings, main_settings
from ..utils.path_manager import LOCATIONS
@ -125,15 +126,10 @@ class Pages:
return data_object
def fetch_from_source(self, source: Source, **kwargs) -> Optional[DataObject]:
page: Page = self._get_page_from_enum(source.source_type)
if page is None:
if not source.has_page:
return None
# getting the appropriate function for the page and the object type
source_type = page.get_source_type(source)
if not hasattr(page, fetch_map[source_type]):
return None
func = getattr(page, fetch_map[source_type])(source=source, **kwargs)
func = getattr(source.page, fetch_map[source_type])(source=source, **kwargs)
# fetching the data object and marking it as fetched
data_object: DataObject = func(source=source)
@ -146,15 +142,6 @@ class Pages:
return None
return self.fetch_from_source(source=source)
def is_downloadable(self, music_object: DataObject) -> bool:
_page_types = set(self._source_to_page)
for src in music_object.source_collection.source_pages:
if src in self._source_to_page:
_page_types.add(self._source_to_page[src])
audio_pages = self._audio_pages_set.intersection(_page_types)
return len(audio_pages) > 0
def _skip_object(self, data_object: DataObject) -> bool:
if isinstance(data_object, Album):
@ -224,11 +211,6 @@ class Pages:
return possible_parts
def _get_pages_with_source(self, data_object: DataObject, sort_by_attribute: str = "DOWNLOAD_PRIORITY") -> List[Page]:
pages = [self._get_page_from_enum(s.source_type) for s in data_object.source_collection.get_sources()]
pages.sort(key=lambda p: getattr(p, sort_by_attribute), reverse=True)
return list(pages)
def _download_song(self, song: Song, naming: dict) -> DownloadOptions:
"""
TODO
@ -257,7 +239,6 @@ class Pages:
# manage the targets
tmp: Target = Target.temp(file_extension=main_settings["audio_format"])
found_on_disc = False
song.target_collection.append(Target(
relative_to_music_dir=True,
@ -269,18 +250,54 @@ class Pages:
for target in song.target_collection:
if target.exists():
output(f'- {target.file_path} {BColors.OKGREEN.value}[already exists]', color=BColors.GREY)
found_on_disc = True
r.found_on_disk += 1
target.copy_content(tmp)
if self.download_options.download_again_if_found:
target.copy_content(tmp)
else:
target.create_parent_directories()
output(f'- {target.file_path}', color=BColors.GREY)
# actually download
for page in self._get_pages_with_source(song, sort_by_attribute="DOWNLOAD_PRIORITY"):
r = page.download_song_to_target(song, tmp, r)
# this streams from every available source until something succeeds, setting the skip intervals to the values of the according source
used_source: Optional[Source] = None
skip_intervals: List[Tuple[float, float]] = []
for source in song.source_collection.get_sources(source_type_sorting={
"only_with_page": True,
"sort_key": lambda page: page.download_priority,
"reverse": True,
}):
if tmp.exists:
break
used_source = source
streaming_results = source.page.download_song_to_target(source=source, target=tmp, desc="download")
skip_intervals = source.page.get_skip_intervals(song=song, source=source)
# if something has been downloaded but it somehow failed, delete the file
if streaming_results.is_fatal_error and tmp.exists:
tmp.delete()
# if everything went right, the file should exist now
if not tmp.exists:
if used_source is None:
r.error_message = f"No source found for {song.option_string}."
else:
r.error_message = f"Something went wrong downloading {song.option_string}."
return r
# post process the audio
found_on_disk = used_source is None
if not found_on_disk or self.download_options.process_audio_if_found:
correct_codec(target=tmp, skip_intervals=skip_intervals)
r.sponsor_segments = len(skip_intervals)
if used_source is not None:
used_source.page.post_process_hook(song=song, temp_target=tmp)
if not found_on_disc or self.download_options.process_metadata_if_found:
write_metadata_to_target(metadata=song.metadata, target=tmp, song=song)
tmp.delete()
return r
def fetch_url(self, url: str, stop_at_level: int = 2) -> Tuple[Type[Page], DataObject]:

View File

@ -2,7 +2,19 @@ from __future__ import annotations
from collections import defaultdict
from enum import Enum
from typing import List, Dict, Set, Tuple, Optional, Iterable, Generator
from typing import (
List,
Dict,
Set,
Tuple,
Optional,
Iterable,
Generator,
TypedDict,
Callable,
Any,
TYPE_CHECKING
)
from urllib.parse import urlparse, ParseResult
from dataclasses import dataclass, field
from functools import cached_property
@ -15,6 +27,8 @@ from ..utils.string_processing import hash_url, shorten_display_url
from .metadata import Mapping, Metadata
from .parents import OuterProxy
from .collection import Collection
if TYPE_CHECKING:
from ..pages.abstract import Page
@ -29,10 +43,6 @@ class Source:
def __post_init__(self):
self.referrer_page = self.referrer_page or self.source_type
@property
def parsed_url(self) -> ParseResult:
return urlparse(self.url)
@classmethod
def match_url(cls, url: str, referrer_page: SourceType) -> Optional[Source]:
@ -77,6 +87,18 @@ class Source:
if url.startswith("https://myspace.com"):
return cls(SourceType.MYSPACE, url, referrer_page=referrer_page)
@property
def has_page(self) -> bool:
return self.source_type.page is not None
@property
def page(self) -> Page:
return self.source_type.page
@property
def parsed_url(self) -> ParseResult:
return urlparse(self.url)
@property
def hash_url(self) -> str:
return hash_url(self.url)
@ -99,11 +121,17 @@ class Source:
page_str = property(fget=lambda self: self.source_type.value)
class SourceTypeSorting(TypedDict):
sort_key: Callable[[SourceType], Any]
reverse: bool
only_with_page: bool
class SourceCollection:
__change_version__ = generate_id()
_indexed_sources: Dict[str, Source]
_page_to_source_list: Dict[SourceType, List[Source]]
_sources_by_type: Dict[SourceType, List[Source]]
def __init__(self, data: Optional[Iterable[Source]] = None, **kwargs):
self._page_to_source_list = defaultdict(list)
@ -111,15 +139,54 @@ class SourceCollection:
self.extend(data or [])
def has_source_page(self, *source_pages: SourceType) -> bool:
return any(source_page in self._page_to_source_list for source_page in source_pages)
def source_types(
self,
only_with_page: bool = False,
sort_key = lambda page: page.name,
reverse: bool = False
) -> Iterable[SourceType]:
"""
Returns a list of all source types contained in this source collection.
def get_sources(self, *source_pages: List[Source]) -> Generator[Source]:
if not len(source_pages):
source_pages = self.source_pages
Args:
only_with_page (bool, optional): If True, only returns source types that have a page, meaning you can download from them.
sort_key (function, optional): A function that defines the sorting key for the source types. Defaults to lambda page: page.name.
reverse (bool, optional): If True, sorts the source types in reverse order. Defaults to False.
for page in source_pages:
yield from self._page_to_source_list[page]
Returns:
Iterable[SourceType]: A list of source types.
"""
source_types: List[SourceType] = self._page_to_source_list.keys()
if only_with_page:
source_types = filter(lambda st: st.has_page, source_types)
return sorted(
source_types,
key=sort_key,
reverse=reverse
)
def get_sources(self, *source_types: List[SourceType], source_type_sorting: SourceTypeSorting = None) -> Generator[Source]:
"""
Retrieves sources based on the provided source types and source type sorting.
Args:
*source_types (List[Source]): Variable number of source types to filter the sources.
source_type_sorting (SourceTypeSorting): Sorting criteria for the source types. This is only relevant if no source types are provided.
Yields:
Generator[Source]: A generator that yields the sources based on the provided filters.
Returns:
None
"""
if not len(source_types):
source_type_sorting = source_type_sorting or {}
source_types = self.source_types(**source_type_sorting)
for source_type in source_types:
yield from self._page_to_source_list[source_type]
def append(self, source: Source):
if source is None:
@ -156,10 +223,6 @@ class SourceCollection:
def __merge__(self, other: SourceCollection, **kwargs):
self.extend(other)
@property
def source_pages(self) -> Iterable[SourceType]:
return sorted(self._page_to_source_list.keys(), key=lambda page: page.value)
@property
def hash_url_list(self) -> List[str]:
return [hash_url(source.url) for source in self.get_sources()]
@ -170,7 +233,7 @@ class SourceCollection:
@property
def homepage_list(self) -> List[str]:
return [source.homepage for source in self.source_pages]
return [source_type.homepage for source_type in self._sources_by_type.keys()]
def indexing_values(self) -> Generator[Tuple[str, str], None, None]:
for index in self._indexed_sources:

View File

@ -246,8 +246,6 @@ class Page:
else:
output(f'- {target.file_path}', color=BColors.GREY)
if not song.source_collection.has_source_page(self.SOURCE_TYPE):
return DownloadResult(error_message=f"No {self.__class__.__name__} source found for {song.option_string}.")
sources = song.source_collection.get_sources(self.SOURCE_TYPE)
@ -264,15 +262,15 @@ class Page:
r.merge(self._post_process_targets(
song=song,
temp_target=temp_target,
interval_list=skip_intervals,
skip_intervals=skip_intervals,
found_on_disc=found_on_disc,
))
return r
def _post_process_targets(self, song: Song, temp_target: Target, interval_list: List, found_on_disc: bool) -> DownloadResult:
def _post_process_targets(self, song: Song, temp_target: Target, skip_intervals: List, found_on_disc: bool) -> DownloadResult:
if not found_on_disc or self.download_options.process_audio_if_found:
correct_codec(temp_target, interval_list=interval_list)
correct_codec(temp_target, skip_intervals=skip_intervals)
self.post_process_hook(song, temp_target)
@ -288,7 +286,7 @@ class Page:
r.add_target(target)
temp_target.delete()
r.sponsor_segments += len(interval_list)
r.sponsor_segments += len(skip_intervals)
return r

View File

@ -51,7 +51,6 @@ class BandcampTypes(Enum):
class Bandcamp(Page):
SOURCE_TYPE = ALL_SOURCE_TYPES.BANDCAMP
LOGGER = logging_settings["bandcamp_logger"]
def __init__(self, *args, **kwargs):
self.connection: Connection = Connection(

View File

@ -41,8 +41,6 @@ class YouTube(SuperYouTube):
# CHANGE
SOURCE_TYPE = ALL_SOURCE_TYPES.YOUTUBE
NO_ADDITIONAL_DATA_FROM_SONG = False
def __init__(self, *args, **kwargs):
self.connection: Connection = Connection(
host=get_invidious_url(),