APIs
Netconf Client API
send_nc_request
Sends a NETCONF operation (get, get-config, edit-config or action) to BAA. Sample NETCONF requests can be found here.
send_nc_request(operation, template_name, *args)
Args:
operation (str): NETCONF operation to perform (get, get-config, edit-config, action).
template_name (xml): RPC request body.
Returns:
Returns the NETCONF response from the OB-BAA server.
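As a rough illustration of what an RPC request body can look like, the sketch below builds a minimal NETCONF get-config request with Python's standard library. The helper name `build_rpc` and its parameters are hypothetical and not part of the client API; only the base namespace comes from the NETCONF standard (RFC 6241). The templates actually used with send_nc_request may look different.

```python
import xml.etree.ElementTree as ET

# NETCONF base namespace defined by RFC 6241.
NETCONF_NS = "urn:ietf:params:xml:ns:netconf:base:1.0"


def build_rpc(operation, message_id="1", source="running"):
    """Build a minimal <rpc> envelope for a get or get-config operation.

    Hypothetical helper for illustration only; not part of the client API.
    """
    if operation not in ("get", "get-config"):
        raise ValueError(f"unsupported operation: {operation}")
    rpc = ET.Element(f"{{{NETCONF_NS}}}rpc", {"message-id": message_id})
    op = ET.SubElement(rpc, f"{{{NETCONF_NS}}}{operation}")
    if operation == "get-config":
        # get-config must name the datastore to read from.
        src = ET.SubElement(op, f"{{{NETCONF_NS}}}source")
        ET.SubElement(src, f"{{{NETCONF_NS}}}{source}")
    return ET.tostring(rpc, encoding="unicode")


print(build_rpc("get-config"))
```

A string like this is the sort of content a request template would carry; edit-config and action requests need model-specific payloads and are left out of the sketch.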
create_subscription_and_take_notification
This creates the subscription to the provided notification stream and reads the notification from the stream.
create_subscription_and_take_notification(self)
Args:
stream (str): name of the stream to read notifications from (netconf, alarm, state-change).
Returns:
Returns the NETCONF notification read from the given stream.
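The format of the returned notification is not described here. Assuming it is the raw XML of a standard NETCONF notification (RFC 5277), a caller could pull out the event time and the event element roughly as follows. `parse_notification` and the sample payload are made up for illustration.

```python
import xml.etree.ElementTree as ET

# Notification namespace defined by RFC 5277.
NOTIF_NS = "urn:ietf:params:xml:ns:netconf:notification:1.0"

# Made-up sample payload; real notifications carry model-specific content.
SAMPLE = f"""
<notification xmlns="{NOTIF_NS}">
  <eventTime>2024-01-01T00:00:00Z</eventTime>
  <example-event xmlns="urn:example:events">
    <detail>placeholder</detail>
  </example-event>
</notification>
""".strip()


def parse_notification(xml_text):
    """Return (event_time, event_tag) from a notification XML string."""
    root = ET.fromstring(xml_text)
    event_time = root.findtext(f"{{{NOTIF_NS}}}eventTime")
    # The event body is the first child that is not <eventTime>.
    body = next(c for c in root if c.tag != f"{{{NOTIF_NS}}}eventTime")
    return event_time, body.tag


event_time, event_tag = parse_notification(SAMPLE)
print(event_time, event_tag)
```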
Opensearch API
The API establishes a client connection to the Opensearch database and supports the operations below.
__init__(self)
Initialize opensearch_client that can be used for the future operations.
add_data_to_index(self, os_index, message, id=None)
Adds the specified data to the specified index.
The operation is protected by a circuit breaker.
Args:
os_index (str): The name of the index to add the data to.
message (dict): The data to add.
Returns:
dict: The response from Opensearch
Raises:
CircuitBreakerError: If a circuit breaker error has occurred when trying to add the specified data to Opensearch.
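The documentation does not say which circuit-breaker implementation protects these calls. The general pattern can be sketched in plain Python as below; `SimpleCircuitBreaker`, `CircuitOpenError`, `fail_max` and `reset_timeout` are hypothetical names chosen for this sketch, not the API's own.

```python
import time


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the breaker is open."""


class SimpleCircuitBreaker:
    """Illustrative breaker: opens after fail_max consecutive failures."""

    def __init__(self, fail_max=3, reset_timeout=30.0):
        self.fail_max = fail_max
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.opened_at = None  # monotonic timestamp while open, else None

    def call(self, func, *args, **kwargs):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.reset_timeout:
                raise CircuitOpenError("circuit is open")
            # Timeout elapsed: allow one trial call (half-open state).
            self.opened_at = None
            self.failures = self.fail_max - 1
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.fail_max:
                self.opened_at = time.monotonic()
            raise
        # Any success closes the breaker again.
        self.failures = 0
        return result
```

While the breaker is open, calls fail immediately instead of waiting on an unresponsive database, which is presumably why the index operations raise a dedicated error.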
add_index(self, os_index, index_body)
Adds the specified index to Opensearch.
The operation is protected by a circuit breaker.
Args:
os_index (str): The name of the index
index_body (dict): index mapping and details
Returns:
dict: The response from Opensearch
Raises:
CircuitBreakerError: If a circuit breaker error has occurred when trying to add the specified index to Opensearch.
check_if_document_exists_in_index(self, index, document_id)
Checks if the document exists in index
close_connection(self)
Disconnects from the opensearch_handler server.
delete_document_from_index(self, index, document_id)
Deletes a given document from the index.
get_indexes(self, indexes)
search(self, os_index, search_body)
Searches for data in the specified index using the provided search body.
The operation is protected by a circuit breaker.
Args:
os_index (str): The name of the index to search.
search_body (dict): The search body.
Returns:
dict: The search results.
Raises:
CircuitBreakerError: If a circuit breaker error has occurred when trying to get data from Opensearch.
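A search body is a dictionary in the Opensearch query DSL. The sketch below builds a simple match query; `build_match_query`, the field name `device_id` and the value `olt-1` are hypothetical, and the index layout of a real deployment will differ.

```python
def build_match_query(field, value, size=10):
    """Build a query-DSL body that matches `value` in `field`.

    Hypothetical helper for illustration only.
    """
    return {
        "size": size,  # maximum number of hits to return
        "query": {"match": {field: value}},
    }


search_body = build_match_query("device_id", "olt-1", size=5)
print(search_body)
```

A dictionary like `search_body` is what would be passed as the second argument of search, next to the index name.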
update_data_to_index(self, os_index, message, doc_id)
Updates the specified document in an Opensearch index.
The operation is protected by a circuit breaker.
Args:
os_index (str): The name of the index.
message (dict): The updated data.
doc_id: The ID of the document to update.
Returns:
dict: The response from Opensearch
Raises:
CircuitBreakerError: If a circuit breaker error has occurred when trying to update the data in Opensearch.
KAFKA API
KAFKA Producer
The module contains the following classes:
- Worker: A class that runs a thread with an asyncio event loop for asynchronous publishing of messages to Kafka.
- KafkaProducer: A class that allows you to publish messages to a Kafka topic using the aiokafka library.
classes
builtins.object
KafkaProducer
threading.Thread(builtins.object)
Worker
class KafkaProducer(builtins.object)
KafkaProducer(callback=None)
methods
__init__(self, callback=None)
A Kafka producer that sends messages to a specified Kafka server.
Attributes:
producer (kafka.KafkaProducer): A Kafka producer instance used to send messages.
wt (Worker): A worker thread that runs in the background and starts the producer.
callback (function): A callback function to invoke after successfully starting the producer
Raises:
Exception: If the Kafka server is not defined.
invoke_callback(self, value, f, *args, **kwargs)
This function invokes a callback function with the specified arguments.
Args:
value (Any): The value to pass as the first argument to the callback function.
f (callable): The callback function to invoke.
args (Any): Additional arguments to pass to the callback function.
kwargs (Any): Additional keyword arguments to pass to the callback function.
Returns:
None
publish_message(self, topic_name, value, success_call_back, error_call_back, key=None)
class Worker(threading.Thread)
methods
__init__(self, name=None)
Initializes a new instance of the class.
Args:
name (str, optional): A string representing the name of the instance. Defaults to None.
post(self, task, *args)
This function posts a coroutine function to the event loop.
Args:
task (coroutine function): A coroutine function to run in the event loop.
args (Any): Arguments to pass to the coroutine function.
Returns:
None
run(self) -> None
This function starts the event loop in a separate thread.
Returns:
None
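The Worker pattern described above, a thread that owns an asyncio event loop and accepts coroutine functions from other threads, can be sketched with the standard library alone. `LoopWorker`, `stop` and `double` are hypothetical names. Unlike the documented post, which returns None, this sketch returns the future so the caller can wait for the result.

```python
import asyncio
import threading


class LoopWorker(threading.Thread):
    """Illustrative thread that runs its own asyncio event loop."""

    def __init__(self, name=None):
        super().__init__(name=name, daemon=True)
        self.loop = asyncio.new_event_loop()

    def run(self):
        # Bind the loop to this thread and serve tasks until stopped.
        asyncio.set_event_loop(self.loop)
        self.loop.run_forever()

    def post(self, task, *args):
        # Thread-safe hand-off of a coroutine to the worker's loop.
        return asyncio.run_coroutine_threadsafe(task(*args), self.loop)

    def stop(self):
        self.loop.call_soon_threadsafe(self.loop.stop)


async def double(x):
    await asyncio.sleep(0)  # stand-in for real asynchronous work
    return 2 * x


worker = LoopWorker(name="demo-worker")
worker.start()
future = worker.post(double, 21)
print(future.result(timeout=5))
worker.stop()
worker.join(timeout=5)
```

Running the loop in its own thread lets synchronous code publish messages without blocking on network I/O.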
KAFKA consumer
This module is responsible for creating a Kafka Consumer which can receive messages from a Kafka topic.
KafkaConsumer: Class which creates a Kafka Consumer and has functions to interact with it.
- __init__: Initializes the Kafka Consumer with a given group and topic.
- close: Closes the Kafka Consumer.
- get_topics: Returns the topics being consumed by the Kafka Consumer.
- start_consumer: Starts the Kafka Consumer.
- get_many: Returns many messages from the Kafka Consumer for a given partition.
- stop: Stops the Kafka Consumer.
- get_one: Returns a single message from the Kafka Consumer.
- to_committed: Seeks to the committed offset of the Kafka Consumer.
constants
KAFKA_SERVER: The server which hosts the Kafka broker.
KAFKA_SECURITY_PROTOCOL: The security protocol used to connect to Kafka
classes
builtins.object
KafkaConsumer
class KafkaConsumer(builtins.object)
KafkaConsumer(group, topic)
methods
__init__(self, group, topic)
Initialize the Kafka Consumer with a specified consumer group, topic and event loop.
Args:
group (str): The Kafka consumer group to which this consumer belongs.
topic (str): The Kafka topic from which messages will be consumed.
loop (asyncio.AbstractEventLoop): The asyncio event loop instance.
Raises:
Exception: If KAFKA_SERVER environment variable is not defined.
Returns:
None
close(self)
Stop the Kafka Consumer and close the connection to Kafka.
Returns:
None
get_many(self, partition)
Get messages from multiple partitions of the Kafka topic.
Args:
partition (Union[int, Tuple[int]]): The partition(s) from which to consume messages.
Returns:
Dict[TopicPartition, List[ConsumerRecord]]: A dictionary of messages, keyed by topic partition.
get_one(self)
Get one message from the Kafka topic.
Returns:
ConsumerRecord: The message consumed from Kafka.
get_topics(self)
Get the list of topics that are available to this Kafka Consumer.
Returns:
list: List of available topics.
start_consumer(self)
Start the Kafka Consumer and begin consuming messages from Kafka.
Returns:
asyncio.Task: An asyncio.Task object representing the consumer task.
stop(self)
Stop the Kafka Consumer from consuming messages.
Returns:
asyncio.Task: An asyncio.Task object representing the consumer task.
to_committed(self)
Seek to the committed offsets for the assigned partitions.
Returns:
None
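The result of get_many is a dictionary of record lists. One way to flatten it is sketched below. So that the example runs without a broker, `TopicPartition` and `ConsumerRecord` are simplified namedtuple stand-ins for the Kafka client's types, and `flatten_batches` is a hypothetical helper.

```python
from collections import namedtuple

# Simplified stand-ins for the Kafka client's types (assumption for this sketch).
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])
ConsumerRecord = namedtuple("ConsumerRecord", ["topic", "partition", "offset", "value"])


def flatten_batches(batches):
    """Yield (partition, offset, value), ordered by partition then offset."""
    for tp in sorted(batches, key=lambda tp: (tp.topic, tp.partition)):
        for record in sorted(batches[tp], key=lambda r: r.offset):
            yield tp.partition, record.offset, record.value


# Made-up sample data in the shape Dict[TopicPartition, List[ConsumerRecord]].
batches = {
    TopicPartition("demo", 1): [ConsumerRecord("demo", 1, 7, b"c")],
    TopicPartition("demo", 0): [
        ConsumerRecord("demo", 0, 4, b"b"),
        ConsumerRecord("demo", 0, 3, b"a"),
    ],
}
for partition, offset, value in flatten_batches(batches):
    print(partition, offset, value)
```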
FluentD API
This API forwards the container logs to Fluentd.
data
install(self, application_name, category_log_levels_dict=None)
Installs the Fluentd logger.
Args:
application_name (str): The name of the application.
category_log_levels_dict (dict, optional): Dictionary containing log levels for specific categories.
getLogFormat(self, application_name)
Get log format.
Args:
application_name (str): The name of the application.
Returns:
dict: The log format dictionary.
getLogger(self)
Get the logger object.
Returns:
logger: The logger object.
usage
import os

if "fluentd" == os.getenv("LOG_OPT"):
    FluentdLogger().install('app_name')
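The format of category_log_levels_dict is not specified here. Assuming it maps logger (category) names to level names, applying such a dictionary with Python's standard logging module could look like the sketch below. `apply_category_levels` and the category `demo.kafka` are hypothetical; this is not the FluentdLogger implementation.

```python
import logging


def apply_category_levels(category_log_levels_dict=None, default_level="INFO"):
    """Set the root level, then override levels for specific categories.

    Assumes a mapping of logger name to level name, e.g. {"demo.kafka": "DEBUG"}.
    """
    logging.getLogger().setLevel(default_level)
    for category, level in (category_log_levels_dict or {}).items():
        logging.getLogger(category).setLevel(level)


apply_category_levels({"demo.kafka": "DEBUG"}, default_level="WARNING")
print(logging.getLogger("demo.kafka").getEffectiveLevel() == logging.DEBUG)
```

Categories without an entry fall back to the root logger's level.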
An overview of the currently available applications that use one or more of the above APIs can be found here.