Derivation Agent
FlaskConfig ¶
Bases: object
This class provides the configuration for the Flask app object. Each setting should be provided as a constant. For more information, visit https://flask.palletsprojects.com/en/3.0.x/config/.
Attributes:
Name | Type | Description |
---|---|---|
SCHEDULER_API_ENABLED | bool | Enables the Flask Scheduler API (default is True) |
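Since each setting is a class-level constant, a custom configuration can be sketched as a subclass that overrides or adds constants. The `FlaskConfig` below is a stand-in that mirrors only the documented attribute so the snippet runs without the package; `MyAgentConfig`, `MY_AGENT_LABEL` and `collect_settings` are hypothetical names used for illustration.

```python
class FlaskConfig(object):
    """Stand-in mirroring the documented attribute (not the packaged class)."""
    SCHEDULER_API_ENABLED = True


class MyAgentConfig(FlaskConfig):
    """Hypothetical subclass: settings are declared as upper-case constants."""
    SCHEDULER_API_ENABLED = False
    MY_AGENT_LABEL = "demo"  # hypothetical extra setting


def collect_settings(config) -> dict:
    """Collect upper-case attributes, the naming convention Flask config objects use."""
    return {key: getattr(config, key) for key in dir(config) if key.isupper()}


settings = collect_settings(MyAgentConfig())
```

Flask's `app.config.from_object` likewise only picks up upper-case attributes, which is why the constants are written in capitals.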
DerivationAgent ¶
DerivationAgent(time_interval: int, kg_url: str, kg_update_url: str = None, kg_user: str = None, kg_password: str = None, fs_url: str = None, fs_user: str = None, fs_password: str = None, derivation_instance_base_url: str = TWA_BASE_URL, flask_config: FlaskConfig = FlaskConfig(), agent_endpoint_base_url: str = 'http://localhost:5000/', register_agent: bool = True, max_thread_monitor_async_derivations: int = 1, email_recipient: str = '', email_subject_prefix: str = '', email_username: str = '', email_auth_json_path: str = '', email_start_end_async_derivations: bool = False, logger_for_dev: bool = True)
Bases: ABC
Abstract base class for a derivation agent.
This class provides the foundational methods and configurations required for a derivation agent. It uses a Flask application and APScheduler to manage asynchronous derivations and periodic jobs.
Attributes:
Name | Type | Description |
---|---|---|
app | Flask | The Flask application instance. |
scheduler | APScheduler | The APScheduler instance for managing periodic jobs. |
time_interval | int | The time interval between two runs of the derivation monitoring job (in seconds). |
max_thread_monitor_async_derivations | int | Maximum number of threads to be used for monitoring async derivations. |
syncDerivationEndpoint | str | HTTP endpoint for handling synchronous derivations. |
kgUrl | str | SPARQL query endpoint. |
kgUpdateUrl | str | SPARQL update endpoint. |
kgUser | str | Username for the SPARQL endpoints. |
kgPassword | str | Password for the SPARQL endpoints. |
fs_url | str | File server endpoint. |
fs_user | str | Username for the file server. |
fs_password | str | Password for the file server. |
derivation_client | PyDerivationClient | Client for managing derivations. |
sparql_client | PySparqlClient | SPARQL client instance. |
logger | Logger | Logger for the agent. |
yag | SMTP | Email client for sending notifications. |
email_recipient | List[str] | List of email recipients. |
email_subject_prefix | str | Prefix for email subjects. |
email_start_end_async_derivations | bool | Flag to send email notifications at the start and end of async derivations. |
register_agent | bool | Flag to register the agent to the knowledge graph. |
Parameters:
Name | Type | Description | Default |
---|---|---|---|
time_interval | int | time interval between two runs of the derivation monitoring job (in seconds) | required |
kg_url | str | SPARQL query endpoint, for example "http://localhost:8080/blazegraph/namespace/triplestore/sparql" | required |
kg_update_url | str | SPARQL update endpoint, set to the same value as kg_url if not provided, for example "http://localhost:8080/blazegraph/namespace/triplestore/sparql" | None |
kg_user | str | username used to access the SPARQL query/update endpoint specified by kg_url/kg_update_url | None |
kg_password | str | password set for kg_user to access the SPARQL query/update endpoint specified by kg_url/kg_update_url | None |
fs_url | str | file server endpoint, for example "http://localhost:8080/FileServer/" | None |
fs_user | str | username used to access the file server endpoint specified by fs_url | None |
fs_password | str | password set for fs_user to access the file server endpoint specified by fs_url | None |
derivation_instance_base_url | str | namespace to be used when creating derivation instances, for example "http://example.com/kg/" | TWA_BASE_URL |
flask_config | FlaskConfig | configuration object for the Flask app, should be an instance of the class FlaskConfig provided as part of this package | FlaskConfig() |
agent_endpoint_base_url | str | data property OntoAgent:hasHttpUrl of OntoAgent:Operation of the derivation agent, for example "http://localhost:5000/endpoint" | 'http://localhost:5000/' |
register_agent | bool | whether to register the agent to the knowledge graph | True |
max_thread_monitor_async_derivations | int | maximum number of threads to be used for monitoring async derivations | 1 |
email_recipient | str | recipients of email notifications, separated by semicolons, e.g. "abc@email.com;def@email.com" | '' |
email_subject_prefix | str | prefix placed in square brackets at the start of the email subject, e.g. the subject will start with "[My Prefix]" when "My Prefix" is provided | '' |
email_username | str | the username to be used as the sender of the email | '' |
email_auth_json_path | str | file path to the auth json for the | '' |
email_start_end_async_derivations | bool | whether to send email notifications at the start and end of processing async derivations | False |
logger_for_dev | bool | whether to use the logger for agents in development (True) or in production (False) | True |
Source code in JPS_BASE_LIB/python_wrapper/twa/agent/derivation_agent.py
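Three of the documented conventions lend themselves to a short illustration: `kg_update_url` falling back to `kg_url`, `email_recipient` being given as a semicolon-separated string while the attribute is a list, and the subject prefix being wrapped in square brackets. The helper names below are hypothetical and the whitespace stripping is an assumption; this is a sketch of the described behaviour, not the package's own code.

```python
from typing import List, Optional


def resolve_update_url(kg_url: str, kg_update_url: Optional[str] = None) -> str:
    """Fall back to the query endpoint when no update endpoint is given."""
    return kg_update_url if kg_update_url is not None else kg_url


def split_recipients(email_recipient: str) -> List[str]:
    """Split a semicolon-separated string into a list of addresses."""
    return [part.strip() for part in email_recipient.split(";") if part.strip()]


def format_subject(prefix: str, subject: str) -> str:
    """Put the prefix in square brackets in front of the subject, if one is set."""
    return f"[{prefix}] {subject}" if prefix else subject


query_url = "http://localhost:8080/blazegraph/namespace/triplestore/sparql"
update_url = resolve_update_url(query_url)
recipients = split_recipients("abc@email.com;def@email.com")
subject = format_subject("My Prefix", "Derivation finished")
```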
agent_input_concepts (abstractmethod, classmethod, property) ¶
This method returns a list of input concepts of the agent. This should be overridden by the derived class.
agent_output_concepts (abstractmethod, classmethod, property) ¶
This method returns a list of output concepts of the agent. This should be overridden by the derived class.
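The override pattern for the two concept lists can be sketched with a small abstract base class. `AgentSketch` is a hypothetical stand-in for the real base class, `DifferenceAgent` and the concept IRIs are made up, and plain instance properties are used here because chaining `classmethod` with `property` was deprecated in Python 3.11.

```python
from abc import ABC, abstractmethod
from typing import List


class AgentSketch(ABC):
    """Hypothetical stand-in for the abstract base class."""

    @property
    @abstractmethod
    def agent_input_concepts(self) -> List[str]:
        """IRIs of the concepts the agent takes as input."""

    @property
    @abstractmethod
    def agent_output_concepts(self) -> List[str]:
        """IRIs of the concepts the agent produces."""


class DifferenceAgent(AgentSketch):
    """Hypothetical derived agent that overrides both properties."""

    @property
    def agent_input_concepts(self) -> List[str]:
        return ["https://example.com/kg/MaxValue", "https://example.com/kg/MinValue"]

    @property
    def agent_output_concepts(self) -> List[str]:
        return ["https://example.com/kg/Difference"]


agent = DifferenceAgent()
```

Leaving either property out of the derived class makes instantiation fail with a `TypeError`, which is how the abstract marker enforces the override.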
periodical_job ¶
This method is used to start a periodic job. It should be used as a decorator (@DerivationAgent.periodical_job) on the method that needs to be executed periodically.
send_email_when_exception ¶
Decorator to send an email when an exception occurs in the decorated method.
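A decorator of this kind can be sketched as follows. The subject wording, the re-raise after notifying, and the `RecordingAgent` class (which records emails instead of sending them) are assumptions made for illustration, not the package's implementation.

```python
import functools


def send_email_when_exception(func):
    """Sketch: report an exception by email, then re-raise it (assumed behaviour)."""
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        try:
            return func(self, *args, **kwargs)
        except Exception as exc:
            # Hypothetical subject and contents; the real wording is not documented here
            self.send_email(f"Exception in {func.__name__}", [repr(exc)])
            raise
    return wrapper


class RecordingAgent:
    """Hypothetical agent that records emails instead of sending them."""

    def __init__(self):
        self.sent = []

    def send_email(self, subject, contents):
        self.sent.append((subject, contents))

    @send_email_when_exception
    def risky_step(self, value):
        return 10 / value


agent = RecordingAgent()
result = agent.risky_step(5)  # no exception, so no email is recorded
```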
send_email ¶
Sends an email notification with the given subject and contents.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
subject | str | The subject of the email | required |
contents | list | The contents of the email | required |
send_email_when_async_derivation_up_to_date ¶
send_email_when_async_derivation_up_to_date(derivation_iri: str)
Sends an email notification when an asynchronous derivation is up-to-date.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
derivation_iri | str | The IRI of the derivation that was processed | required |
send_email_when_async_derivation_started ¶
Sends an email notification when an asynchronous derivation starts.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
derivation_iri | str | The IRI of the derivation that was processed | required |
get_sparql_client ¶
get_sparql_client(sparql_client_cls: Type[PY_SPARQL_CLIENT]) -> PY_SPARQL_CLIENT
Returns a SPARQL client object instantiated from sparql_client_cls, which should extend the PySparqlClient class.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
sparql_client_cls | Type[PY_SPARQL_CLIENT] | The SPARQL client class | required |
Returns:
Name | Type | Description |
---|---|---|
PY_SPARQL_CLIENT | PY_SPARQL_CLIENT | An instance of the SPARQL client |
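The `Type[PY_SPARQL_CLIENT] -> PY_SPARQL_CLIENT` signature means the caller gets back an instance of exactly the subclass it passed in. A minimal sketch of that typing pattern follows; the `PySparqlClient` stand-in, its constructor arguments, `ChemistrySparqlClient` and `AgentSketch` are all assumptions so the snippet runs without the package.

```python
from typing import Optional, Type, TypeVar


class PySparqlClient:
    """Stand-in with an assumed constructor; the packaged class may differ."""

    def __init__(self, query_endpoint: str, update_endpoint: str,
                 kg_user: Optional[str] = None, kg_password: Optional[str] = None):
        self.query_endpoint = query_endpoint
        self.update_endpoint = update_endpoint
        self.kg_user = kg_user
        self.kg_password = kg_password


# Bound type variable: any subclass of the base client is accepted and returned
PY_SPARQL_CLIENT = TypeVar("PY_SPARQL_CLIENT", bound=PySparqlClient)


class ChemistrySparqlClient(PySparqlClient):
    """Hypothetical subclass adding a domain-specific helper."""

    def count_query(self) -> str:
        return "SELECT (COUNT(*) AS ?n) WHERE { ?s ?p ?o }"


class AgentSketch:
    """Hypothetical agent holding the documented connection attributes."""

    def __init__(self, kg_url, kg_update_url=None, kg_user=None, kg_password=None):
        self.kgUrl = kg_url
        self.kgUpdateUrl = kg_update_url if kg_update_url is not None else kg_url
        self.kgUser = kg_user
        self.kgPassword = kg_password

    def get_sparql_client(self, sparql_client_cls: Type[PY_SPARQL_CLIENT]) -> PY_SPARQL_CLIENT:
        # Instantiate whichever subclass was requested with the agent's settings
        return sparql_client_cls(self.kgUrl, self.kgUpdateUrl, self.kgUser, self.kgPassword)


agent = AgentSketch("http://localhost:8080/blazegraph/namespace/triplestore/sparql")
client = agent.get_sparql_client(ChemistrySparqlClient)
```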
register_agent_in_kg ¶
This method registers the agent to the knowledge graph by uploading its OntoAgent triples generated on-the-fly.
add_url_pattern ¶
add_url_pattern(url_pattern: str = None, url_pattern_name: str = None, function: RouteCallable = None, methods: str = ['GET'], *args, **kwargs)
This method is a wrapper around the add_url_rule method of the Flask object that adds a customised URL pattern to the derivation agent. For more information, visit https://flask.palletsprojects.com/en/3.0.x/api/#flask.Flask.add_url_rule
WARNING: Use of this method by developers is STRONGLY discouraged. The design intention of a derivation agent is to communicate via the KNOWLEDGE GRAPH, and NOT via HTTP requests.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url_pattern | str | the endpoint URL to associate with the rule and view function | None |
url_pattern_name | str | the name of the endpoint | None |
function | RouteCallable | the view function to associate with the endpoint | None |
methods | str | HTTP request methods, defaults to ['GET'] | ['GET'] |
monitor_async_derivations ¶
This method monitors the status of the asynchronous derivations that "isDerivedUsing" this DerivationAgent.
When it detects the status "Requested", the agent marks the status as "InProgress" and starts the job. Once the job is finished, the agent marks the status as "Finished" and attaches the new derived IRIs to it via "hasNewDerivedIRI". All newly generated triples are also written to the knowledge graph at this point.
When it detects the status "InProgress", the current implementation just passes.
When it detects the status "Finished", the agent deletes the old entities, reconnects the new instances (previously attached to the status via "hasNewDerivedIRI") with the original derivation, cleans up the status, and finally updates the timestamp of the derivation. All processing steps at the "Finished" status are taken care of by the method uk.ac.cam.cares.jps.base.derivation.DerivationClient.cleanUpFinishedDerivationUpdate(String).
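The status handling described above amounts to a small state machine. The sketch below models a single derivation as a plain dictionary; the `Status` enum, the dictionary keys, and the `run_job` and `now` arguments are illustrative assumptions, and no knowledge graph is involved.

```python
from enum import Enum


class Status(Enum):
    REQUESTED = "Requested"
    IN_PROGRESS = "InProgress"
    FINISHED = "Finished"


def monitor_once(derivation: dict, run_job, now: int) -> None:
    """One monitoring pass over a single derivation record (a plain dict here)."""
    status = derivation.get("status")
    if status is Status.REQUESTED:
        derivation["status"] = Status.IN_PROGRESS
        # Run the job and attach the new IRIs to the status
        derivation["new_derived_iris"] = run_job(derivation["inputs"])
        derivation["status"] = Status.FINISHED
    elif status is Status.IN_PROGRESS:
        pass  # nothing to do while a job is running
    elif status is Status.FINISHED:
        # Replace the old entities with the new ones, clear the status, update timestamp
        derivation["outputs"] = derivation.pop("new_derived_iris")
        derivation["status"] = None
        derivation["timestamp"] = now


record = {
    "status": Status.REQUESTED,
    "inputs": ["https://example.com/kg/Concept_1/Instance_1"],
    "outputs": ["https://example.com/kg/Old_Output"],
    "timestamp": 0,
}
monitor_once(record, lambda inputs: ["https://example.com/kg/New_Output"], now=5)
status_after_first_pass = record["status"]
monitor_once(record, lambda inputs: [], now=7)
```

Two passes are needed in this sketch: the first runs the job and leaves the record at "Finished", the second performs the clean-up.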
process_request_parameters (abstractmethod) ¶
process_request_parameters(derivation_inputs: DerivationInputs, derivation_outputs: DerivationOutputs)
This method performs the agent logic of converting derivation inputs to derivation outputs. Developers shall override this when writing a new derivation agent based on the DerivationAgent class.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
derivation_inputs | DerivationInputs | instance of derivation inputs, essentially in the format of: { "https://example.com/kg/Concept_1": ["https://example.com/kg/Concept_1/Instance_1"], "https://example.com/kg/Concept_2": ["https://example.com/kg/Concept_2/Instance_2"], "https://example.com/kg/Concept_3": ["https://example.com/kg/Concept_3/Instance_3_1", "https://example.com/kg/Concept_3/Instance_3_2"], "https://example.com/kg/Concept_4": ["https://example.com/kg/Concept_4/Instance_4"] } | required |
derivation_outputs | DerivationOutputs | instance of derivation outputs, developers should add newly created entities and triples to this variable | required |
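To make the input format concrete, the sketch below treats the inputs as the plain dictionary shown in the table and the outputs as a list of triples. Both are stand-ins: the real `DerivationInputs` and `DerivationOutputs` classes have their own methods, which are not covered in this section, and the predicate IRI and the `/Result` suffix are invented for illustration.

```python
from typing import Dict, List, Tuple

Triple = Tuple[str, str, str]


def process_request_parameters(derivation_inputs: Dict[str, List[str]],
                               derivation_outputs: List[Triple]) -> None:
    """Toy agent logic: emit one triple per input instance."""
    for concept_iri, instance_iris in derivation_inputs.items():
        for instance_iri in instance_iris:
            derivation_outputs.append((
                instance_iri + "/Result",                      # hypothetical new entity
                "https://example.com/kg/isComputedFromTypeOf",  # hypothetical predicate
                concept_iri,
            ))


inputs = {
    "https://example.com/kg/Concept_1": ["https://example.com/kg/Concept_1/Instance_1"],
    "https://example.com/kg/Concept_3": [
        "https://example.com/kg/Concept_3/Instance_3_1",
        "https://example.com/kg/Concept_3/Instance_3_2",
    ],
}
outputs: List[Triple] = []
process_request_parameters(inputs, outputs)
```

The function returns nothing; as with the documented method, results are communicated by adding to the outputs object that was passed in.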
start_all_periodical_job ¶
This method starts all scheduled periodical jobs.
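One way a decorator and a "start all" method can cooperate is for the decorator to tag methods and for the start method to discover the tagged ones. The sketch below shows only that pattern: the marker attribute, `AgentSketch`, and the job names are hypothetical, each job is simply called once, and scheduling through APScheduler at `time_interval` is left out. It should not be read as the package's implementation.

```python
import functools


def periodical_job(func):
    """Sketch: tag a method so it can be discovered and started later."""
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        return func(self, *args, **kwargs)
    wrapper.is_periodical_job = True  # hypothetical marker attribute
    return wrapper


class AgentSketch:
    """Hypothetical agent with two tagged jobs and one ordinary method."""

    def __init__(self):
        self.started = []

    @periodical_job
    def monitor(self):
        self.started.append("monitor")

    @periodical_job
    def heartbeat(self):
        self.started.append("heartbeat")

    def helper(self):
        self.started.append("helper")

    def start_all_periodical_job(self):
        # Walk the class namespace in definition order and start every tagged method
        for name, member in vars(type(self)).items():
            if getattr(member, "is_periodical_job", False):
                getattr(self, name)()


agent = AgentSketch()
agent.start_all_periodical_job()
```

Note that `vars(type(self))` only sees methods defined directly on the class, so a fuller version would also need to consider inherited jobs.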
handle_sync_derivations ¶
This method handles synchronous derivations by using the Flask app object of the DerivationAgent to process the HTTP request, and then passes it to the process_request_parameters function provided by the developers.
validate_inputs (abstractmethod) ¶
validate_inputs(http_request) -> bool
Validates the HTTP request sent to the agent for processing synchronous derivations. Developers can override this function for customised validation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
http_request | | The HTTP request received | required |
Raises:
Type | Description |
---|---|
Exception | HTTP request is empty |
Exception | IRI for derivation is not provided in the HTTP request |
Exception | IRI for agent is not provided in the HTTP request |
Exception | IRIs for old derivation outputs are not provided in the HTTP request |
Exception | IRIs for downstream derivations are not provided in the HTTP request |
Returns:
Name | Type | Description |
---|---|---|
bool | bool | Whether the HTTP request is valid |
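The checks listed under "Raises" can be sketched over a plain dictionary payload. The key names below are hypothetical, since the section does not state which keys the real request carries, and a real override would work on the HTTP request object rather than a dict.

```python
def validate_inputs(http_request: dict) -> bool:
    """Check that the payload carries every IRI the agent needs."""
    if not http_request:
        raise Exception("HTTP request is empty")

    # Hypothetical key names mapped to the documented failure conditions
    required = {
        "derivation": "IRI for derivation is not provided in the HTTP request",
        "agent": "IRI for agent is not provided in the HTTP request",
        "old_outputs": "IRIs for old derivation outputs are not provided in the HTTP request",
        "downstream_derivations": "IRIs for downstream derivations are not provided in the HTTP request",
    }
    for key, message in required.items():
        if key not in http_request:
            raise Exception(message)
    return True


payload = {
    "derivation": "https://example.com/kg/Derivation_1",
    "agent": "https://example.com/kg/Agent_1",
    "old_outputs": [],
    "downstream_derivations": [],
}
is_valid = validate_inputs(payload)
```

The sketch checks only for the presence of each key, so an empty list of old outputs (as for a derivation that has not produced anything yet) still counts as provided.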