dbnomics_fetcher_toolbox package
Toolbox of functions and data types that help write DBnomics fetchers.
The API is organized in sub-modules, presented below in alphabetical order.
The API is designed to be convenient to use from a fetcher script, without exposing internal details in that script. This is why some functions, such as dbnomics_fetcher_toolbox.logging_utils.setup_logging(), take an args namespace instead of more precise arguments.
aiohttp_utils module
Utility functions for aiohttp.

exception ChunkTimeoutError(bytes_count: int)
    Bases: Exception
    Chunk timeout error, raised by iter_chunks_with_timeout() when a chunk takes too long to download.
    Parameters:
        bytes_count – number of bytes downloaded with the previous chunks
    bytes_count: int

add_arguments_for_chunks(parser: argparse.ArgumentParser)
    Add arguments to parser to be used with iter_chunks_with_timeout().

get_trace_config() → aiohttp.tracing.TraceConfig
    Get a TraceConfig instance configured to log aiohttp HTTP requests.

iter_chunks_with_timeout(response: aiohttp.client_reqrep.ClientResponse, args: argparse.Namespace) → AsyncIterator[bytes]
    Raise ChunkTimeoutError if a timeout occurs while downloading a chunk.
    This makes it possible to handle servers that suddenly stop sending data, without having to wait for the global request timeout or an error like errno 104 "Connection reset by peer".
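The per-chunk timeout idea can be sketched independently of aiohttp. The function below is a hypothetical stand-in, not the toolbox's implementation: it takes any async iterator of chunks plus an explicit timeout, whereas the real function reads from an aiohttp ClientResponse and gets its timeout from args.

```python
import asyncio
from typing import AsyncIterator


class ChunkTimeoutError(Exception):
    """Raised when a single chunk takes too long to arrive."""

    def __init__(self, bytes_count: int):
        super().__init__(f"chunk timeout after {bytes_count} bytes")
        self.bytes_count = bytes_count


async def iter_chunks_with_timeout(
    chunks: AsyncIterator[bytes], timeout: float
) -> AsyncIterator[bytes]:
    """Yield chunks, bounding the wait for each individual chunk."""
    bytes_count = 0
    while True:
        try:
            chunk = await asyncio.wait_for(chunks.__anext__(), timeout)
        except StopAsyncIteration:
            return
        except asyncio.TimeoutError:
            # The server stopped sending data: fail fast instead of
            # waiting for the global request timeout.
            raise ChunkTimeoutError(bytes_count) from None
        bytes_count += len(chunk)
        yield chunk
```

Wrapping each `__anext__()` in `asyncio.wait_for` bounds every chunk individually, which is the point of the design: a stalled server is detected after one chunk interval, not after the whole request timeout.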
arguments module
Functions handling the script options of fetchers.

add_arguments_for_convert(parser: argparse.ArgumentParser)
    Add arguments to parser used for a convert script.

add_arguments_for_download(parser: argparse.ArgumentParser)
    Add arguments to parser used for a download script.

add_common_arguments(parser: argparse.ArgumentParser)
    Add common arguments to parser. Those arguments are common to both download and convert scripts.

natural_int(value: str) → int
    Check that value is a positive integer.

readable_dir(value: str) → pathlib.Path
    Check that value is a readable directory.
    Example:
        parser.add_argument('dir', type=readable_dir)

readable_file(value: str) → pathlib.Path
    Check that value is a readable file.
    Example:
        parser.add_argument('file', type=readable_file)
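An argparse "type" callable like readable_dir can be sketched as follows. This illustrates the pattern, not the toolbox's exact implementation; the error messages are invented here.

```python
import argparse
import os
from pathlib import Path


def readable_dir(value: str) -> Path:
    """argparse 'type' callable that accepts only readable directories."""
    path = Path(value)
    if not path.is_dir():
        raise argparse.ArgumentTypeError(f"{value!r} is not a directory")
    if not os.access(path, os.R_OK):
        raise argparse.ArgumentTypeError(f"{value!r} is not readable")
    return path


parser = argparse.ArgumentParser()
parser.add_argument("dir", type=readable_dir)
```

argparse calls the type callable on the raw string and turns any ArgumentTypeError into a clean usage error, so validation happens before the script body runs.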
cli module
Functions for CLI commands.

status_stats()
    Compute and display statistics about a status.jsonl file.
data_model module
Functions and classes defining the DBnomics data model.

CATEGORY_TREE_JSON = 'category_tree.json'
    Name of the file containing data representing a category tree.

class Category(*, children: List[Union[Category, dbnomics_fetcher_toolbox.data_model.DatasetReference]], code: str = None, name: str = None, doc_href: str = None)
    Bases: pydantic.main.BaseModel
    Represents a category node of a category tree.
    children: List[Union[dbnomics_fetcher_toolbox.data_model.Category, dbnomics_fetcher_toolbox.data_model.DatasetReference]]
    code: Optional[str]
    classmethod code_or_name_exist(values)
    doc_href: Optional[str]
    name: Optional[str]

class CategoryTree(*, __root__: List[Union[dbnomics_fetcher_toolbox.data_model.Category, dbnomics_fetcher_toolbox.data_model.DatasetReference]])
    Bases: pydantic.main.BaseModel
    Represents a category tree of categories and dataset references.
    to_json_data() → List[dict]
        Return data as it would be encoded to JSON.

DATASET_JSON = 'dataset.json'
    Name of the file containing metadata about a dataset.

class DatasetReference(*, code: str, name: str = None, status: dbnomics_fetcher_toolbox.status.ResourceStatus = None)
    Bases: pydantic.main.BaseModel
    Represents a dataset node of a category tree.
    code: str
    name: Optional[str]
    status: Optional[dbnomics_fetcher_toolbox.status.ResourceStatus]

NA: typing_extensions.Literal[NA] = 'NA'
    Special value used when an observation value is not available.

exception NoTimeDimensionError
    Bases: dbnomics_fetcher_toolbox.data_model.SeriesError
    An error with the time dimension of a series.

exception ObservationError
    Bases: dbnomics_fetcher_toolbox.data_model.SeriesError
    An error with a series observation.

PROVIDER_JSON = 'provider.json'
    Name of the file containing metadata about a provider.

SERIES_JSONL = 'series.jsonl'
    Name of the file containing data about many time series.

exception SeriesError
    Bases: ValueError
    An error with a series.

clean_category_tree_json(category_tree_json: List[dict])
    Clean a category tree by removing the SUCCESS statuses of dataset nodes, recursively.
    Mutates category_tree_json.
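A minimal sketch of the recursive cleaning just described, assuming dataset nodes carry a "status" key and category nodes a "children" key (following the node layout of the data model above; the toolbox's actual code may differ in details):

```python
from typing import List


def clean_category_tree_json(category_tree_json: List[dict]) -> None:
    """Recursively remove 'status' keys equal to 'SUCCESS', mutating the tree."""
    for node in category_tree_json:
        if node.get("status") == "SUCCESS":
            del node["status"]
        children = node.get("children")
        if children is not None:
            clean_category_tree_json(children)
```

Mutating in place matches the documented contract; FAILURE statuses are deliberately kept so that a later run can retry them.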
iter_dataset_references(category_tree: dbnomics_fetcher_toolbox.data_model.CategoryTree) → Iterator[dbnomics_fetcher_toolbox.data_model.DatasetReference]
    Yield DatasetReference objects from category_tree. The category tree is iterated recursively.

write_category_tree_json(directory: pathlib.Path, category_tree: dbnomics_fetcher_toolbox.data_model.CategoryTree)
    Encode category_tree to JSON and write it to "category_tree.json".
    Parameters:
        directory – the directory to write the file to

write_series_jsonl(directory: pathlib.Path, series: Iterable[dict])
    Encode series to JSON Lines and write it to "series.jsonl".
    Each item of series must be a dict with a "code" key. series are sorted by "code" in order to guarantee a stable file.
    Parameters:
        directory – write the file in this directory
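The stable-output behavior of write_series_jsonl can be sketched like this (a simplified stand-in: the real function may serialize and name things differently):

```python
import json
from pathlib import Path
from typing import Iterable


def write_series_jsonl(directory: Path, series: Iterable[dict]) -> None:
    """Write one JSON object per line, sorted by 'code' for a stable file."""
    file = directory / "series.jsonl"
    with file.open("w", encoding="utf-8") as fp:
        for item in sorted(series, key=lambda s: s["code"]):
            fp.write(json.dumps(item, sort_keys=True) + "\n")
```

Sorting by "code" (and by key inside each object) makes reruns byte-identical when the data has not changed, which avoids spurious diffs in the target repository.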
file_system_utils module
Utility functions about the file system.

iter_child_directories(directory: pathlib.Path, include_hidden: bool = False) → Iterator[pathlib.Path]
    Yield the child directories of directory.
    If include_hidden=True, do not skip child directories whose name starts with ".". The default is False, so that directories like .git are skipped.
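A sketch of that helper; sorting is an addition here to make the output deterministic, and is not necessarily what the toolbox does:

```python
from pathlib import Path
from typing import Iterator


def iter_child_directories(directory: Path, include_hidden: bool = False) -> Iterator[Path]:
    """Yield child directories, skipping names starting with '.' by default."""
    for child in sorted(directory.iterdir()):
        if not child.is_dir():
            continue
        if not include_hidden and child.name.startswith("."):
            continue
        yield child
```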
formats module
Functions dealing with file formats such as JSON, JSON Lines, XML and HTML.

async fetch_or_read_html(name: str, url: str, session: aiohttp.client.ClientSession, file: pathlib.Path, force: bool = False, on_fetch: Callable[[lxml.etree.Element], lxml.etree.Element] = None) → lxml.etree.Element
    Fetch or read HTML.
    Just calls fetch_or_read_xml() with parser=HTML_PARSER and xml_declaration=False.

async fetch_or_read_xml(name: str, url: str, session: aiohttp.client.ClientSession, file: pathlib.Path, force: bool = False, parser: lxml.etree._FeedParser = None, on_fetch: Callable[[lxml.etree.Element], lxml.etree.Element] = None, xml_declaration: bool = True) → lxml.etree.Element
    Fetch or read XML.
    Load the XML file from file or, if it does not exist, fetch it from url using session, then save it to file. In either case, read it and return an Element.
    The name parameter lets you customize logging messages.
    The force parameter forces fetching instead of loading from file, even if the file exists.
    A custom parser can be passed to etree.parse.
    on_fetch is a callback that takes the fetched Element and returns another one.
    xml_declaration is passed to ElementTree.write.
    Examples:
        keyfamilies_element = await fetch_or_read_xml(
            name="key families XML file",
            url=urljoin(args.api_base_url, "/restsdmx/sdmx.ashx/GetDataStructure/all/all"),
            session=session,
            file=args.target_dir / "keyfamilies.xml",
            on_fetch=sdmx_v2_0.remove_prepared_date,
        )
        category_tree_element = await fetch_or_read_xml(
            name="category tree HTML file",
            url=args.api_base_url,
            session=session,
            file=args.target_dir / "category_tree.html",
            parser=HTML_PARSER,
            on_fetch=lambda element: element.find(
                './/{*}div[@id="browsethemes"]/ul[@class="treeview"]'
            ),
        )

async fetch_xml(url: str, session: aiohttp.client.ClientSession, parser: lxml.etree._FeedParser = None) → lxml.etree.Element
    Fetch an XML file from url using session.
    A custom parser can be passed to etree.parse.

read_html(file: pathlib.Path) → lxml.etree.Element
    Read HTML from file and return an Element.
    Due to lxml.etree.HTMLParser, the returned Element always starts with an <html> element, so the caller has to call Element.find() in order to access the wanted child element.
    Calls read_xml() with parser=HTML_PARSER.

read_xml(file: pathlib.Path, parser: lxml.etree._FeedParser = None) → lxml.etree.Element
    Read XML from file and return an Element.
    A custom parser can be passed to etree.parse.

write_html(file: pathlib.Path, element: lxml.etree.Element, pretty_print: bool = True)
    Encode element to HTML and write it to file.
    pretty_print is passed to ElementTree.write.

write_json(file: pathlib.Path, data: Any)
    Encode data to JSON and write it to file.

write_jsonl(file: pathlib.Path, items: Iterable[Any])
    Encode items to JSON Lines and write them to file.

write_xml(file: pathlib.Path, element: lxml.etree.Element, pretty_print: bool = True, xml_declaration: bool = True)
    Encode element to XML and write it to file.
    pretty_print and xml_declaration are passed to ElementTree.write.
logging_utils module
Utility functions about logging.

setup_logging(args: argparse.Namespace)
    Initialize logging.
    The log level is DEBUG if the --debug option was given.
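A plausible shape for such a helper; the log format string and the force=True reset are assumptions, not the toolbox's actual code:

```python
import argparse
import logging


def setup_logging(args: argparse.Namespace) -> None:
    """Configure the root logger: DEBUG when --debug was given, INFO otherwise."""
    level = logging.DEBUG if getattr(args, "debug", False) else logging.INFO
    logging.basicConfig(
        format="%(levelname)s:%(name)s:%(message)s",
        level=level,
        force=True,  # replace any handlers configured earlier (Python 3.8+)
    )
```

Taking the whole args namespace instead of a boolean is exactly the convenience trade-off described in the package introduction.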
parts module
Functions handling resource parts.

exception SplitOneDimension
    Bases: Exception
    Raise this exception to trigger a split on one dimension.
    In particular, raise it from the process_resource callback of dbnomics_fetcher_toolbox.resources.process_resources().

dimensions_to_str(dimensions: Dict[str, List[str]], is_initial_dimensions: bool) → str
    Convert dimensions to str.
    A dimension dict is generally too large and makes logs difficult to read. This function returns a shorter string representing the dimensions.
    Parameters:
        is_initial_dimensions – if True return "all", otherwise compute a hash of dimensions

async process_parts(resource: dbnomics_fetcher_toolbox.resources.Resource, args: argparse.Namespace, initial_dimensions: Dict[str, List[str]], process_part: Callable[[Dict[str, List[str]], str, bool], Awaitable[None]], on_event: Callable[[dbnomics_fetcher_toolbox.status.PartEvent], None] = None, events: Sequence[dbnomics_fetcher_toolbox.status.PartEvent] = None, dimensions_to_str: Callable[[Dict[str, List[str]], bool], str] = <function dimensions_to_str>, select_split_candidate: Callable[[Dict[str, List[str]]], str] = <function select_median_low>)
    Process a resource by processing its parts.
    process_part can raise a SplitOneDimension exception, meaning that the current part must be split on one dimension.

select_first_alphabetic(candidates: Dict[str, List[str]]) → str
    Select the first dimension in alphabetical order.

select_median_low(candidates: Dict[str, List[str]]) → str
    Select the dimension having the "median low" number of values.
    This avoids both:
    - the dimension with the fewest values, because it has a higher probability of returning too many results
    - the dimension with the most values, because it could lead to a URL that is too long

split(dimensions: Dict[str, List[str]], select_candidate: Callable[[Dict[str, List[str]]], str]) → Tuple[str, Dict[str, List[str]], Dict[str, List[str]]]
    Split dimensions.
    Raise ValueError if dimensions are not splittable, i.e. all dimensions have one code.

split_one_dimension(dimensions: Dict[str, List[str]], select_candidate: Callable[[Dict[str, List[str]]], str]) → Tuple[str, List[str], List[str]]
    Choose a splittable dimension and split its codes into 2 sub-lists.
    Candidates are dimensions having more than one value code.
    Raise ValueError if dimensions are not splittable, i.e. all dimensions have one code.
    >>> split_one_dimension({}, select_median_low)
    Traceback (most recent call last):
    ...
    ValueError: No dimension defined, can't split
    >>> split_one_dimension({'FREQ': ['A']}, select_median_low)
    Traceback (most recent call last):
    ...
    ValueError: All dimensions have one value, can't split more
    >>> split_one_dimension({'FREQ': ['A'], 'COUNTRY': ['FR']}, select_median_low)
    Traceback (most recent call last):
    ...
    ValueError: All dimensions have one value, can't split more
    >>> split_one_dimension({'FREQ': ['A', 'Q']}, select_median_low)
    ('FREQ', ['A'], ['Q'])
    >>> split_one_dimension({'FREQ': ['A', 'Q'], 'COUNTRY': ['FR']}, select_median_low)
    ('FREQ', ['A'], ['Q'])
    >>> split_one_dimension({'FREQ': ['A', 'Q'], 'COUNTRY': ['FR', 'DE']},
    ...                     select_median_low)
    ('COUNTRY', ['FR'], ['DE'])
    >>> split_one_dimension({'FREQ': ['A', 'Q'], 'COUNTRY': ['FR', 'DE', 'IT']},
    ...                     select_median_low)
    ('FREQ', ['A'], ['Q'])
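The doctests above pin down the behavior well enough to reconstruct a compatible sketch. How ties and odd-length code lists are handled here is inferred from the doctests, not taken from the toolbox's source:

```python
from typing import Callable, Dict, List, Tuple


def select_median_low(candidates: Dict[str, List[str]]) -> str:
    """Pick the dimension with the median-low number of codes (name breaks ties)."""
    ordered = sorted(candidates, key=lambda dim: (len(candidates[dim]), dim))
    return ordered[(len(ordered) - 1) // 2]


def split_one_dimension(
    dimensions: Dict[str, List[str]],
    select_candidate: Callable[[Dict[str, List[str]]], str],
) -> Tuple[str, List[str], List[str]]:
    """Choose a splittable dimension and split its codes into 2 sub-lists."""
    if not dimensions:
        raise ValueError("No dimension defined, can't split")
    candidates = {dim: codes for dim, codes in dimensions.items() if len(codes) > 1}
    if not candidates:
        raise ValueError("All dimensions have one value, can't split more")
    dim = select_candidate(candidates)
    codes = dimensions[dim]
    middle = len(codes) // 2
    return dim, codes[:middle], codes[middle:]
```

Splitting on the median-sized dimension keeps each half's request neither too broad (too many results) nor too narrow (too many requests, over-long URLs), as the select_median_low description explains.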
resources module
Functions and data types helping to process resources in DBnomics fetchers.

class DbnomicsDatasetResource(*, id: str, base_dir: pathlib.Path)
    Bases: dbnomics_fetcher_toolbox.resources.Resource
    A resource representing a dataset converted to the DBnomics data model.
    base_dir: pathlib.Path
    create_context()
        Create the dataset target directory, following the DBnomics data model.
    delete()
        Delete the dataset target directory, following the DBnomics data model.
    property target_dir
        Directory where the dataset will be written, following the DBnomics data model.
        The name of the directory is the resource id.

class Resource(*, id: str)
    Bases: pydantic.main.BaseModel
    A resource to be processed by process_resources().
    create_context()
        Create a context necessary to process the resource.
        This method is called by process_resources() before calling process_resource. Override it to do anything you need (e.g. create a directory).
    delete()
        Delete a resource.
        This method is called by process_resources() if any error occurred during the execution of the process_resource callback. Override it to do anything you need (e.g. delete a directory).
    id: str

async process_resources(resources: Sequence[dbnomics_fetcher_toolbox.resources.Resource], args: argparse.Namespace, process_resource: Callable[[dbnomics_fetcher_toolbox.resources.Resource], Awaitable[None]], on_event: Callable[[dbnomics_fetcher_toolbox.status.ResourceEvent], None] = None, events: Sequence[dbnomics_fetcher_toolbox.status.ResourceEvent] = None) → Dict[str, dbnomics_fetcher_toolbox.status.ResourceEvent]
    Handle the common work of processing resources.
    Iterate over resources:
    - removing the excluded ones if the --exclude option is used
    - keeping only some of them if the --only option is used
    - processing a limited number of resources if the --limit option is used
    By default, do not process resources that were already processed with SUCCESS or FAILURE status. If the --retry-failed option is used, retry resources with FAILURE status. If the --force option is used, process all resources.
    For each resource, call process_resource(resource), logging messages that track the processing progress. If an exception is raised during the execution of process_resource:
    - log the error and process the next resource, or re-raise it if the --fail-fast option is used
    - call resource.delete() if the --delete-on-error option is used
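The option-driven selection described above can be sketched on plain resource ids. The order in which the options are applied here (--exclude, then --only, then --limit) is an assumption, not documented behavior:

```python
from typing import List, Optional, Sequence


def filter_resource_ids(
    ids: Sequence[str],
    exclude: Optional[Sequence[str]] = None,
    only: Optional[Sequence[str]] = None,
    limit: Optional[int] = None,
) -> List[str]:
    """Apply the --exclude, --only and --limit selection options."""
    selected = [id_ for id_ in ids if not exclude or id_ not in exclude]
    if only:
        selected = [id_ for id_ in selected if id_ in only]
    if limit is not None:
        selected = selected[:limit]
    return selected
```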
sdmx_v2_0 module
Functions and classes defining the SDMX v2.0 data model.

class AttachmentLevel(value)
    Bases: enum.Enum
    Values of the attachmentLevel attribute of the <Attribute> SDMX element.
    DATASET = 'Dataset'
    OBSERVATION = 'Observation'
    SERIES = 'Series'

class Attribute(*, codelist_id: str, concept_id: str, attachment_level: dbnomics_fetcher_toolbox.sdmx_v2_0.AttachmentLevel)
    Bases: pydantic.main.BaseModel
    Represents an <Attribute> SDMX element.
    attachment_level: dbnomics_fetcher_toolbox.sdmx_v2_0.AttachmentLevel
    codelist_id: str
    concept_id: str

class Code(*, value: str, descriptions: Dict[str, str], parent_code: str = None)
    Bases: pydantic.main.BaseModel
    Represents a <Code> SDMX element.
    descriptions: Dict[str, str]
    parent_code: Optional[str]
    value: str

class CodeList(*, id: str, names: Dict[str, str], codes: List[dbnomics_fetcher_toolbox.sdmx_v2_0.Code])
    Bases: pydantic.main.BaseModel
    Represents a <CodeList> SDMX element.
    codes: List[dbnomics_fetcher_toolbox.sdmx_v2_0.Code]
    id: str
    names: Dict[str, str]

class Concept(*, id: str, names: Dict[str, str])
    Bases: pydantic.main.BaseModel
    Represents a <Concept> SDMX element.
    id: str
    names: Dict[str, str]

class Dataset(*, series: List[dbnomics_fetcher_toolbox.sdmx_v2_0.Series])
    Bases: pydantic.main.BaseModel
    Represents a <Dataset> SDMX element.
    series: List[dbnomics_fetcher_toolbox.sdmx_v2_0.Series]

class DatasetStructure(*, id: str, names: Dict[str, str], codelists: List[dbnomics_fetcher_toolbox.sdmx_v2_0.CodeList], concepts: List[dbnomics_fetcher_toolbox.sdmx_v2_0.Concept], dimensions: List[dbnomics_fetcher_toolbox.sdmx_v2_0.Dimension], attributes: List[dbnomics_fetcher_toolbox.sdmx_v2_0.Attribute])
    Bases: pydantic.main.BaseModel
    Represents a <Structure> SDMX element as used to describe one dataset.
    The <Components> SDMX element is flattened: its children <Dimension> and <Attribute> are directly available under dimensions and attributes.
    attributes: List[dbnomics_fetcher_toolbox.sdmx_v2_0.Attribute]
    codelists: List[dbnomics_fetcher_toolbox.sdmx_v2_0.CodeList]
    concepts: List[dbnomics_fetcher_toolbox.sdmx_v2_0.Concept]
    classmethod dimension_codelists_exist(values)
    dimensions: List[dbnomics_fetcher_toolbox.sdmx_v2_0.Dimension]
    get_codelist(codelist_id: str) → dbnomics_fetcher_toolbox.sdmx_v2_0.CodeList
        Return the codelist corresponding to codelist_id.
    get_concept(concept_id: str) → Optional[dbnomics_fetcher_toolbox.sdmx_v2_0.Concept]
        Return the concept corresponding to concept_id.
    get_dimension(concept_id: str) → Optional[dbnomics_fetcher_toolbox.sdmx_v2_0.Dimension]
        Return the dimension corresponding to concept_id.
    id: str
    names: Dict[str, str]

class Dimension(*, codelist_id: str, concept_id: str)
    Bases: pydantic.main.BaseModel
    Represents a <Dimension> SDMX element.
    codelist_id: str
    concept_id: str

class KeyFamily(*, id: str, names: Dict[str, str])
    Bases: pydantic.main.BaseModel
    Represents a <KeyFamily> SDMX element.
    id: str
    names: Dict[str, str]

class Obs(*, value: Union[typing_extensions.Literal[NaN], float], time: str = None, attributes: List[dbnomics_fetcher_toolbox.sdmx_v2_0.Value])
    Bases: pydantic.main.BaseModel
    Represents an <Obs> SDMX element.
    attributes: List[dbnomics_fetcher_toolbox.sdmx_v2_0.Value]
    find_attribute_value(concept_id: str) → Optional[str]
        Find the value of the attribute identified by concept_id.
    time: Optional[str]
    value: Union[typing_extensions.Literal[NaN], float]

class Series(*, key: List[dbnomics_fetcher_toolbox.sdmx_v2_0.Value], attributes: List[dbnomics_fetcher_toolbox.sdmx_v2_0.Value], observations: List[dbnomics_fetcher_toolbox.sdmx_v2_0.Obs])
    Bases: pydantic.main.BaseModel
    Represents a <Series> SDMX element.
    attributes: List[dbnomics_fetcher_toolbox.sdmx_v2_0.Value]
    find_attribute_value(concept_id: str) → Optional[str]
        Find the value of the attribute identified by concept_id.
    find_key_value(concept_id: str) → Optional[str]
        Find the value of the key identified by concept_id.
    key: List[dbnomics_fetcher_toolbox.sdmx_v2_0.Value]
    property key_str
        Return Series.key as a str.
        For each Value item of the key list, take Value.value, and join them all with a ".".
        >>> series = Series(key=[
        ...     Value(concept_id='FREQ', value='A'),
        ...     Value(concept_id='COUNTRY', value='FR'),
        ... ], attributes=[], observations=[])
        >>> series.key_str
        'A.FR'
    observations: List[dbnomics_fetcher_toolbox.sdmx_v2_0.Obs]

class Value(*, concept_id: str, value: str)
    Bases: pydantic.main.BaseModel
    Represents a <Value> SDMX element.
    concept_id: str
    value: str

build_dimension_mask(structure: dbnomics_fetcher_toolbox.sdmx_v2_0.DatasetStructure, dimensions: Dict[str, List[str]]) → str
    Build a dimension mask.
    Return a string representing a selection of dimensions, as often used in SDMX APIs. This is useful to search series by dimension.
    Raise ValueError if a dimension of dimensions can't be found.
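The usual SDMX REST convention for such masks is to join the selected codes of one dimension with "+" and the dimension positions with ".", an empty position meaning "all codes". Assuming the toolbox follows that convention, a sketch looks like this; the real function takes a DatasetStructure, so passing the dimension order explicitly is a simplification:

```python
from typing import Dict, List, Sequence


def build_dimension_mask(dimension_order: Sequence[str], dimensions: Dict[str, List[str]]) -> str:
    """Join codes with '+' inside a dimension, '.' between dimensions."""
    unknown = set(dimensions) - set(dimension_order)
    if unknown:
        raise ValueError(f"Unknown dimensions: {sorted(unknown)}")
    return ".".join("+".join(dimensions.get(dim, [])) for dim in dimension_order)
```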
find_value_value(concept_id: str, values: Sequence[dbnomics_fetcher_toolbox.sdmx_v2_0.Value]) → Optional[str]
    Find the value of the item in values identified by concept_id.

get_one_name(names: Dict[str, str], lang_candidates: Sequence[str] = None) → Optional[str]
    Return a name among names.
    lang_candidates can be used to choose a preferred language. The default value is None, which means that the first available name is returned.

iter_keyfamilies(structure_element: lxml.etree.Element) → Iterator[dbnomics_fetcher_toolbox.sdmx_v2_0.KeyFamily]
    Yield KeyFamily objects found in structure_element.

keyfamily_to_dataset_references(keyfamily: dbnomics_fetcher_toolbox.sdmx_v2_0.KeyFamily, lang_candidates: Sequence[str] = None) → dbnomics_fetcher_toolbox.data_model.DatasetReference
    Convert an SDMX 2.0 KeyFamily into a DBnomics dataset reference.
    Return a dbnomics_fetcher_toolbox.data_model.DatasetReference built from keyfamily.
    Use lang_candidates to choose a preferred language. It is forwarded to get_one_name().

load_dataset(dataset_element: lxml.etree.Element) → dbnomics_fetcher_toolbox.sdmx_v2_0.Dataset
    Return a Dataset built from the given XML element.

load_dataset_structure(structure_element: lxml.etree.Element) → dbnomics_fetcher_toolbox.sdmx_v2_0.DatasetStructure
    Return a DatasetStructure built from the given XML element.

parse_observation_value(value: str) → Union[typing_extensions.Literal[NaN], float]
    Parse str and return a float or the literal string "NaN".
    If value can't be converted to a float and is different from "NaN", raise a ValueError.
    >>> parse_observation_value(NAN)
    'NaN'
    >>> parse_observation_value(1.2)
    1.2
    >>> parse_observation_value('Hello')
    Traceback (most recent call last):
    ...
    ValueError: Invalid value 'Hello' for a SDMX observation

remove_prepared_date(element: lxml.etree.Element) → lxml.etree.Element
    Remove the prepared date from the XML element.
    This is sometimes useful to avoid triggering a false commit in source data.
    Mutates element and returns it, to ease using this function as a callback, for example with dbnomics_fetcher_toolbox.formats.fetch_or_read_xml().

series_to_series_json(series: dbnomics_fetcher_toolbox.sdmx_v2_0.Series) → dict
    Return a dict representing a series, following the DBnomics data model.

structure_to_dataset_json(dataset_code: str, structure: dbnomics_fetcher_toolbox.sdmx_v2_0.DatasetStructure, lang_candidates: Sequence[str] = None, all_series: Sequence[dbnomics_fetcher_toolbox.sdmx_v2_0.Series] = None) → dict
    Return a dict representing a dataset, following the DBnomics data model.
    Use lang_candidates to choose a preferred language. It is forwarded to get_one_name().
    Use all_series to write only the dimensions and the attributes actually used by the series.
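Among the helpers above, parse_observation_value is specified precisely enough by its doctests to reconstruct. A compatible sketch (the toolbox's own code may differ):

```python
from typing import Union

NAN = "NaN"


def parse_observation_value(value) -> Union[str, float]:
    """Return a float, the literal 'NaN', or raise ValueError."""
    if value == NAN:
        return NAN
    try:
        return float(value)
    except (TypeError, ValueError):
        raise ValueError(f"Invalid value {value!r} for a SDMX observation") from None
```

Keeping "NaN" as a string rather than float('nan') avoids NaN != NaN surprises when the value is later compared or serialized to JSON.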
status module
Types and functions for handling the status file.

class BaseEvent(*, type: dbnomics_fetcher_toolbox.status.EventType, id: str, emitted_at: datetime.datetime = None, duration: float, message: str = None)
    Bases: pydantic.main.BaseModel
    A base class for structured events.
    duration: float
    emitted_at: datetime.datetime
    id: str
    message: Optional[str]
    classmethod set_emitted_at_now(v)
    to_json_data()
        Return data as it would be encoded to JSON.

class EventType(value)
    Bases: enum.Enum
    The type of entity an event is about.
    RESOURCE = 'RESOURCE'
    RESOURCE_PART = 'RESOURCE_PART'

class PartEvent(*, type: dbnomics_fetcher_toolbox.status.EventType = <EventType.RESOURCE_PART: 'RESOURCE_PART'>, id: str, emitted_at: datetime.datetime = None, duration: float, message: str = None, resource_id: str, status: dbnomics_fetcher_toolbox.status.PartStatus, series_count: int = None, split_dimension: str = None, split_parts: Tuple[str, str] = None)
    Bases: dbnomics_fetcher_toolbox.status.BaseEvent
    Information gathered during the processing of a resource part.
    resource_id: str
    series_count: Optional[int]
    split_dimension: Optional[str]
    split_parts: Optional[Tuple[str, str]]

class PartStatus(value)
    Bases: enum.Enum
    The resulting state of processing a resource part.
    FAILURE = 'FAILURE'
    SPLIT = 'SPLIT'
    SUCCESS = 'SUCCESS'

class ResourceEvent(*, type: dbnomics_fetcher_toolbox.status.EventType = <EventType.RESOURCE: 'RESOURCE'>, id: str, emitted_at: datetime.datetime = None, duration: float, message: str = None, status: dbnomics_fetcher_toolbox.status.ResourceStatus)
    Bases: dbnomics_fetcher_toolbox.status.BaseEvent
    Information gathered during the processing of a resource.

class ResourceStatus(value)
    Bases: enum.Enum
    The resulting state of processing a resource.
    FAILURE = 'FAILURE'
    SKIPPED = 'SKIPPED'
    SUCCESS = 'SUCCESS'

dedupe_events(events: Iterable[Union[dbnomics_fetcher_toolbox.status.ResourceEvent, dbnomics_fetcher_toolbox.status.PartEvent]]) → List[Union[dbnomics_fetcher_toolbox.status.ResourceEvent, dbnomics_fetcher_toolbox.status.PartEvent]]
    Return events in chronological order, deduped by event.id.
    Because the status file is an activity log, it can contain multiple items having the same event id. This function dedupes events by id, keeping the latest one in chronological order.
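The dedupe rule can be sketched with plain dicts standing in for the pydantic event models:

```python
from typing import Dict, Iterable, List


def dedupe_events(events: Iterable[dict]) -> List[dict]:
    """Keep only the latest event per 'id', in chronological order."""
    latest: Dict[str, dict] = {}
    for event in events:
        # Re-inserting moves the id to the end of the dict, so the result
        # reflects the position of each id's latest occurrence in the log.
        latest.pop(event["id"], None)
        latest[event["id"]] = event
    return list(latest.values())
```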
iter_events(file: pathlib.Path) → Iterator[Union[dbnomics_fetcher_toolbox.status.ResourceEvent, dbnomics_fetcher_toolbox.status.PartEvent]]
    Yield events from file, ignoring events with a missing or invalid type.

load_events(target_dir: pathlib.Path, dedupe: bool = True) → Optional[List[Union[dbnomics_fetcher_toolbox.status.ResourceEvent, dbnomics_fetcher_toolbox.status.PartEvent]]]
    Load events from the status.jsonl file expected to be found in target_dir.
    If dedupe is True (the default), the events are deduped by id, keeping only the latest one in chronological order. Otherwise all the events are returned.

load_events_from_file(file: pathlib.Path, dedupe: bool = True) → List[Union[dbnomics_fetcher_toolbox.status.ResourceEvent, dbnomics_fetcher_toolbox.status.PartEvent]]
    Load events from file.
    If dedupe is True (the default), the events are deduped by id, keeping only the latest one in chronological order. Otherwise all the events are returned.

open_status_writer(args: argparse.Namespace) → Iterator[Callable[[dbnomics_fetcher_toolbox.status.BaseEvent], None]]
    Open a writer that creates a status.jsonl file and fills it with events.
    Use it as a context manager.
    If the --flush-status option was given, flush the file after appending each event.
    Example:
        with status.open_status_writer(args) as append_event:
            await process_resources(
                resources=resources,
                args=args,
                process_resource=process_resource,
                on_event=append_event,
                events=events,
            )
utils module
Utility functions.

find(predicate: Callable[[T], bool], items: Iterable[T], default=None) → Optional[T]
    Find the first item in items satisfying predicate(item).
    Return the found item, or default if no item was found.
    >>> find(lambda item: item > 2, [1, 2, 3, 4])
    3
    >>> find(lambda item: item > 10, [1, 2, 3, 4])
    >>> find(lambda item: item > 10, [1, 2, 3, 4], default=42)
    42
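The doctests above translate directly into a one-line implementation sketch (not necessarily the toolbox's exact code):

```python
from typing import Callable, Iterable, Optional, TypeVar

T = TypeVar("T")


def find(predicate: Callable[[T], bool], items: Iterable[T], default: Optional[T] = None) -> Optional[T]:
    """Return the first item satisfying predicate, or default."""
    # next() with a default consumes the iterable lazily: items after
    # the first match are never evaluated.
    return next((item for item in items if predicate(item)), default)
```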
is_empty(value: Any) → bool
    Return True if value is empty.
    Empty values are [], {}, None and "", but not False or 0.
    >>> is_empty(0)
    False
    >>> is_empty(1)
    False
    >>> is_empty([])
    True
    >>> is_empty([1])
    False
    >>> is_empty({})
    True
    >>> is_empty({'a': 1})
    False
    >>> is_empty('')
    True
    >>> is_empty('hi')
    False
    >>> is_empty(set())
    True
    >>> is_empty({1})
    False
    >>> is_empty(None)
    True

without_empty_values(mapping: Mapping[K, V]) → Dict[K, V]
    Return a dict built from mapping without its empty values.
    This function does not apply recursively.
    Testing the emptiness of values is done by is_empty.
    >>> without_empty_values(
    ...     {'name': 'Robert', 'children': None, 'age': 42,
    ...      'nb_gold_medals': 0, 'hobbies': [],
    ...      'houses': [{'city': 'Dallas'}],
    ...      'notes': {'maths': 'A', 'tech': None}})
    {'name': 'Robert', 'age': 42, 'nb_gold_medals': 0, 'houses': [{'city': 'Dallas'}], 'notes': {'maths': 'A', 'tech': None}}
.>>> without_empty_values( ... {'name': 'Robert', 'children': None, 'age': 42, ... 'nb_gold_medals': 0, 'hobbies': [], ... 'houses': [{'city': 'Dallas'}], ... 'notes': {'maths': 'A', 'tech': None}}) {'name': 'Robert', 'age': 42, 'nb_gold_medals': 0, 'houses': [{'city': 'Dallas'}], 'notes': {'maths': 'A', 'tech': None}}