dbnomics_fetcher_toolbox package

Toolbox of functions and data types that help write DBnomics fetchers.

The API is organized in sub-modules. They are presented below in alphabetical order.

The API is designed to be convenient to use from a fetcher script, without exposing internal details in that script.

That is why some functions, such as dbnomics_fetcher_toolbox.logging_utils.setup_logging(), take an args namespace instead of more specific arguments.

aiohttp_utils module

Utility functions for aiohttp.

exception ChunkTimeoutError(bytes_count: int)

Bases: Exception

Chunk timeout error.

Exception raised by iter_chunks_with_timeout() when a chunk takes too long to download.

Parameters

bytes_count – number of bytes downloaded with the previous chunks

bytes_count: int

add_arguments_for_chunks(parser: argparse.ArgumentParser)

Add arguments to parser to be used with iter_chunks_with_timeout().

get_trace_config() → aiohttp.tracing.TraceConfig

Get a TraceConfig instance configured to log aiohttp HTTP requests.

iter_chunks_with_timeout(response: aiohttp.client_reqrep.ClientResponse, args: argparse.Namespace) → AsyncIterator[bytes]

Iterate over the chunks of response, raising ChunkTimeoutError if a timeout occurs while downloading a chunk.

This makes it possible to handle servers that suddenly stop sending data, without waiting for the global request timeout or for an error such as 104 “Connection reset by peer”.
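
A minimal sketch of how these helpers can be combined in a download script (the URL and target file are illustrative, and the exact options added by add_arguments_for_chunks() are not listed here):

import argparse
from pathlib import Path

import aiohttp

from dbnomics_fetcher_toolbox.aiohttp_utils import (
    ChunkTimeoutError,
    add_arguments_for_chunks,
    get_trace_config,
    iter_chunks_with_timeout,
)

parser = argparse.ArgumentParser()
add_arguments_for_chunks(parser)
args = parser.parse_args()

async def download(url: str, file: Path):
    async with aiohttp.ClientSession(trace_configs=[get_trace_config()]) as session:
        async with session.get(url) as response:
            with file.open("wb") as fp:
                try:
                    async for chunk in iter_chunks_with_timeout(response, args):
                        fp.write(chunk)
                except ChunkTimeoutError as exc:
                    # The server stopped sending data; exc.bytes_count is the
                    # number of bytes downloaded with the previous chunks.
                    raise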

arguments module

Functions handling the command-line options of fetcher scripts.

add_arguments_for_convert(parser: argparse.ArgumentParser)

Add to parser the arguments used by a convert script.

add_arguments_for_download(parser: argparse.ArgumentParser)

Add to parser the arguments used by a download script.

add_common_arguments(parser: argparse.ArgumentParser)

Add common arguments to parser.

Those arguments are common to both download and convert scripts.
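
A minimal sketch of setting up the parser of a download script (the description string is illustrative; whether add_arguments_for_download() also adds the common arguments is not specified here):

import argparse

from dbnomics_fetcher_toolbox.arguments import add_arguments_for_download

parser = argparse.ArgumentParser(description="Download source data")  # illustrative description
add_arguments_for_download(parser)
args = parser.parse_args()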

natural_int(value: str) → int

Check that value is a positive integer.
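
Example (the option name --limit is illustrative):

parser.add_argument('--limit', type=natural_int)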

readable_dir(value: str) → pathlib.Path

Check that value is a readable directory.

Example:

parser.add_argument('dir', type=readable_dir)

readable_file(value: str) → pathlib.Path

Check that value is a readable file.

Example:

parser.add_argument('file', type=readable_file)

cli module

Functions for CLI commands.

status_stats()

Compute and display statistics about a status.jsonl file.

data_model module

Functions and classes defining the DBnomics data model.

CATEGORY_TREE_JSON = 'category_tree.json'

Name of the file containing data to represent a category tree.

class Category(*, children: List[Union[Category, dbnomics_fetcher_toolbox.data_model.DatasetReference]], code: str = None, name: str = None, doc_href: str = None)

Bases: pydantic.main.BaseModel

Represents a category node of a category tree.

children: List[Union[dbnomics_fetcher_toolbox.data_model.Category, dbnomics_fetcher_toolbox.data_model.DatasetReference]]
code: Optional[str]
classmethod code_or_name_exist(values)
doc_href: Optional[str]
name: Optional[str]
class CategoryTree(*, __root__: List[Union[dbnomics_fetcher_toolbox.data_model.Category, dbnomics_fetcher_toolbox.data_model.DatasetReference]])

Bases: pydantic.main.BaseModel

Represents a category tree made of categories or dataset references.

to_json_data() → List[dict]

Return data as it would be encoded to JSON.

DATASET_JSON = 'dataset.json'

Name of the file containing metadata about a dataset.

class DatasetReference(*, code: str, name: str = None, status: dbnomics_fetcher_toolbox.status.ResourceStatus = None)

Bases: pydantic.main.BaseModel

Represents a dataset node of a category tree.

code: str
name: Optional[str]
status: Optional[dbnomics_fetcher_toolbox.status.ResourceStatus]
NA: typing_extensions.Literal[NA] = 'NA'

Special value used when an observation value is Not Available.

exception NoTimeDimensionError

Bases: dbnomics_fetcher_toolbox.data_model.SeriesError

An error with the time dimension of a series.

exception ObservationError

Bases: dbnomics_fetcher_toolbox.data_model.SeriesError

An error with a series observation.

PROVIDER_JSON = 'provider.json'

Name of the file containing metadata about a provider.

SERIES_JSONL = 'series.jsonl'

Name of the file containing data about many time series.

exception SeriesError

Bases: ValueError

An error with a series.

clean_category_tree_json(category_tree_json: List[dict])

Clean the category tree by recursively removing the SUCCESS status from dataset nodes.

Mutate category_tree_json.

iter_dataset_references(category_tree: dbnomics_fetcher_toolbox.data_model.CategoryTree) → Iterator[dbnomics_fetcher_toolbox.data_model.DatasetReference]

Yield DatasetReference objects from category_tree.

The category tree is iterated recursively.

write_category_tree_json(directory: pathlib.Path, category_tree: dbnomics_fetcher_toolbox.data_model.CategoryTree)

Encode category_tree to JSON and write it to “category_tree.json”.

Parameters

directory – The directory to write the file to.

write_series_jsonl(directory: pathlib.Path, series: Iterable[dict])

Encode series to JSON Lines and write them to “series.jsonl”.

Each item of series must be a dict with a "code" key.

The series are sorted by "code" in order to guarantee a stable file.

Parameters

directory – write the file in this directory
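
A minimal sketch of writing a category tree and some series with these helpers (the directory, dataset code and series dicts are illustrative; a real series dict would contain the other keys defined by the DBnomics data model):

from pathlib import Path

from dbnomics_fetcher_toolbox.data_model import (
    Category,
    CategoryTree,
    DatasetReference,
    write_category_tree_json,
    write_series_jsonl,
)

target_dir = Path("data")  # illustrative

category_tree = CategoryTree(__root__=[
    Category(code="prices", children=[
        DatasetReference(code="CPI", name="Consumer price index"),
    ]),
])
write_category_tree_json(target_dir, category_tree)

dataset_dir = target_dir / "CPI"
dataset_dir.mkdir(parents=True, exist_ok=True)  # assuming the dataset directory must already exist
write_series_jsonl(dataset_dir, [
    {"code": "A.FR", "name": "Annual, France"},  # illustrative series dict
])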

file_system_utils module

Utility functions related to the file system.

iter_child_directories(directory: pathlib.Path, include_hidden: bool = False) → Iterator[pathlib.Path]

Yield child directories of directory.

If include_hidden is True, child directories whose name starts with "." are not skipped. The default is False, so directories like .git are skipped.
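
For example, to iterate over previously downloaded dataset directories (the source directory is illustrative):

from pathlib import Path

from dbnomics_fetcher_toolbox.file_system_utils import iter_child_directories

for dataset_dir in iter_child_directories(Path("source-data")):
    print(dataset_dir.name)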

formats module

Functions dealing with file formats like JSON, JSON Lines, XML or HTML.

async fetch_or_read_html(name: str, url: str, session: aiohttp.client.ClientSession, file: pathlib.Path, force: bool = False, on_fetch: Callable[[lxml.etree.Element], lxml.etree.Element] = None) → lxml.etree.Element

Fetch or read HTML.

Just call fetch_or_read_xml() with parser=HTML_PARSER and xml_declaration=False.

async fetch_or_read_xml(name: str, url: str, session: aiohttp.client.ClientSession, file: pathlib.Path, force: bool = False, parser: lxml.etree._FeedParser = None, on_fetch: Callable[[lxml.etree.Element], lxml.etree.Element] = None, xml_declaration: bool = True) → lxml.etree.Element

Fetch or read XML.

Load the XML document from file or, if it does not exist, fetch it from url using session, then save it to file. In either case, parse it and return an Element.

The name parameter is used to customize logging messages.

The force parameter forces fetching instead of loading from file, even if the file exists.

A custom parser can be passed to etree.parse.

on_fetch is a callback that takes the fetched Element and returns another.

xml_declaration is passed to ElementTree.write.

Examples:

keyfamilies_element = await fetch_or_read_xml(
    name="key families XML file",
    url=urljoin(args.api_base_url,
                "/restsdmx/sdmx.ashx/GetDataStructure/all/all"),
    session=session,
    file=args.target_dir / "keyfamilies.xml",
    on_fetch=sdmx_v2_0.remove_prepared_date,
)

category_tree_element = await fetch_or_read_xml(
    name="category tree HTML file",
    url=args.api_base_url,
    session=session,
    file=args.target_dir / "category_tree.html",
    parser=HTML_PARSER,
    on_fetch=lambda element: element.find(
        './/{*}div[@id="browsethemes"]/ul[@class="treeview"]'
    ),
)

async fetch_xml(url: str, session: aiohttp.client.ClientSession, parser: lxml.etree._FeedParser = None) → lxml.etree.Element

Fetch an XML file from url using session.

A custom parser can be passed to etree.parse.

read_html(file: pathlib.Path) → lxml.etree.Element

Read HTML from file and return an Element.

Because of lxml.etree.HTMLParser, the returned Element is always an <html> element, so the caller has to call Element.find() to access the desired child element.

Call read_xml() with parser=HTML_PARSER.
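
For example (the file name and the selector passed to find() are illustrative):

from pathlib import Path

from dbnomics_fetcher_toolbox.formats import read_html

root = read_html(Path("category_tree.html"))  # root is the <html> element
themes_element = root.find('.//{*}div[@id="browsethemes"]')  # hypothetical selector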

read_xml(file: pathlib.Path, parser: lxml.etree._FeedParser = None) → lxml.etree.Element

Read XML from file and return an Element.

A custom parser can be passed to etree.parse.

write_html(file: pathlib.Path, element: lxml.etree.Element, pretty_print: bool = True)

Encode element to HTML and write it to file.

pretty_print is passed to ElementTree.write.

write_json(file: pathlib.Path, data: Any)

Encode data to JSON and write it to file.

write_jsonl(file: pathlib.Path, items: Iterable[Any])

Encode items to JSON Lines and write them to file.

write_xml(file: pathlib.Path, element: lxml.etree.Element, pretty_print: bool = True, xml_declaration: bool = True)

Encode element to XML and write it to file.

pretty_print and xml_declaration are passed to ElementTree.write.

logging_utils module

Utility functions related to logging.

setup_logging(args: argparse.Namespace)

Initialize logging.

Log level is DEBUG if the --debug option was given.
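
A minimal sketch of initializing logging at the beginning of a script (this assumes the --debug option is declared by add_common_arguments()):

import argparse

from dbnomics_fetcher_toolbox.arguments import add_common_arguments
from dbnomics_fetcher_toolbox.logging_utils import setup_logging

parser = argparse.ArgumentParser()
add_common_arguments(parser)  # assumed to declare --debug among the common options
args = parser.parse_args()
setup_logging(args)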

parts module

Functions handling resource parts.

exception SplitOneDimension

Bases: Exception

Raise this exception to trigger a split on one dimension.

In particular, raise it from the process_part callback of dbnomics_fetcher_toolbox.parts.process_parts().

dimensions_to_str(dimensions: Dict[str, List[str]], is_initial_dimensions: bool) → str

Convert dimensions to str.

A dimensions dict is generally too large and makes logs difficult to read. This function returns a shorter string representing dimensions.

Parameters

is_initial_dimensions – if True, return "all"; otherwise, return a hash of dimensions.

async process_parts(resource: dbnomics_fetcher_toolbox.resources.Resource, args: argparse.Namespace, initial_dimensions: Dict[str, List[str]], process_part: Callable[[Dict[str, List[str]], str, bool], Awaitable[None]], on_event: Callable[[dbnomics_fetcher_toolbox.status.PartEvent], None] = None, events: Sequence[dbnomics_fetcher_toolbox.status.PartEvent] = None, dimensions_to_str: Callable[[Dict[str, List[str]], bool], str] = <function dimensions_to_str>, select_split_candidate: Callable[[Dict[str, List[str]]], str] = <function select_median_low>)

Process a resource by processing its parts.

process_part can raise a SplitOneDimension exception, meaning that the current part must be split on one dimension.
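
A minimal sketch of a process_part callback (resource and args are assumed to come from the surrounding script; the callback parameter names, the fetch_series_for and save_series helpers, and the initial dimensions are illustrative):

from dbnomics_fetcher_toolbox.parts import SplitOneDimension, process_parts

async def process_part(dimensions, part_id, is_initial_dimensions):
    series = await fetch_series_for(dimensions)  # hypothetical helper
    if series is None:  # e.g. the server answered that there are too many results
        # Ask process_parts() to split the current part on one dimension
        # and to process each half separately.
        raise SplitOneDimension
    save_series(series)  # hypothetical helper

await process_parts(
    resource=resource,
    args=args,
    initial_dimensions={"FREQ": ["A", "Q"], "COUNTRY": ["FR", "DE", "IT"]},
    process_part=process_part,
)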

select_first_alphabetic(candidates: Dict[str, List[str]]) → str

Select the first dimension in alphabetic order.

select_median_low(candidates: Dict[str, List[str]]) → str

Select the dimension having the “median low” number of values.

This avoids both:

  • the dimension with the fewest values, because it has a higher probability of returning too many results

  • the dimension with the most values, because it could lead to a URL that is too long

split(dimensions: Dict[str, List[str]], select_candidate: Callable[[Dict[str, List[str]]], str]) → Tuple[str, Dict[str, List[str]], Dict[str, List[str]]]

Split dimensions.

Raise ValueError if dimensions are not splittable, i.e. all dimensions have one code.

split_one_dimension(dimensions: Dict[str, List[str]], select_candidate: Callable[[Dict[str, List[str]]], str]) → Tuple[str, List[str], List[str]]

Choose a splittable dimension and split its codes into two sub-lists.

Candidates are dimensions having more than one value code.

Raise ValueError if dimensions are not splittable, i.e. all dimensions have one code.

>>> split_one_dimension({}, select_median_low)
Traceback (most recent call last):
    ...
ValueError: No dimension defined, can't split
>>> split_one_dimension({'FREQ': ['A']}, select_median_low)
Traceback (most recent call last):
    ...
ValueError: All dimensions have one value, can't split more
>>> split_one_dimension({'FREQ': ['A'], 'COUNTRY': ['FR']}, select_median_low)
Traceback (most recent call last):
    ...
ValueError: All dimensions have one value, can't split more
>>> split_one_dimension({'FREQ': ['A', 'Q']}, select_median_low)
('FREQ', ['A'], ['Q'])
>>> split_one_dimension({'FREQ': ['A', 'Q'], 'COUNTRY': ['FR']}, select_median_low)
('FREQ', ['A'], ['Q'])
>>> split_one_dimension({'FREQ': ['A', 'Q'], 'COUNTRY': ['FR', 'DE']},
...                     select_median_low)
('COUNTRY', ['FR'], ['DE'])
>>> split_one_dimension({'FREQ': ['A', 'Q'], 'COUNTRY': ['FR', 'DE', 'IT']},
...                     select_median_low)
('FREQ', ['A'], ['Q'])

resources module

Functions and data types that help process resources in DBnomics fetchers.

class DbnomicsDatasetResource(*, id: str, base_dir: pathlib.Path)

Bases: dbnomics_fetcher_toolbox.resources.Resource

A resource representing a dataset converted to DBnomics data model.

base_dir: pathlib.Path
create_context()

Create the dataset target directory, following DBnomics data model.

delete()

Delete the dataset target directory, following DBnomics data model.

property target_dir

Directory where the dataset will be written, following DBnomics data model.

The name of the directory is the resource id.

class Resource(*, id: str)

Bases: pydantic.main.BaseModel

A resource to be processed by process_resources().

create_context()

Create a context necessary to process the resource.

This method is called by process_resources() before calling process_resource.

Override it to do anything you need (e.g. creating a directory…).

delete()

Delete a resource.

This method is called by process_resources() if any error occurred during the execution of the process_resource callback.

Override it to do anything you need (e.g. delete a directory…).

id: str

async process_resources(resources: Sequence[dbnomics_fetcher_toolbox.resources.Resource], args: argparse.Namespace, process_resource: Callable[[dbnomics_fetcher_toolbox.resources.Resource], Awaitable[None]], on_event: Callable[[dbnomics_fetcher_toolbox.status.ResourceEvent], None] = None, events: Sequence[dbnomics_fetcher_toolbox.status.ResourceEvent] = None) → Dict[str, dbnomics_fetcher_toolbox.status.ResourceEvent]

Handle the common work of processing resources.

Iterate over resources:

  • removing the excluded ones if the --exclude option is used

  • keeping only some of them if the --only option is used

  • processing a limited number of resources if the --limit option is used

By default do not process resources that were already processed with SUCCESS or FAILURE status. If the option --retry-failed is used, retry resources with FAILURE status. If the option --force is used, process all resources.

For each resource, call process_resource(resource), logging messages that allow tracking the processing progress. If an exception is raised during the execution of process_resource:

  • log the error and process the next resource, or re-raise it if the --fail-fast option is used

  • call resource.delete() if the --delete-on-error option is used
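
A minimal sketch of processing resources with DbnomicsDatasetResource (args is assumed to come from the script's argument parser; the dataset codes and the convert_dataset helper are illustrative):

from pathlib import Path

from dbnomics_fetcher_toolbox.resources import DbnomicsDatasetResource, process_resources

target_dir = Path("data")  # illustrative

resources = [
    DbnomicsDatasetResource(id=dataset_code, base_dir=target_dir)
    for dataset_code in ["CPI", "GDP"]  # illustrative dataset codes
]

async def process_resource(resource):
    # Write the dataset files to resource.target_dir; raising an exception
    # here marks the resource as FAILURE.
    convert_dataset(resource)  # hypothetical helper

events = await process_resources(
    resources=resources,
    args=args,
    process_resource=process_resource,
)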

sdmx_v2_0 module

Functions and classes defining SDMX v2.0 data model.

class AttachmentLevel(value)

Bases: enum.Enum

Values of the attachmentLevel attribute of the <Attribute> SDMX element.

DATASET = 'Dataset'
OBSERVATION = 'Observation'
SERIES = 'Series'
class Attribute(*, codelist_id: str, concept_id: str, attachment_level: dbnomics_fetcher_toolbox.sdmx_v2_0.AttachmentLevel)

Bases: pydantic.main.BaseModel

Represents an <Attribute> SDMX element.

attachment_level: dbnomics_fetcher_toolbox.sdmx_v2_0.AttachmentLevel
codelist_id: str
concept_id: str
class Code(*, value: str, descriptions: Dict[str, str], parent_code: str = None)

Bases: pydantic.main.BaseModel

Represents a <Code> SDMX element.

descriptions: Dict[str, str]
parent_code: Optional[str]
value: str
class CodeList(*, id: str, names: Dict[str, str], codes: List[dbnomics_fetcher_toolbox.sdmx_v2_0.Code])

Bases: pydantic.main.BaseModel

Represents a <CodeList> SDMX element.

codes: List[dbnomics_fetcher_toolbox.sdmx_v2_0.Code]
id: str
names: Dict[str, str]
class Concept(*, id: str, names: Dict[str, str])

Bases: pydantic.main.BaseModel

Represents a <Concept> SDMX element.

id: str
names: Dict[str, str]
class Dataset(*, series: List[dbnomics_fetcher_toolbox.sdmx_v2_0.Series])

Bases: pydantic.main.BaseModel

Represents a <Dataset> SDMX element.

series: List[dbnomics_fetcher_toolbox.sdmx_v2_0.Series]
class DatasetStructure(*, id: str, names: Dict[str, str], codelists: List[dbnomics_fetcher_toolbox.sdmx_v2_0.CodeList], concepts: List[dbnomics_fetcher_toolbox.sdmx_v2_0.Concept], dimensions: List[dbnomics_fetcher_toolbox.sdmx_v2_0.Dimension], attributes: List[dbnomics_fetcher_toolbox.sdmx_v2_0.Attribute])

Bases: pydantic.main.BaseModel

Represents a <Structure> SDMX element as used to describe one dataset.

The <Components> SDMX element is flattened: its children <Dimension> and <Attribute> are directly available under dimensions and attributes.

attributes: List[dbnomics_fetcher_toolbox.sdmx_v2_0.Attribute]
codelists: List[dbnomics_fetcher_toolbox.sdmx_v2_0.CodeList]
concepts: List[dbnomics_fetcher_toolbox.sdmx_v2_0.Concept]
classmethod dimension_codelists_exist(values)
dimensions: List[dbnomics_fetcher_toolbox.sdmx_v2_0.Dimension]
get_codelist(codelist_id: str) → dbnomics_fetcher_toolbox.sdmx_v2_0.CodeList

Return the codelist corresponding to codelist_id.

get_concept(concept_id: str) → Optional[dbnomics_fetcher_toolbox.sdmx_v2_0.Concept]

Return the concept corresponding to concept_id.

get_dimension(concept_id: str) → Optional[dbnomics_fetcher_toolbox.sdmx_v2_0.Dimension]

Return the dimension corresponding to concept_id.

id: str
names: Dict[str, str]
class Dimension(*, codelist_id: str, concept_id: str)

Bases: pydantic.main.BaseModel

Represents a <Dimension> SDMX element.

codelist_id: str
concept_id: str
class KeyFamily(*, id: str, names: Dict[str, str])

Bases: pydantic.main.BaseModel

Represents a <KeyFamily> SDMX element.

id: str
names: Dict[str, str]
class Obs(*, value: Union[typing_extensions.Literal[NaN], float], time: str = None, attributes: List[dbnomics_fetcher_toolbox.sdmx_v2_0.Value])

Bases: pydantic.main.BaseModel

Represents an <Obs> SDMX element.

attributes: List[dbnomics_fetcher_toolbox.sdmx_v2_0.Value]
find_attribute_value(concept_id: str) → Optional[str]

Find the value of the attribute identified by concept_id.

time: Optional[str]
value: Union[typing_extensions.Literal[NaN], float]
class Series(*, key: List[dbnomics_fetcher_toolbox.sdmx_v2_0.Value], attributes: List[dbnomics_fetcher_toolbox.sdmx_v2_0.Value], observations: List[dbnomics_fetcher_toolbox.sdmx_v2_0.Obs])

Bases: pydantic.main.BaseModel

Represents a <Series> SDMX element.

attributes: List[dbnomics_fetcher_toolbox.sdmx_v2_0.Value]
find_attribute_value(concept_id: str) → Optional[str]

Find the value of the attribute identified by concept_id.

find_key_value(concept_id: str) → Optional[str]

Find the value of the key identified by concept_id.

key: List[dbnomics_fetcher_toolbox.sdmx_v2_0.Value]
property key_str

Return Series.key as a str.

For each Value item of the key list, take Value.value, then join them all with a ".".

>>> series = Series(key=[
...     Value(concept_id='FREQ', value='A'),
...     Value(concept_id='COUNTRY', value='FR'),
... ], attributes=[], observations=[])
>>> series.key_str
'A.FR'

observations: List[dbnomics_fetcher_toolbox.sdmx_v2_0.Obs]
class Value(*, concept_id: str, value: str)

Bases: pydantic.main.BaseModel

Represents a <Value> SDMX element.

concept_id: str
value: str
build_dimension_mask(structure: dbnomics_fetcher_toolbox.sdmx_v2_0.DatasetStructure, dimensions: Dict[str, List[str]]) → str

Build a dimension mask.

Return a string representing a selection of dimensions, as often used in SDMX APIs. This is useful to search series by dimension.

Raise ValueError if a dimension of dimensions can’t be found.
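
An illustrative call (assuming structure is a DatasetStructure loaded beforehand, and assuming the common SDMX convention of joining codes with "+" inside a dimension and separating dimensions with "."; the exact format produced is not specified here):

mask = build_dimension_mask(structure, {"FREQ": ["A"], "COUNTRY": ["FR", "DE"]})
# expected to look like "A.FR+DE" under the convention described above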

find_value_value(concept_id: str, values: Sequence[dbnomics_fetcher_toolbox.sdmx_v2_0.Value]) → Optional[str]

Find the value of the item in values identified by concept_id.

get_one_name(names: Dict[str, str], lang_candidates: Sequence[str] = None) → Optional[str]

Return a name among names.

lang_candidates can be used to choose a preferred language. The default value is None, which means that the first available name is returned.
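
An illustrative call (assuming names maps language codes to localized names):

name = get_one_name({"en": "Annual", "fr": "Annuel"}, lang_candidates=["fr"])
# expected to return "Annuel", since "fr" is the preferred language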

iter_keyfamilies(structure_element: lxml.etree.Element) → Iterator[dbnomics_fetcher_toolbox.sdmx_v2_0.KeyFamily]

Yield KeyFamily objects found in structure_element.

keyfamily_to_dataset_references(keyfamily: dbnomics_fetcher_toolbox.sdmx_v2_0.KeyFamily, lang_candidates: Sequence[str] = None) → dbnomics_fetcher_toolbox.data_model.DatasetReference

Convert a SDMX 2.0 KeyFamily into a DBnomics dataset reference.

Return a dbnomics_fetcher_toolbox.data_model.DatasetReference built from keyfamily.

Use lang_candidates to choose a preferred language. It is forwarded to get_one_name().

load_dataset(dataset_element: lxml.etree.Element) → dbnomics_fetcher_toolbox.sdmx_v2_0.Dataset

Return a Dataset built from the given XML element.

load_dataset_structure(structure_element: lxml.etree.Element) → dbnomics_fetcher_toolbox.sdmx_v2_0.DatasetStructure

Return a DatasetStructure built from the given XML element.

parse_observation_value(value: str) → Union[typing_extensions.Literal[NaN], float]

Parse value and return a float or the literal string "NaN".

If value can’t be converted to a float and is different from "NaN", raise a ValueError.

>>> parse_observation_value(NAN)
'NaN'
>>> parse_observation_value(1.2)
1.2
>>> parse_observation_value('Hello')
Traceback (most recent call last):
    ...
ValueError: Invalid value 'Hello' for a SDMX observation

remove_prepared_date(element: lxml.etree.Element) → lxml.etree.Element

Remove prepared date from XML element.

This is sometimes useful to avoid triggering a spurious commit in the source data when only the prepared date has changed.

Mutate element and return it, which makes it easy to use this function as a callback, for example with dbnomics_fetcher_toolbox.formats.fetch_or_read_xml().

series_to_series_json(series: dbnomics_fetcher_toolbox.sdmx_v2_0.Series) → dict

Return a dict representing a series, following DBnomics data model.

structure_to_dataset_json(dataset_code: str, structure: dbnomics_fetcher_toolbox.sdmx_v2_0.DatasetStructure, lang_candidates: Sequence[str] = None, all_series: Sequence[dbnomics_fetcher_toolbox.sdmx_v2_0.Series] = None) → dict

Return a dict representing a dataset, following DBnomics data model.

Use lang_candidates to choose a preferred language. It is forwarded to get_one_name().

Use all_series to write only the dimensions and the attributes actually used by the series.
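
A minimal sketch of a convert flow for one SDMX v2.0 dataset (the directories, file names and dataset code are illustrative; it is assumed here that the roots of the source files are the <Structure> and <Dataset> elements):

from pathlib import Path

from dbnomics_fetcher_toolbox.data_model import write_series_jsonl
from dbnomics_fetcher_toolbox.formats import read_xml, write_json
from dbnomics_fetcher_toolbox.sdmx_v2_0 import (
    load_dataset,
    load_dataset_structure,
    series_to_series_json,
    structure_to_dataset_json,
)

source_dir = Path("source-data")  # illustrative
dataset_dir = Path("data") / "CPI"  # illustrative; assumed to exist already

structure = load_dataset_structure(read_xml(source_dir / "structure.xml"))
dataset = load_dataset(read_xml(source_dir / "dataset.xml"))

write_json(
    dataset_dir / "dataset.json",
    structure_to_dataset_json("CPI", structure, lang_candidates=["en"], all_series=dataset.series),
)
write_series_jsonl(dataset_dir, (series_to_series_json(series) for series in dataset.series))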

status module

Types and functions for handling the status file.

class BaseEvent(*, type: dbnomics_fetcher_toolbox.status.EventType, id: str, emitted_at: datetime.datetime = None, duration: float, message: str = None)

Bases: pydantic.main.BaseModel

A base class for structured events.

duration: float
emitted_at: datetime.datetime
id: str
message: Optional[str]
classmethod set_emitted_at_now(v)
to_json_data()

Return data as it would be encoded to JSON.

type: dbnomics_fetcher_toolbox.status.EventType
class EventType(value)

Bases: enum.Enum

The type of an event: a resource or a resource part.

RESOURCE = 'RESOURCE'
RESOURCE_PART = 'RESOURCE_PART'
class PartEvent(*, type: dbnomics_fetcher_toolbox.status.EventType = <EventType.RESOURCE_PART: 'RESOURCE_PART'>, id: str, emitted_at: datetime.datetime = None, duration: float, message: str = None, resource_id: str, status: dbnomics_fetcher_toolbox.status.PartStatus, series_count: int = None, split_dimension: str = None, split_parts: Tuple[str, str] = None)

Bases: dbnomics_fetcher_toolbox.status.BaseEvent

Information gathered during the processing of a resource part.

resource_id: str
series_count: Optional[int]
split_dimension: Optional[str]
split_parts: Optional[Tuple[str, str]]
status: dbnomics_fetcher_toolbox.status.PartStatus
class PartStatus(value)

Bases: enum.Enum

The resulting state of processing a resource part.

FAILURE = 'FAILURE'
SPLIT = 'SPLIT'
SUCCESS = 'SUCCESS'
class ResourceEvent(*, type: dbnomics_fetcher_toolbox.status.EventType = <EventType.RESOURCE: 'RESOURCE'>, id: str, emitted_at: datetime.datetime = None, duration: float, message: str = None, status: dbnomics_fetcher_toolbox.status.ResourceStatus)

Bases: dbnomics_fetcher_toolbox.status.BaseEvent

Information gathered during the processing of a resource.

status: dbnomics_fetcher_toolbox.status.ResourceStatus
class ResourceStatus(value)

Bases: enum.Enum

The resulting state of processing a resource.

FAILURE = 'FAILURE'
SKIPPED = 'SKIPPED'
SUCCESS = 'SUCCESS'
dedupe_events(events: Iterable[Union[dbnomics_fetcher_toolbox.status.ResourceEvent, dbnomics_fetcher_toolbox.status.PartEvent]]) → List[Union[dbnomics_fetcher_toolbox.status.ResourceEvent, dbnomics_fetcher_toolbox.status.PartEvent]]

Return events in chronological order, deduplicated by event id.

Because the status file is an activity log, it can contain multiple items with the same event id. This function deduplicates events by id, keeping only the latest one for each id, in chronological order.

iter_events(file: pathlib.Path) → Iterator[Union[dbnomics_fetcher_toolbox.status.ResourceEvent, dbnomics_fetcher_toolbox.status.PartEvent]]

Yield events from file, ignoring events that have no type or an invalid type.

load_events(target_dir: pathlib.Path, dedupe: bool = True) → Optional[List[Union[dbnomics_fetcher_toolbox.status.ResourceEvent, dbnomics_fetcher_toolbox.status.PartEvent]]]

Load events from status.jsonl expected to be found in target_dir.

If dedupe is True (the default), the events are deduplicated by id, keeping only the latest one for each id in chronological order. Otherwise, all events are returned.

load_events_from_file(file: pathlib.Path, dedupe: bool = True) → List[Union[dbnomics_fetcher_toolbox.status.ResourceEvent, dbnomics_fetcher_toolbox.status.PartEvent]]

Load events from file.

If dedupe is True (the default), the events are deduplicated by id, keeping only the latest one for each id in chronological order. Otherwise, all events are returned.

open_status_writer(args: argparse.Namespace) → Iterator[Callable[[dbnomics_fetcher_toolbox.status.BaseEvent], None]]

Open a writer to create a status.jsonl file and fill it with events.

Use it as a context manager.

If the --flush-status option was given, flush the file after appending each event.

Example:

with status.open_status_writer(args) as append_event:
    await process_resources(
        resources=resources,
        args=args,
        process_resource=process_resource,
        on_event=append_event,
        events=events,
    )

utils module

Utility functions.

find(predicate: Callable[[T], bool], items: Iterable[T], default=None) → Optional[T]

Find the first item in items satisfying predicate(item).

Return the found item, or return default if no item was found.

>>> find(lambda item: item > 2, [1, 2, 3, 4])
3
>>> find(lambda item: item > 10, [1, 2, 3, 4])
>>> find(lambda item: item > 10, [1, 2, 3, 4], default=42)
42
is_empty(value: Any) → bool

Return True if value is empty.

Empty values are [], {}, set(), None and "", but not False or 0.

>>> is_empty(0)
False
>>> is_empty(1)
False
>>> is_empty([])
True
>>> is_empty([1])
False
>>> is_empty({})
True
>>> is_empty({'a': 1})
False
>>> is_empty('')
True
>>> is_empty('hi')
False
>>> is_empty(set())
True
>>> is_empty({1})
False
>>> is_empty(None)
True
without_empty_values(mapping: Mapping[K, V]) → Dict[K, V]

Return a dict built from mapping without its empty values.

This function does not apply recursively.

Testing emptiness of values is done by is_empty.

>>> without_empty_values(  
...     {'name': 'Robert', 'children': None, 'age': 42,
...     'nb_gold_medals': 0, 'hobbies': [],
...     'houses': [{'city': 'Dallas'}],
...     'notes': {'maths': 'A', 'tech': None}})
{'name': 'Robert', 'age': 42, 'nb_gold_medals': 0, 'houses': [{'city': 'Dallas'}],
 'notes': {'maths': 'A', 'tech': None}}