Package amps

Expand source code
import traceback
import os
import json
import uuid


class Util:
    """The `Util` class provides utility methods that may be useful during action and service execution.


    """
    def get_id():
        """Utility method that returns a unique ID in the format used by AMPS.
        """
        return uuid.UUID(str(uuid.uuid4())).hex

    @staticmethod
    def unravel_erlport_object(result):
        if isinstance(result, List):
            return [Util.unravel_erlport_object(x) for x in result]
        elif isinstance(result, Map):
            return {k.decode(): Util.unravel_erlport_object(v) for k, v in result.items()}
        elif isinstance(result, Atom):
            return result.decode()
        elif isinstance(result, bytes):
            return result.decode()
        else:
            return result


class Logger:
    """The `Logger` class from AMPS is a utility class for logging events back to AMPS. This class should not be used directly, but rather instances of this class within Actions, Endpoints, and Services should be used.

    Usage:
    ```
    from amps import Action
    class my_action(Action):
        def action(self):
            self.logger.info("Action Execution Started")
            # Perform Action Logic Here
    ```
    """

    def __init__(self, sid="", service=None):
        self.__sid__ = sid
        self.__service__ = service

    def log(self, level: str, message: str):
        """Instance method that logs a given message with the given level.

        Args:
            level (string): The level to use when logging the given message. All valid "level" values are available in the docs for the [Logger](https://hexdocs.pm/logger/1.13/Logger.html#module-levels) Elixir library underlying this class.
            message (string): The message to log.

        Usage:
        ```
        from amps import Action
        class my_action(Action):
            def action(self):
                self.logger.log("info", "Action Execution Started")
                # Perform Action Logic Here
        ```
        """
        if self.__service__:
            self.__service__.__log__(Atom(
                bytes(level, "utf-8")), message)
        else:
            call(Atom(b'Elixir.Amps.PyHandler'), Atom(b'log'), [Atom(
                bytes(level, "utf-8")), message, [(Atom(b'sid'), self.__sid__)]])

    def info(self, message: str):
        """Instance method that logs a given message with the "info" level.

        Args:
            message (string): The message to log.

        Usage:
        ```
        from amps import Action
        class my_action(Action):
            def action(self):
                self.logger.info("Action Execution Started")
                # Perform Action Logic Here
        ```
        """
        self.log("info", message)

    def debug(self, message: str):
        """Instance method that logs a given message with the "debug" level.

        Args:
            message (string): The message to log.

        Usage:
        ```
        from amps import Action
        class my_action(Action):
            def action(self):
                # Perform Action Logic Here
                self.logger.debug("Received message")
        ```
        """
        self.log("debug", message)

    def warning(self, message: str):
        """Instance method that logs a given message with the "warning" level.

        Args:
            message (string): The message to log.

        Usage:
        ```
        from amps import Action
        class my_action(Action):
            def action(self):
                # Perform Action Logic Here
                self.logger.warning("Something minor went wrong.")
        ```
        """
        self.log("warning", message)

    def error(self, message: str):
        """Instance method that logs a given message with the "error" level.

        Args:
            message (string): The message to log.

        Usage:
        ```
        from amps import Action
        class my_action(Action):
            def action(self):
                # Perform Action Logic Here
                self.logger.error("Something major went wrong.")
        ```
        """
        self.log("error", message)


class DB:
    def __init__(self, env):
        self.env = env

    def find(self, collection, clauses={}, opts={}):
        coll = bytes(collection, "utf-8")
        collection = call(Atom(b'Elixir.AmpsUtil'), Atom(
            b'index'), [bytes(self.env, "utf-8"), coll])
        clauses = Map(clauses)
        opts = Map(opts)
        result = call(Atom(b'Elixir.Amps.PyService'),
                      Atom(b'find'), [collection, clauses, opts])
        return Util.unravel_erlport_object(result)

    def find_one(self, collection, clauses={}, opts={}):
        coll = bytes(collection, "utf-8")
        collection = call(Atom(b'Elixir.AmpsUtil'), Atom(
            b'index'), [bytes(self.env, "utf-8"), coll])
        clauses = Map(clauses)
        opts = Map(opts)
        result = call(Atom(b'Elixir.Amps.PyService'),
                      Atom(b'find_one'), [collection, clauses, opts])
        return Util.unravel_erlport_object(result)

    def create(self, collection, body):
        coll = bytes(collection, "utf-8")
        collection = call(Atom(b'Elixir.AmpsUtil'), Atom(
            b'index'), [bytes(self.env, "utf-8"), coll])
        body = Map(body)
        result = call(Atom(b'Elixir.Amps.PyService'),
                      Atom(b'create'), [collection, body])
        return Util.unravel_erlport_object(result)

    def update(self, collection, body, id):
        coll = bytes(collection, "utf-8")
        id = bytes(id, "utf-8")
        collection = call(Atom(b'Elixir.AmpsUtil'), Atom(
            b'index'), [bytes(self.env, "utf-8"), coll])
        body = Map(body)
        result = call(Atom(b'Elixir.Amps.PyService'),
                      Atom(b'update'), [collection, body, id])
        return Util.unravel_erlport_object(result)

    def delete(self, collection, id):
        coll = bytes(collection, "utf-8")
        id = bytes(id, "utf-8")
        collection = call(Atom(b'Elixir.AmpsUtil'), Atom(
            b'index'), [bytes(self.env, "utf-8"), coll])
        result = call(Atom(b'Elixir.Amps.PyService'),
                      Atom(b'delete'), [collection, id])
        return Util.unravel_erlport_object(result)


class Users:
    def __init__(self, env):
        self.env = env

    def find(self, env, clauses={}, opts={}):
        coll = bytes(collection, "utf-8")
        collection = call(Atom(b'Elixir.AmpsUtil'), Atom(
            b'index'), [bytes(self.env, "utf-8"), coll])
        clauses = Map(clauses)
        opts = Map(opts)
        result = call(Atom(b'Elixir.Amps.PyService'),
                      Atom(b'find'), [collection, clauses, opts])
        return Util.unravel_erlport_object(result)

    def find_one(self, env, clauses={}, opts={}):
        coll = bytes(collection, "utf-8")
        collection = call(Atom(b'Elixir.AmpsUtil'), Atom(
            b'index'), [bytes(self.env, "utf-8"), coll])
        clauses = Map(clauses)
        opts = Map(opts)
        result = call(Atom(b'Elixir.Amps.PyService'),
                      Atom(b'find_one'), [collection, clauses, opts])
        return Util.unravel_erlport_object(result)

    def create(self, user):
        user = Map(user)
        result = call(Atom(b'Elixir.Amps.PyService.Users'),
                      Atom(b'create'), [user, self.env])
        return Util.unravel_erlport_object(result)

    def update(self, id, body):
        user = Map(body)
        result = call(Atom(b'Elixir.Amps.PyService.Users'),
                      Atom(b'update'), [id, user, self.env])
        return Util.unravel_erlport_object(result)

    def delete(self, id):
        result = call(Atom(b'Elixir.Amps.PyService.Users'),
                      Atom(b'delete'), [id, self.env])
        return Util.unravel_erlport_object(result)

    def create_session(self, user):
        user = Map(user)
        result = call(Atom(b'Elixir.Amps.PyService.Users'),
                      Atom(b'create_session'), [user, self.env])
        return Util.unravel_erlport_object(result)

    def authenticate(self, access_token):
        result = call(Atom(b'Elixir.Amps.PyService.Users'),
                      Atom(b'authenticate'), [access_token, self.env])
        return Util.unravel_erlport_object(result)

    def renew_session(self, renewal_token):
        result = call(Atom(b'Elixir.Amps.PyService.Users'),
                      Atom(b'renew_session'), [renewal_token, self.env])
        return Util.unravel_erlport_object(result)

    def delete_session(self, access_token):
        result = call(Atom(b'Elixir.Amps.PyService.Users'),
                      Atom(b'delete_session'), [access_token, self.env])
        return Util.unravel_erlport_object(result)


class Action:
    """The `Action` class from AMPS provides a base class for actions that must be extended in a custom action. Actions can be performed by overriding the `Action.action` callback exposed by the class.

    Attributes:
        msg (dict): The msg attribute contains a python of the dictionary and the message with all of its metadata. Message data can be accessed from the msg attribute, either using the "data" key for inline data, or the "fpath" key for a path to the file containing the data.
        parms (dict): The parms attribute contains all the parameters of the configured action including any extra parameters you may have specified under the "parms" key.
        sysparms (dict): The sysparms attribute contains all useful system configuration parameters for use in actions. Currently, sysparms only contains the AMPS temporary directory under the "tempdir" key.
        extra (dict): The extra attribute contains all extra parameters configured in the action.
        provider (dict): If use_provider is true on the configured action, the provider attribute contains the provider parameters. The provider parameters are available under the parms  object as well under the "provider" key.
        logger (Logger): The logger attribute exposes a Logger object for logging events back to AMPS. Any messages logged using this object during the action execution will appear in the corresponding session logs.

    Usage:
    ```
    from amps import Action
    class my_action(Action):
        def action(self):
            # Get Message Data (This example assumes inline).
            data = self.msg["data"]
            # Perform Action Logic Here
            if success:
                self.logger.info("Successfully Processed Message")
                return Action.send_status("completed")
            else:
                self.logger.warn("Failed to process message")
                return Action.send_status("failed", "Reason for Failure")
    ```
    """

    def __init__(self, msgdata):
        msgdata = json.loads(msgdata)
        self.msg = msgdata["msg"]
        self.parms = msgdata["parms"]
        self.sysparms = msgdata["sysparms"]
        self.extra = self.parms["parms"]
        self.env = self.parms["env"]
        self.db = DB(self.env)
        self.users = Users(self.env)

        if self.parms["use_provider"]:
            self.provider = self.parms["provider"]
        if self.msg.get("sid"):
            self.logger = Logger(sid=self.msg["sid"])
        else:
            self.logger = Logger()

    def __run__(self):
        try:
            response = self.action()
        except Exception as e:
            response = {"error": True, "reason": traceback.format_exc()}
        return json.dumps(response)

    def action(self):
        """Callback to override in order to perform action logic in a class that extends `Action`.

        Relevant message data is available via the attributes available on the self object. It expects the method to return a python dictionary with at least a "status" key with the status of the action execution. If the action execution is unsuccessful, a "reason" key can also be returned along with an unsuccessful status. If a new message is intended to be created by this action, it is also expected that a "msg" key will be returned with a dictionary object containing either a "data" key with inline data or an "fpath" key containing the file path to the message. Similarly, actions used in Endpoints can return a "response" object with the response status code specified under the "code" key, and the response body provided via either the "data" or "fpath" key. The action method is automatically wrapped in a try-except block when it is called, so is unnecessary to wrap the overall method in a try-except block. If you wish to have more granular visibility over errors that arise in your actions, feel free to use try-except blocks at various steps throughout action execution to allow for logging specific reasons for failure. To simplify action creation and handling, a number of helper methods are exposed by the class as static methods.

        Usage:
        ```
        from amps import Action
        class my_action(Action):
            def action(self):
                # Perform Action Logic Here
                if success:
                    return Action.send_status("completed")
                else:
                    return Action.send_status("failed", "Reason for Failure")
        ```
        """
        return {"status": "completed"}

    def get_data(self):
        """Convenience method to read get string data from message.

        Returns either the inline message data stored on the "data" key or the data stored in the message's file via the "fpath" key.
        Usage:
        ```
        from amps import Action
        class my_action(Action):
            def action(self):
                # Perform Action Logic Here
                data = self.get_data()
                # Use data here
                if success:
                    return Action.send_status("completed")
                else:
                    return Action.send_status("failed", "Reason for Failure")
        ```
        """
        if self.msg.get("data"):
            return self.msg["data"]
        else:
            return open(self.msg["fpath"]).read()

    @staticmethod
    def send_async(status, key, data):
        """Static method for creating a dictionary with the provided status and an async key with the provided data for asynchronously returning from an API endpoint.

        Args:
            status (string): The Action status to log.
            key (string): The key to use for this data when returning the json object.
            data (string || dict): Either a string containing the response data or a dictionary which will be JSON encoded and return.

        This convenience method allows for returning data asynchronously from a script when used in an API Endpoint-triggered topic workflow.

        Usage:
        ```
        from amps import Action
        class my_action(Action):
            def action(self):
                # Perform Action Logic Here
                if success:
                    return Action.send_async("completed", "Hello, this is my async response.")
                else:
                    return Action.send_status("failed", "Reason for Failure")
        ```
        """
        return {"status": status, "async": {key: data}}

    @staticmethod
    def send_status(status: str, reason: str = None):
        """Static method for creating a dictionary with the provided status and optional reason for returning in the `action` callback.

        Args:
            status (string): The Action status to log.
            reason (string): An optional reason to provide along with the given status.

        Usage:
        ```
        from amps import Action
        class my_action(Action):
            def action(self):
                # Perform Action Logic Here
                if success:
                    return Action.send_status("completed")
                else:
                    return Action.send_status("failed", "Reason for Failure")
        ```
        """
        if reason:
            return {"status": status, "reason": reason}
        else:
            return {"status": status}

    @staticmethod
    def send_file(status: str, fpath: str, meta: dict = {}):
        """Static method for creating a dictionary with the provided status and a new message from the provided fpath and additional metadata for returning in the `Action.action` callback.

        Args:
            status (string): The Action status to log.
            fpath (string): A filepath to a file containing the message data.
            meta (dict): An optional dictionary containing additional metadata to add to the new message.

        This convenience method allows for the creation of a new message using the file specified in `fpath` and any additional metadata supplied in `meta`. File Size (fsize) and File Name (fname) are automatically retrieved from the given file.

        Usage:
        ```
        from amps import Action
        class my_action(Action):
            def action(self):
                # Perform Action Logic Here
                if success:
                    return Action.send_file("completed", "path/to/file", {"partner": "companyX"})
                else:
                    return Action.send_status("failed", "Reason for Failure")
        ```
        """
        fname = os.path.basename(fpath)
        fsize = os.path.getsize(fpath)
        msg = {**{"fname": fname, "fsize": fsize, "fpath": fpath}, **meta}
        return {"status": status, "msgs": [msg]}
    
    def send_files(status: str, files: list):
        """Static method for creating a dictionary with the provided status and new messages from the provided dicts with fpaths and additional metadata for returning in the `Action.action` callback.

        Args:
            status (string): The Action status to log.
            files (list): A list of dicts containing an fpath (string) with a path to the file and an optional meta (dict) containing additional metadata.

        This convenience method allows for the creation of new messages using the a list of files with specified `fpath`s and any additional metadata supplied in `meta`. File Size (fsize) and File Name (fname) are automatically retrieved from the given files.

        Usage:
        ```
        from amps import Action
        class my_action(Action):
            def action(self):
                # Perform Action Logic Here
                if success:
                    return Action.send_file("completed", [{"fpath": path/to/file", "meta": {"partner": "companyX"}}])
                else:
                    return Action.send_status("failed", "Reason for Failure")
        ```
        """
        msgs = []
        for file in files:
            fname = os.path.basename(file["fpath"])
            fsize = os.path.getsize(file["fpath"])
            if file.get("meta"):
                meta = file["meta"]
            else:
                meta = {}
            msgs.append({**{"fname": fname, "fsize": fsize, "fpath": file["fpath"]}, **meta})
        return {"status": status, "msgs": msgs}


    @staticmethod
    def send_data(status: str, data: str, meta: dict = {}):
        """Static method for creating a dictionary with the provided status and a new message with the provided inline data and additional metadata for returning in the `action` callback.

        Args:
            status (string): The Action status to log.
            fpath (string): A string containing the message data.
            meta (dict): An optional dictionary containing additional metadata to add to the new message.

        This convenience method allows for the creation of a new message using the inline data specified in `data` and any additional metadata supplied in `meta`.

        Usage:
        ```
        from amps import Action
        class my_action(Action):
            def action(self):
                # Perform Action Logic Here
                if success:
                    return Action.send_data("completed", "Hello, this is my inline data.", {"partner": "companyX"})
                else:
                    return Action.send_status("failed", "Reason for Failure")
        ```
        """
        msg = {**{"data": data}, **meta}
        return {"status": status, "msgs": [msg]}

    @staticmethod
    def send_error(reason: str):
        """Static method for handling errors by creating a dictionary with a ``failed`` status and the provided reason for returning in the `Action.action` callback.

        Args:
            reason (string): The reason for the action failure.

        This convenience method allows for the quick handling of error using the default ``failed`` status along with the reason specified in :param`reason`.

        Usage:
        ```
        from amps import Action
        class my_action(Action):
            def action(self):
                # Perform Action Logic Here
                if success:
                    return Action.send_status("completed")
                else:
                    return Action.send_error("Reason for Failure")
        ```
        """
        return {"status": "failed", "reason": reason}


class Endpoint(Action):
    """The `Endpoint` class from AMPS extends the `Action` class to provide additional convenience in setting up API endpoint actions. Like regular actions, endpoint actions can be performed by overriding the `Action.action` callback exposed by the class.

    Attributes:
        msg (dict): The msg attribute contains a python of the dictionary and the message with all of its metadata. Message data can be accessed from the msg attribute, either using the "data" key for inline data, or the "fpath" key for a path to the file containing the data.
        parms (dict): The parms attribute contains all the parameters of the configured action including any extra parameters you may have specified under the "parms" key.
        sysparms (dict): The sysparms attribute contains all useful system configuration parameters for use in actions. Currently, sysparms only contains the AMPS temporary directory under the "tempdir" key.
        provider (dict): If use_provider is true on the configured action, the provider attribute contains the provider parameters. The provider parameters are available under the parms  object as well under the "provider" key.
        logger (Logger): The logger attribute exposes a Logger object for logging events back to AMPS. Any messages logged using this object during the action execution will appear in the corresponding session logs.
        path_params (dict): The path_params attribute contains any parameters that were passed to the endpoint via the URL. It is also available on the `msg` attribute via the "path_params" key.
        query_params (dict): The query_params attribute contains any parameters that were passed to the endpoint via the query parameters. It is also available on the `msg` attribute via the "query_params" key.

    Unlike actions, which are typically performed asynchronous via topic workflows, because Endpoints are called synchronously, if the endpoint action executes successfully, but produces an erroneous state, the action should return a successful status with the appropriate error code and response body under the "response" key.

    Usage:
    ```
    from amps import Endpoint
    class my_endpoint(Endpoint):
        def action(self):
            # Perform Endpoint Logic Here
            if success:
                return Endpoint.send_resp_file("path/to/file", 200)
            else:
                return Endpoint.send_resp_data("File not found.", 404)
    ```
    """

    def __init__(self, msgdata):
        super().__init__(msgdata)
        self.path_params = self.msg.get("path_params")
        self.query_params = self.msg.get("query_params")

    def send_resp_data(data: str, code: int):
        """Static method for creating a dictionary with the provided inline data and status code in the "response" object for returning in the `Action.action` callback.

        Args:
            data (string): The inline data to supply in the response body.
            code (integer): An integer containing the response status code to set.

        Usage:
        ```
        from amps import Endpoint
        import json
        class my_endpoint(Endpoint):
            def action(self):
                # Perform Endpoint Logic Here
                if success:
                    return Endpoint.send_resp_data(json.dumps({"id": 1, "name": "Test User"}), 200)
                else:
                    return Endpoint.send_resp_data("User not found.", 404)
        ```
        """
        return {"status": "completed", "response": {"data": data, "code": code}}

    def send_resp_file(fpath, code):
        """Static method for creating a dictionary with the provided file path and status code in the "response" object for returning in the `Action.action` callback.

        Args:
            fpath (string): The path to file to send in the response body.
            code (integer): An integer containing the response status code to set.

        This convenience method allows for the convenient sending of a file in the response of a request along with the specified status code.

        Usage:
        ```
        from amps import Endpoint
        class my_endpoint(Endpoint):
            def action(self):
                # Perform Endpoint Logic Here
                if success:
                    return Endpoint.send_resp_file("path/to/file", 200)
                else:
                    return Endpoint.send_resp_data("File not found.", 404)
        ```
        """
        return {"status": "completed", "response": {"fpath": fpath, "code": code}}


responses = {}


class Service:
    """The `Service` class from AMPS provides a base class for custom python services that can be managed by AMPS, and act as both consumers of messages and producers of new messages.

    Attributes:
        parms (dict): The parms attribute contains all the parameters of the configured service.
        sysparms (dict): The sysparms attribute contains all useful system configuration parameters for use in services. Currently, sysparms only contains the AMPS temporary directory under the "tempdir" key.
        config (dict): The config attribute contains all the custom configuration provided when creating the service. All config is also available in the parms attribute under the "config" key.
        logger (Logger): The logger attribute exposes a Logger object for logging events back to AMPS. Any messages logged using this object during the message handling or creation will appear in the corresponding session logs.
        env (string): The name of the AMPS environment in which this service is running.

    The `Service` class provides even more flexibility than the `Action` class in extending the functionality of AMPS. Unlike actions which are run and stopped, services can manage processes such as consumers/subscribers, web servers and sockets, etc, allowing the service to consume messages from and produce messages to AMPS.

    The class exposes two callback methods, `Service.initialize` and `Service.handle_message`.
    - `initialize` can be used to perform any initialization actions and start any subprocesses. Note that any subprocesses should be started in a separate thread in order to not block the main thread.
    - `handle_message` can be used to receive messages from the topic specified in the service configuration.

    The class additionally contains two methods for creating messages, `Service.send_message` and `Service.send_new`.
    `Service.send_message` should generally be used in handle_message in order to indicate that the message being created is stemming from the received message.
    Conversely, `Service.send_new` should be used to create new messages originating from an external source, such as a web server or consumer.

    Usage:
    ```
    from amps import Service
    class my_service(Service):
        def initialize(self):
            # Start my subprocess here, potentially passing service reference to subprocess so that it can leverage `send_new`.
            # Perform any other startup logic.

        def handle_message(self, msg, logger):
            # Maybe deliver this message to my subprocess or use it to process/transform the message.
            # Send a new message stemming from this message.
            self.send_message(
                msg, {"data": "New Message Data Here", "my_custom": "metadata"})


    ```
    """

    def __init__(self, parms, sysparms, pid, env, lhandler):
        self.parms = json.loads(parms)
        self.sysparms = json.loads(sysparms)
        self.config = self.parms["config"]
        self.env = env
        self.logger = Logger(service=self)
        self.__pid__ = pid
        self.__lhandler__ = lhandler
        self.initialize()

    def __receive__(self, data):
        try:
            msg = json.loads(data)
            logger = Logger(sid=msg["sid"])

            logger.info(
                f'Message received by Custom Service {self.parms["name"]}')
            resp = self.handle_message(msg, logger)
            return (Atom(b'ok'), resp)
        except Exception as e:
            return (Atom(b'error'), str(e))

    def __response__(self, resp, id):
        global responses
        responses[id.encode("ascii")] = resp
        # f = open(id, "w").write(resp)

    def __await_response__(self, id):
        global responses
        resp = responses.get(id)
        while not resp:
            resp = responses.get(id)
        return responses.pop(id)

    def __send__(self, msg):
        cast(self.__pid__, msg)

    def __send_and_receive__(self, msg):
        id = Util.get_id().encode("ascii")
        global responses
        cast(self.__pid__, (msg, id))
        resp = self.__await_response__(id)
        return json.loads(resp)

    def __log__(self, level, msg):
        cast(self.__lhandler__, (Atom(b'log'), (level, msg)))

    def initialize(self):
        """Instance method for performing any initialization logic and starting any subprocesses.  
        Usage:
        ```
        from amps import Service
        class my_service(Service):
            def initialize(self):
                # Start my subprocess here, potentially passing service reference to subprocess so that it can leverage `send_new`.
                # Perform any other startup logic.
        ```
        """
        pass

    def send_message(self, msg: dict, newmsg: dict):
        """Instance method for sending messages transformed by the `handle_message` callback. 
        The method accepts the original message which should be provided as is, as well as the new message with any new metadata to merge with the original message. If a "data" or "fpath" value is provided, it will overwrite the inline "data" or "fpath" on the current message. New message IDs are automatically generated and added by and retuned from this method for convenience.



        Args:
            msg (dict): The original message currently being processed in `handle_message`.
            newmsg (dict): A dictionary containing any new metadata to overwrite in the outgoing message.
        Returns:
            msgid: Message ID of the newly sent message.


        Usage:
        ```
        from amps import Service
        class my_service(Service):
            def initialize(self):
                # Start my subprocess here, potentially passing service reference to subprocess so that it can leverage `send_new`.
                # Perform any other startup logic.

            def handle_message(self, msg, logger):
                # Maybe deliver this message to my subprocess or use it to process/transform the message.
                # Send a new message stemming from this message.
                self.send_message(msg, {"data": "New Message Data Here", "my_custom": "metadata"})
        ```
        """
        msgid = Util.get_id()
        newmsg['parent'] = msg['msgid']
        newmsg['msgid'] = msgid
        if "data" in newmsg:
            del msg["fpath"]
            newmsg["fsize"] = len(newmsg["data"])
        elif "fpath" in newmsg:
            del msg["data"]
            newmsg["fsize"] = os.path.getsize(newmsg["fpath"])
        call(Atom(b'Elixir.Amps.PyHandler'), Atom(b'send_message'),
             [json.dumps({**msg, **newmsg}), json.dumps(self.parms), self.env])
        return msgid

    def send_new(self, newmsg: dict, action: str, response=False, timeout=15000):
        """Instance method for sending new messages generated from an external source. 

        Args:
            newmsg (dict): A dictionary containing the new outgoing message to send. 
            action (str): The output map action to use when sending this message.
            response (boolean): Whether to wait for and receive all the outputs or results of the processing of new message.
            timeout (int): When response is True, this indicates how long to wait for a response from message processing. 

        Returns:
            response: If response is True, will return result of message processing, otherwise will return message ID of the newly sent message.

        The method accepts a new message with any additional metadata. If a "data" of "fpath" is provided, associated metadata is also generated. In order to get a result from the topic-based message processing of the new message, the response parameter can be passed as True.

        Usage:
        ```
        from amps import Service
        class my_service(Service):
            def initialize(self):
                # Start my subprocess here, potentially passing service reference to subprocess so that it can leverage `send_new`.
                # Perform any other startup logic.

            def my_custom_function(self):
                # Any additional method in which are you generating a new message using an external source.
                self.send_new({"data": "New Message Data Here", "my_custom": "metadata"}, "myaction_name")
        ```
        """
        msgid = Util.get_id()
        newmsg['msgid'] = msgid
        if "data" in newmsg:
            newmsg["fsize"] = len(newmsg["data"])
        else:
            newmsg["fsize"] = os.path.getsize(newmsg["fpath"])
        resp = self.__send_and_receive__((Atom(
            b'new'), json.dumps(newmsg), action, response, timeout))
        return resp

    def create_session(self, user={}):
        """Instance method for authenticating and creating a session for an AMPS user.  

        Args:
            user (dict): A dictionary with a username and password key to use for authentication.

        Returns:
            success: A boolean indicating whether the operation was succesful.
            response: Either an error message when the operation fails or a dictionary with the following keys:
                access_token (str): The access token to use to authenticate the user.
                renewal_token (str): The renewal token when renewing the user's session. 
                user (dict): A dictionary containing all the user details. 
        Usage:
        ```
        from amps import Service
        class my_service(Service):
            def my_custom_login(self, user):
                response = self.create_session(user)
                if response["success"]:
                    # Handle Success
                else:
                    # Handle Failure

        ```
        """
        resp = self.__send_and_receive__((Atom(
            b'authenticate'), json.dumps(user)))
        return resp

    def renew_session(self, renewal_token: str):
        """Instance method for renewing an AMPS user's session.

        Args:
            renewal_token (str): A string containing the renewal token received when creating or renewing the session.

        Returns:
            success: A boolean indicating whether the operation was succesful.
            response: Either an error message when the operation fails or a dictionary with the following keys:
                access_token (str): The access token to use to authenticate the user.
                renewal_token (str): The renewal token when renewing the user's session. 
                user (dict): A dictionary containing all the user details. 
        Usage:
        ```
        from amps import Service
        class my_service(Service):
            def my_renewal(self, renewal_token):
                response = self.renew_session(renewal_token)
                if response["success"]:
                    # Handle Success
                else:
                    # Handle Failure

        ```
        """
        resp = self.__send_and_receive__((Atom(
            b'renew'), renewal_token))
        return resp

    def verify(self, access_token: str):
        """Instance method for verifying an AMPS user's session. 

        Args:
            access_token (str): A string containing the current access token for the user's session

        Returns:
            user: Either a dictionary with the user data on successful verification or False for failed verification. 
        Usage:
        ```
        from amps import Service
        class my_service(Service):
            def post(self, *args):
                access_token = self.request.headers.get('Authorization')
                verified = self.verify(access_token)
                if verified:
                    # Perform Logic Here
                else:
                    # Send Error
                    self.send_error(401)

        ```
        """
        resp = self.__send_and_receive__((Atom(
            b'fetch'), access_token))
        return resp

    def delete_session(self, access_token: str):
        """Instance method for deleting an AMPS user's session. 

        Args:
            access_token (str): A string containing the current access token for the user's session

        Returns:
            success: A boolean indicating whether the operation was succesful.

        Usage:
        ```
        from amps import Service
        class my_service(Service):
            def delete(self, *args):
                access_token = self.request.headers.get('Authorization')
                resp = self.delete_session(access_token)

        ```
        """
        resp = self.__send_and_receive__((Atom(
            b'delete'), access_token))
        return resp

    def handle_message(self, msg: dict, logger: Logger):
        """Callback method for receiving messages on the configured topic. 

        Args:
            msg (dict): The msg args contains a python dictionary of the message with all of its metadata. Message data can be accessed from the msg attribute, either using the "data" key for inline data, or the "fpath" key for a path to the file containing the data. 
            logger (Logger): A logger corresponding to the currently received message. Logging event using this object will accordingly render them in the corresponding message event sessions.
        Usage:
        ```
        from amps import Service
        class my_service(Service):
            def initialize(self):
                # Start my subprocess here, potentially passing service reference to subprocess so that it can leverage `send_new`.
                # Perform any other startup logic.
        ```
        """
        return "completed"

Classes

class Action (msgdata)

The Action class from AMPS provides a base class for actions that must be extended in a custom action. Actions can be performed by overriding the Action.action() callback exposed by the class.

Attributes

msg : dict
The msg attribute contains a python of the dictionary and the message with all of its metadata. Message data can be accessed from the msg attribute, either using the "data" key for inline data, or the "fpath" key for a path to the file containing the data.
parms : dict
The parms attribute contains all the parameters of the configured action including any extra parameters you may have specified under the "parms" key.
sysparms : dict
The sysparms attribute contains all useful system configuration parameters for use in actions. Currently, sysparms only contains the AMPS temporary directory under the "tempdir" key.
extra : dict
The extra attribute contains all extra parameters configured in the action.
provider : dict
If use_provider is true on the configured action, the provider attribute contains the provider parameters. The provider parameters are available under the parms object as well under the "provider" key.
logger : Logger
The logger attribute exposes a Logger object for logging events back to AMPS. Any messages logged using this object during the action execution will appear in the corresponding session logs.

Usage:

from amps import Action
class my_action(Action):
    def action(self):
        # Get Message Data (This example assumes inline).
        data = self.msg["data"]
        # Perform Action Logic Here
        if success:
            self.logger.info("Successfully Processed Message")
            return Action.send_status("completed")
        else:
            self.logger.warn("Failed to process message")
            return Action.send_status("failed", "Reason for Failure")
Expand source code
class Action:
    """The `Action` class from AMPS provides a base class for actions that must be extended in a custom action. Actions can be performed by overriding the `Action.action` callback exposed by the class.

    Attributes:
        msg (dict): The msg attribute contains a python of the dictionary and the message with all of its metadata. Message data can be accessed from the msg attribute, either using the "data" key for inline data, or the "fpath" key for a path to the file containing the data.
        parms (dict): The parms attribute contains all the parameters of the configured action including any extra parameters you may have specified under the "parms" key.
        sysparms (dict): The sysparms attribute contains all useful system configuration parameters for use in actions. Currently, sysparms only contains the AMPS temporary directory under the "tempdir" key.
        extra (dict): The extra attribute contains all extra parameters configured in the action.
        provider (dict): If use_provider is true on the configured action, the provider attribute contains the provider parameters. The provider parameters are available under the parms  object as well under the "provider" key.
        logger (Logger): The logger attribute exposes a Logger object for logging events back to AMPS. Any messages logged using this object during the action execution will appear in the corresponding session logs.

    Usage:
    ```
    from amps import Action
    class my_action(Action):
        def action(self):
            # Get Message Data (This example assumes inline).
            data = self.msg["data"]
            # Perform Action Logic Here
            if success:
                self.logger.info("Successfully Processed Message")
                return Action.send_status("completed")
            else:
                self.logger.warn("Failed to process message")
                return Action.send_status("failed", "Reason for Failure")
    ```
    """

    def __init__(self, msgdata):
        msgdata = json.loads(msgdata)
        self.msg = msgdata["msg"]
        self.parms = msgdata["parms"]
        self.sysparms = msgdata["sysparms"]
        self.extra = self.parms["parms"]
        self.env = self.parms["env"]
        self.db = DB(self.env)
        self.users = Users(self.env)

        if self.parms["use_provider"]:
            self.provider = self.parms["provider"]
        if self.msg.get("sid"):
            self.logger = Logger(sid=self.msg["sid"])
        else:
            self.logger = Logger()

    def __run__(self):
        try:
            response = self.action()
        except Exception as e:
            response = {"error": True, "reason": traceback.format_exc()}
        return json.dumps(response)

    def action(self):
        """Callback to override in order to perform action logic in a class that extends `Action`.

        Relevant message data is available via the attributes available on the self object. It expects the method to return a python dictionary with at least a "status" key with the status of the action execution. If the action execution is unsuccessful, a "reason" key can also be returned along with an unsuccessful status. If a new message is intended to be created by this action, it is also expected that a "msg" key will be returned with a dictionary object containing either a "data" key with inline data or an "fpath" key containing the file path to the message. Similarly, actions used in Endpoints can return a "response" object with the response status code specified under the "code" key, and the response body provided via either the "data" or "fpath" key. The action method is automatically wrapped in a try-except block when it is called, so is unnecessary to wrap the overall method in a try-except block. If you wish to have more granular visibility over errors that arise in your actions, feel free to use try-except blocks at various steps throughout action execution to allow for logging specific reasons for failure. To simplify action creation and handling, a number of helper methods are exposed by the class as static methods.

        Usage:
        ```
        from amps import Action
        class my_action(Action):
            def action(self):
                # Perform Action Logic Here
                if success:
                    return Action.send_status("completed")
                else:
                    return Action.send_status("failed", "Reason for Failure")
        ```
        """
        return {"status": "completed"}

    def get_data(self):
        """Convenience method to read get string data from message.

        Returns either the inline message data stored on the "data" key or the data stored in the message's file via the "fpath" key.
        Usage:
        ```
        from amps import Action
        class my_action(Action):
            def action(self):
                # Perform Action Logic Here
                data = self.get_data()
                # Use data here
                if success:
                    return Action.send_status("completed")
                else:
                    return Action.send_status("failed", "Reason for Failure")
        ```
        """
        if self.msg.get("data"):
            return self.msg["data"]
        else:
            return open(self.msg["fpath"]).read()

    @staticmethod
    def send_async(status, key, data):
        """Static method for creating a dictionary with the provided status and an async key with the provided data for asynchronously returning from an API endpoint.

        Args:
            status (string): The Action status to log.
            key (string): The key to use for this data when returning the json object.
            data (string || dict): Either a string containing the response data or a dictionary which will be JSON encoded and return.

        This convenience method allows for returning data asynchronously from a script when used in an API Endpoint-triggered topic workflow.

        Usage:
        ```
        from amps import Action
        class my_action(Action):
            def action(self):
                # Perform Action Logic Here
                if success:
                    return Action.send_async("completed", "Hello, this is my async response.")
                else:
                    return Action.send_status("failed", "Reason for Failure")
        ```
        """
        return {"status": status, "async": {key: data}}

    @staticmethod
    def send_status(status: str, reason: str = None):
        """Static method for creating a dictionary with the provided status and optional reason for returning in the `action` callback.

        Args:
            status (string): The Action status to log.
            reason (string): An optional reason to provide along with the given status.

        Usage:
        ```
        from amps import Action
        class my_action(Action):
            def action(self):
                # Perform Action Logic Here
                if success:
                    return Action.send_status("completed")
                else:
                    return Action.send_status("failed", "Reason for Failure")
        ```
        """
        if reason:
            return {"status": status, "reason": reason}
        else:
            return {"status": status}

    @staticmethod
    def send_file(status: str, fpath: str, meta: dict = {}):
        """Static method for creating a dictionary with the provided status and a new message from the provided fpath and additional metadata for returning in the `Action.action` callback.

        Args:
            status (string): The Action status to log.
            fpath (string): A filepath to a file containing the message data.
            meta (dict): An optional dictionary containing additional metadata to add to the new message.

        This convenience method allows for the creation of a new message using the file specified in `fpath` and any additional metadata supplied in `meta`. File Size (fsize) and File Name (fname) are automatically retrieved from the given file.

        Usage:
        ```
        from amps import Action
        class my_action(Action):
            def action(self):
                # Perform Action Logic Here
                if success:
                    return Action.send_file("completed", "path/to/file", {"partner": "companyX"})
                else:
                    return Action.send_status("failed", "Reason for Failure")
        ```
        """
        fname = os.path.basename(fpath)
        fsize = os.path.getsize(fpath)
        msg = {**{"fname": fname, "fsize": fsize, "fpath": fpath}, **meta}
        return {"status": status, "msgs": [msg]}
    
    def send_files(status: str, files: list):
        """Static method for creating a dictionary with the provided status and new messages from the provided dicts with fpaths and additional metadata for returning in the `Action.action` callback.

        Args:
            status (string): The Action status to log.
            files (list): A list of dicts containing an fpath (string) with a path to the file and an optional meta (dict) containing additional metadata.

        This convenience method allows for the creation of new messages using the a list of files with specified `fpath`s and any additional metadata supplied in `meta`. File Size (fsize) and File Name (fname) are automatically retrieved from the given files.

        Usage:
        ```
        from amps import Action
        class my_action(Action):
            def action(self):
                # Perform Action Logic Here
                if success:
                    return Action.send_file("completed", [{"fpath": path/to/file", "meta": {"partner": "companyX"}}])
                else:
                    return Action.send_status("failed", "Reason for Failure")
        ```
        """
        msgs = []
        for file in files:
            fname = os.path.basename(file["fpath"])
            fsize = os.path.getsize(file["fpath"])
            if file.get("meta"):
                meta = file["meta"]
            else:
                meta = {}
            msgs.append({**{"fname": fname, "fsize": fsize, "fpath": file["fpath"]}, **meta})
        return {"status": status, "msgs": msgs}


    @staticmethod
    def send_data(status: str, data: str, meta: dict = {}):
        """Static method for creating a dictionary with the provided status and a new message with the provided inline data and additional metadata for returning in the `action` callback.

        Args:
            status (string): The Action status to log.
            fpath (string): A string containing the message data.
            meta (dict): An optional dictionary containing additional metadata to add to the new message.

        This convenience method allows for the creation of a new message using the inline data specified in `data` and any additional metadata supplied in `meta`.

        Usage:
        ```
        from amps import Action
        class my_action(Action):
            def action(self):
                # Perform Action Logic Here
                if success:
                    return Action.send_data("completed", "Hello, this is my inline data.", {"partner": "companyX"})
                else:
                    return Action.send_status("failed", "Reason for Failure")
        ```
        """
        msg = {**{"data": data}, **meta}
        return {"status": status, "msgs": [msg]}

    @staticmethod
    def send_error(reason: str):
        """Static method for handling errors by creating a dictionary with a ``failed`` status and the provided reason for returning in the `Action.action` callback.

        Args:
            reason (string): The reason for the action failure.

        This convenience method allows for the quick handling of error using the default ``failed`` status along with the reason specified in :param`reason`.

        Usage:
        ```
        from amps import Action
        class my_action(Action):
            def action(self):
                # Perform Action Logic Here
                if success:
                    return Action.send_status("completed")
                else:
                    return Action.send_error("Reason for Failure")
        ```
        """
        return {"status": "failed", "reason": reason}

Subclasses

Static methods

def send_async(status, key, data)

Static method for creating a dictionary with the provided status and an async key with the provided data for asynchronously returning from an API endpoint.

Args

status : string
The Action status to log.
key : string
The key to use for this data when returning the json object.

data (string || dict): Either a string containing the response data or a dictionary which will be JSON encoded and return. This convenience method allows for returning data asynchronously from a script when used in an API Endpoint-triggered topic workflow.

Usage:

from amps import Action
class my_action(Action):
    def action(self):
        # Perform Action Logic Here
        if success:
            return Action.send_async("completed", "Hello, this is my async response.")
        else:
            return Action.send_status("failed", "Reason for Failure")
Expand source code
@staticmethod
def send_async(status, key, data):
    """Static method for creating a dictionary with the provided status and an async key with the provided data for asynchronously returning from an API endpoint.

    Args:
        status (string): The Action status to log.
        key (string): The key to use for this data when returning the json object.
        data (string || dict): Either a string containing the response data or a dictionary which will be JSON encoded and return.

    This convenience method allows for returning data asynchronously from a script when used in an API Endpoint-triggered topic workflow.

    Usage:
    ```
    from amps import Action
    class my_action(Action):
        def action(self):
            # Perform Action Logic Here
            if success:
                return Action.send_async("completed", "Hello, this is my async response.")
            else:
                return Action.send_status("failed", "Reason for Failure")
    ```
    """
    return {"status": status, "async": {key: data}}
def send_data(status: str, data: str, meta: dict = {})

Static method for creating a dictionary with the provided status and a new message with the provided inline data and additional metadata for returning in the action callback.

Args

status : string
The Action status to log.
fpath : string
A string containing the message data.
meta : dict
An optional dictionary containing additional metadata to add to the new message.

This convenience method allows for the creation of a new message using the inline data specified in data and any additional metadata supplied in meta.

Usage:

from amps import Action
class my_action(Action):
    def action(self):
        # Perform Action Logic Here
        if success:
            return Action.send_data("completed", "Hello, this is my inline data.", {"partner": "companyX"})
        else:
            return Action.send_status("failed", "Reason for Failure")
Expand source code
@staticmethod
def send_data(status: str, data: str, meta: dict = {}):
    """Static method for creating a dictionary with the provided status and a new message with the provided inline data and additional metadata for returning in the `action` callback.

    Args:
        status (string): The Action status to log.
        fpath (string): A string containing the message data.
        meta (dict): An optional dictionary containing additional metadata to add to the new message.

    This convenience method allows for the creation of a new message using the inline data specified in `data` and any additional metadata supplied in `meta`.

    Usage:
    ```
    from amps import Action
    class my_action(Action):
        def action(self):
            # Perform Action Logic Here
            if success:
                return Action.send_data("completed", "Hello, this is my inline data.", {"partner": "companyX"})
            else:
                return Action.send_status("failed", "Reason for Failure")
    ```
    """
    msg = {**{"data": data}, **meta}
    return {"status": status, "msgs": [msg]}
def send_error(reason: str)

Static method for handling errors by creating a dictionary with a failed status and the provided reason for returning in the Action.action() callback.

Args

reason : string
The reason for the action failure.

This convenience method allows for the quick handling of error using the default failed status along with the reason specified in :paramreason.

Usage:

from amps import Action
class my_action(Action):
    def action(self):
        # Perform Action Logic Here
        if success:
            return Action.send_status("completed")
        else:
            return Action.send_error("Reason for Failure")
Expand source code
@staticmethod
def send_error(reason: str):
    """Static method for handling errors by creating a dictionary with a ``failed`` status and the provided reason for returning in the `Action.action` callback.

    Args:
        reason (string): The reason for the action failure.

    This convenience method allows for the quick handling of error using the default ``failed`` status along with the reason specified in :param`reason`.

    Usage:
    ```
    from amps import Action
    class my_action(Action):
        def action(self):
            # Perform Action Logic Here
            if success:
                return Action.send_status("completed")
            else:
                return Action.send_error("Reason for Failure")
    ```
    """
    return {"status": "failed", "reason": reason}
def send_file(status: str, fpath: str, meta: dict = {})

Static method for creating a dictionary with the provided status and a new message from the provided fpath and additional metadata for returning in the Action.action() callback.

Args

status : string
The Action status to log.
fpath : string
A filepath to a file containing the message data.
meta : dict
An optional dictionary containing additional metadata to add to the new message.

This convenience method allows for the creation of a new message using the file specified in fpath and any additional metadata supplied in meta. File Size (fsize) and File Name (fname) are automatically retrieved from the given file.

Usage:

from amps import Action
class my_action(Action):
    def action(self):
        # Perform Action Logic Here
        if success:
            return Action.send_file("completed", "path/to/file", {"partner": "companyX"})
        else:
            return Action.send_status("failed", "Reason for Failure")
Expand source code
@staticmethod
def send_file(status: str, fpath: str, meta: dict = {}):
    """Static method for creating a dictionary with the provided status and a new message from the provided fpath and additional metadata for returning in the `Action.action` callback.

    Args:
        status (string): The Action status to log.
        fpath (string): A filepath to a file containing the message data.
        meta (dict): An optional dictionary containing additional metadata to add to the new message.

    This convenience method allows for the creation of a new message using the file specified in `fpath` and any additional metadata supplied in `meta`. File Size (fsize) and File Name (fname) are automatically retrieved from the given file.

    Usage:
    ```
    from amps import Action
    class my_action(Action):
        def action(self):
            # Perform Action Logic Here
            if success:
                return Action.send_file("completed", "path/to/file", {"partner": "companyX"})
            else:
                return Action.send_status("failed", "Reason for Failure")
    ```
    """
    fname = os.path.basename(fpath)
    fsize = os.path.getsize(fpath)
    msg = {**{"fname": fname, "fsize": fsize, "fpath": fpath}, **meta}
    return {"status": status, "msgs": [msg]}
def send_status(status: str, reason: str = None)

Static method for creating a dictionary with the provided status and optional reason for returning in the action callback.

Args

status : string
The Action status to log.
reason : string
An optional reason to provide along with the given status.

Usage:

from amps import Action
class my_action(Action):
    def action(self):
        # Perform Action Logic Here
        if success:
            return Action.send_status("completed")
        else:
            return Action.send_status("failed", "Reason for Failure")
Expand source code
@staticmethod
def send_status(status: str, reason: str = None):
    """Static method for creating a dictionary with the provided status and optional reason for returning in the `action` callback.

    Args:
        status (string): The Action status to log.
        reason (string): An optional reason to provide along with the given status.

    Usage:
    ```
    from amps import Action
    class my_action(Action):
        def action(self):
            # Perform Action Logic Here
            if success:
                return Action.send_status("completed")
            else:
                return Action.send_status("failed", "Reason for Failure")
    ```
    """
    if reason:
        return {"status": status, "reason": reason}
    else:
        return {"status": status}

Methods

def action(self)

Callback to override in order to perform action logic in a class that extends Action.

Relevant message data is available via the attributes available on the self object. It expects the method to return a python dictionary with at least a "status" key with the status of the action execution. If the action execution is unsuccessful, a "reason" key can also be returned along with an unsuccessful status. If a new message is intended to be created by this action, it is also expected that a "msg" key will be returned with a dictionary object containing either a "data" key with inline data or an "fpath" key containing the file path to the message. Similarly, actions used in Endpoints can return a "response" object with the response status code specified under the "code" key, and the response body provided via either the "data" or "fpath" key. The action method is automatically wrapped in a try-except block when it is called, so is unnecessary to wrap the overall method in a try-except block. If you wish to have more granular visibility over errors that arise in your actions, feel free to use try-except blocks at various steps throughout action execution to allow for logging specific reasons for failure. To simplify action creation and handling, a number of helper methods are exposed by the class as static methods.

Usage:

from amps import Action
class my_action(Action):
    def action(self):
        # Perform Action Logic Here
        if success:
            return Action.send_status("completed")
        else:
            return Action.send_status("failed", "Reason for Failure")
Expand source code
def action(self):
    """Callback to override in order to perform action logic in a class that extends `Action`.

    Relevant message data is available via the attributes available on the self object. It expects the method to return a python dictionary with at least a "status" key with the status of the action execution. If the action execution is unsuccessful, a "reason" key can also be returned along with an unsuccessful status. If a new message is intended to be created by this action, it is also expected that a "msg" key will be returned with a dictionary object containing either a "data" key with inline data or an "fpath" key containing the file path to the message. Similarly, actions used in Endpoints can return a "response" object with the response status code specified under the "code" key, and the response body provided via either the "data" or "fpath" key. The action method is automatically wrapped in a try-except block when it is called, so is unnecessary to wrap the overall method in a try-except block. If you wish to have more granular visibility over errors that arise in your actions, feel free to use try-except blocks at various steps throughout action execution to allow for logging specific reasons for failure. To simplify action creation and handling, a number of helper methods are exposed by the class as static methods.

    Usage:
    ```
    from amps import Action
    class my_action(Action):
        def action(self):
            # Perform Action Logic Here
            if success:
                return Action.send_status("completed")
            else:
                return Action.send_status("failed", "Reason for Failure")
    ```
    """
    return {"status": "completed"}
def get_data(self)

Convenience method to read get string data from message.

Returns either the inline message data stored on the "data" key or the data stored in the message's file via the "fpath" key. Usage:

from amps import Action
class my_action(Action):
    def action(self):
        # Perform Action Logic Here
        data = self.get_data()
        # Use data here
        if success:
            return Action.send_status("completed")
        else:
            return Action.send_status("failed", "Reason for Failure")
Expand source code
def get_data(self):
    """Convenience method to read get string data from message.

    Returns either the inline message data stored on the "data" key or the data stored in the message's file via the "fpath" key.
    Usage:
    ```
    from amps import Action
    class my_action(Action):
        def action(self):
            # Perform Action Logic Here
            data = self.get_data()
            # Use data here
            if success:
                return Action.send_status("completed")
            else:
                return Action.send_status("failed", "Reason for Failure")
    ```
    """
    if self.msg.get("data"):
        return self.msg["data"]
    else:
        return open(self.msg["fpath"]).read()
def send_files(status: str, files: list)

Static method for creating a dictionary with the provided status and new messages from the provided dicts with fpaths and additional metadata for returning in the Action.action() callback.

Args

status : string
The Action status to log.
files : list
A list of dicts containing an fpath (string) with a path to the file and an optional meta (dict) containing additional metadata.

This convenience method allows for the creation of new messages using the a list of files with specified fpaths and any additional metadata supplied in meta. File Size (fsize) and File Name (fname) are automatically retrieved from the given files.

Usage:

from amps import Action
class my_action(Action):
    def action(self):
        # Perform Action Logic Here
        if success:
            return Action.send_file("completed", [{"fpath": path/to/file", "meta": {"partner": "companyX"}}])
        else:
            return Action.send_status("failed", "Reason for Failure")
Expand source code
def send_files(status: str, files: list):
    """Static method for creating a dictionary with the provided status and new messages from the provided dicts with fpaths and additional metadata for returning in the `Action.action` callback.

    Args:
        status (string): The Action status to log.
        files (list): A list of dicts containing an fpath (string) with a path to the file and an optional meta (dict) containing additional metadata.

    This convenience method allows for the creation of new messages using the a list of files with specified `fpath`s and any additional metadata supplied in `meta`. File Size (fsize) and File Name (fname) are automatically retrieved from the given files.

    Usage:
    ```
    from amps import Action
    class my_action(Action):
        def action(self):
            # Perform Action Logic Here
            if success:
                return Action.send_file("completed", [{"fpath": path/to/file", "meta": {"partner": "companyX"}}])
            else:
                return Action.send_status("failed", "Reason for Failure")
    ```
    """
    msgs = []
    for file in files:
        fname = os.path.basename(file["fpath"])
        fsize = os.path.getsize(file["fpath"])
        if file.get("meta"):
            meta = file["meta"]
        else:
            meta = {}
        msgs.append({**{"fname": fname, "fsize": fsize, "fpath": file["fpath"]}, **meta})
    return {"status": status, "msgs": msgs}
class DB (env)
Expand source code
class DB:
    def __init__(self, env):
        self.env = env

    def find(self, collection, clauses={}, opts={}):
        coll = bytes(collection, "utf-8")
        collection = call(Atom(b'Elixir.AmpsUtil'), Atom(
            b'index'), [bytes(self.env, "utf-8"), coll])
        clauses = Map(clauses)
        opts = Map(opts)
        result = call(Atom(b'Elixir.Amps.PyService'),
                      Atom(b'find'), [collection, clauses, opts])
        return Util.unravel_erlport_object(result)

    def find_one(self, collection, clauses={}, opts={}):
        coll = bytes(collection, "utf-8")
        collection = call(Atom(b'Elixir.AmpsUtil'), Atom(
            b'index'), [bytes(self.env, "utf-8"), coll])
        clauses = Map(clauses)
        opts = Map(opts)
        result = call(Atom(b'Elixir.Amps.PyService'),
                      Atom(b'find_one'), [collection, clauses, opts])
        return Util.unravel_erlport_object(result)

    def create(self, collection, body):
        coll = bytes(collection, "utf-8")
        collection = call(Atom(b'Elixir.AmpsUtil'), Atom(
            b'index'), [bytes(self.env, "utf-8"), coll])
        body = Map(body)
        result = call(Atom(b'Elixir.Amps.PyService'),
                      Atom(b'create'), [collection, body])
        return Util.unravel_erlport_object(result)

    def update(self, collection, body, id):
        coll = bytes(collection, "utf-8")
        id = bytes(id, "utf-8")
        collection = call(Atom(b'Elixir.AmpsUtil'), Atom(
            b'index'), [bytes(self.env, "utf-8"), coll])
        body = Map(body)
        result = call(Atom(b'Elixir.Amps.PyService'),
                      Atom(b'update'), [collection, body, id])
        return Util.unravel_erlport_object(result)

    def delete(self, collection, id):
        coll = bytes(collection, "utf-8")
        id = bytes(id, "utf-8")
        collection = call(Atom(b'Elixir.AmpsUtil'), Atom(
            b'index'), [bytes(self.env, "utf-8"), coll])
        result = call(Atom(b'Elixir.Amps.PyService'),
                      Atom(b'delete'), [collection, id])
        return Util.unravel_erlport_object(result)

Methods

def create(self, collection, body)
Expand source code
def create(self, collection, body):
    coll = bytes(collection, "utf-8")
    collection = call(Atom(b'Elixir.AmpsUtil'), Atom(
        b'index'), [bytes(self.env, "utf-8"), coll])
    body = Map(body)
    result = call(Atom(b'Elixir.Amps.PyService'),
                  Atom(b'create'), [collection, body])
    return Util.unravel_erlport_object(result)
def delete(self, collection, id)
Expand source code
def delete(self, collection, id):
    coll = bytes(collection, "utf-8")
    id = bytes(id, "utf-8")
    collection = call(Atom(b'Elixir.AmpsUtil'), Atom(
        b'index'), [bytes(self.env, "utf-8"), coll])
    result = call(Atom(b'Elixir.Amps.PyService'),
                  Atom(b'delete'), [collection, id])
    return Util.unravel_erlport_object(result)
def find(self, collection, clauses={}, opts={})
Expand source code
def find(self, collection, clauses={}, opts={}):
    coll = bytes(collection, "utf-8")
    collection = call(Atom(b'Elixir.AmpsUtil'), Atom(
        b'index'), [bytes(self.env, "utf-8"), coll])
    clauses = Map(clauses)
    opts = Map(opts)
    result = call(Atom(b'Elixir.Amps.PyService'),
                  Atom(b'find'), [collection, clauses, opts])
    return Util.unravel_erlport_object(result)
def find_one(self, collection, clauses={}, opts={})
Expand source code
def find_one(self, collection, clauses={}, opts={}):
    coll = bytes(collection, "utf-8")
    collection = call(Atom(b'Elixir.AmpsUtil'), Atom(
        b'index'), [bytes(self.env, "utf-8"), coll])
    clauses = Map(clauses)
    opts = Map(opts)
    result = call(Atom(b'Elixir.Amps.PyService'),
                  Atom(b'find_one'), [collection, clauses, opts])
    return Util.unravel_erlport_object(result)
def update(self, collection, body, id)
Expand source code
def update(self, collection, body, id):
    coll = bytes(collection, "utf-8")
    id = bytes(id, "utf-8")
    collection = call(Atom(b'Elixir.AmpsUtil'), Atom(
        b'index'), [bytes(self.env, "utf-8"), coll])
    body = Map(body)
    result = call(Atom(b'Elixir.Amps.PyService'),
                  Atom(b'update'), [collection, body, id])
    return Util.unravel_erlport_object(result)
class Endpoint (msgdata)

The Endpoint class from AMPS extends the Action class to provide additional convenience in setting up API endpoint actions. Like regular actions, endpoint actions can be performed by overriding the Action.action() callback exposed by the class.

Attributes

msg : dict
The msg attribute contains a python of the dictionary and the message with all of its metadata. Message data can be accessed from the msg attribute, either using the "data" key for inline data, or the "fpath" key for a path to the file containing the data.
parms : dict
The parms attribute contains all the parameters of the configured action including any extra parameters you may have specified under the "parms" key.
sysparms : dict
The sysparms attribute contains all useful system configuration parameters for use in actions. Currently, sysparms only contains the AMPS temporary directory under the "tempdir" key.
provider : dict
If use_provider is true on the configured action, the provider attribute contains the provider parameters. The provider parameters are available under the parms object as well under the "provider" key.
logger : Logger
The logger attribute exposes a Logger object for logging events back to AMPS. Any messages logged using this object during the action execution will appear in the corresponding session logs.
path_params : dict
The path_params attribute contains any parameters that were passed to the endpoint via the URL. It is also available on the msg attribute via the "path_params" key.
query_params : dict
The query_params attribute contains any parameters that were passed to the endpoint via the query parameters. It is also available on the msg attribute via the "query_params" key.

Unlike actions, which are typically performed asynchronous via topic workflows, because Endpoints are called synchronously, if the endpoint action executes successfully, but produces an erroneous state, the action should return a successful status with the appropriate error code and response body under the "response" key.

Usage:

from amps import Endpoint
class my_endpoint(Endpoint):
    def action(self):
        # Perform Endpoint Logic Here
        if success:
            return Endpoint.send_resp_file("path/to/file", 200)
        else:
            return Endpoint.send_resp_data("File not found.", 404)
Expand source code
class Endpoint(Action):
    """The `Endpoint` class from AMPS extends the `Action` class to provide additional convenience in setting up API endpoint actions. Like regular actions, endpoint actions can be performed by overriding the `Action.action` callback exposed by the class.

    Attributes:
        msg (dict): The msg attribute contains a python of the dictionary and the message with all of its metadata. Message data can be accessed from the msg attribute, either using the "data" key for inline data, or the "fpath" key for a path to the file containing the data.
        parms (dict): The parms attribute contains all the parameters of the configured action including any extra parameters you may have specified under the "parms" key.
        sysparms (dict): The sysparms attribute contains all useful system configuration parameters for use in actions. Currently, sysparms only contains the AMPS temporary directory under the "tempdir" key.
        provider (dict): If use_provider is true on the configured action, the provider attribute contains the provider parameters. The provider parameters are available under the parms  object as well under the "provider" key.
        logger (Logger): The logger attribute exposes a Logger object for logging events back to AMPS. Any messages logged using this object during the action execution will appear in the corresponding session logs.
        path_params (dict): The path_params attribute contains any parameters that were passed to the endpoint via the URL. It is also available on the `msg` attribute via the "path_params" key.
        query_params (dict): The query_params attribute contains any parameters that were passed to the endpoint via the query parameters. It is also available on the `msg` attribute via the "query_params" key.

    Unlike actions, which are typically performed asynchronous via topic workflows, because Endpoints are called synchronously, if the endpoint action executes successfully, but produces an erroneous state, the action should return a successful status with the appropriate error code and response body under the "response" key.

    Usage:
    ```
    from amps import Endpoint
    class my_endpoint(Endpoint):
        def action(self):
            # Perform Endpoint Logic Here
            if success:
                return Endpoint.send_resp_file("path/to/file", 200)
            else:
                return Endpoint.send_resp_data("File not found.", 404)
    ```
    """

    def __init__(self, msgdata):
        super().__init__(msgdata)
        self.path_params = self.msg.get("path_params")
        self.query_params = self.msg.get("query_params")

    def send_resp_data(data: str, code: int):
        """Static method for creating a dictionary with the provided inline data and status code in the "response" object for returning in the `Action.action` callback.

        Args:
            data (string): The inline data to supply in the response body.
            code (integer): An integer containing the response status code to set.

        Usage:
        ```
        from amps import Endpoint
        import json
        class my_endpoint(Endpoint):
            def action(self):
                # Perform Endpoint Logic Here
                if success:
                    return Endpoint.send_resp_data(json.dumps({"id": 1, "name": "Test User"}), 200)
                else:
                    return Endpoint.send_resp_data("User not found.", 404)
        ```
        """
        return {"status": "completed", "response": {"data": data, "code": code}}

    def send_resp_file(fpath, code):
        """Static method for creating a dictionary with the provided file path and status code in the "response" object for returning in the `Action.action` callback.

        Args:
            fpath (string): The path to file to send in the response body.
            code (integer): An integer containing the response status code to set.

        This convenience method allows for the convenient sending of a file in the response of a request along with the specified status code.

        Usage:
        ```
        from amps import Endpoint
        class my_endpoint(Endpoint):
            def action(self):
                # Perform Endpoint Logic Here
                if success:
                    return Endpoint.send_resp_file("path/to/file", 200)
                else:
                    return Endpoint.send_resp_data("File not found.", 404)
        ```
        """
        return {"status": "completed", "response": {"fpath": fpath, "code": code}}

Ancestors

Methods

def send_resp_data(data: str, code: int)

Static method for creating a dictionary with the provided inline data and status code in the "response" object for returning in the Action.action() callback.

Args

data : string
The inline data to supply in the response body.
code : integer
An integer containing the response status code to set.

Usage:

from amps import Endpoint
import json
class my_endpoint(Endpoint):
    def action(self):
        # Perform Endpoint Logic Here
        if success:
            return Endpoint.send_resp_data(json.dumps({"id": 1, "name": "Test User"}), 200)
        else:
            return Endpoint.send_resp_data("User not found.", 404)
Expand source code
def send_resp_data(data: str, code: int):
    """Static method for creating a dictionary with the provided inline data and status code in the "response" object for returning in the `Action.action` callback.

    Args:
        data (string): The inline data to supply in the response body.
        code (integer): An integer containing the response status code to set.

    Usage:
    ```
    from amps import Endpoint
    import json
    class my_endpoint(Endpoint):
        def action(self):
            # Perform Endpoint Logic Here
            if success:
                return Endpoint.send_resp_data(json.dumps({"id": 1, "name": "Test User"}), 200)
            else:
                return Endpoint.send_resp_data("User not found.", 404)
    ```
    """
    return {"status": "completed", "response": {"data": data, "code": code}}
def send_resp_file(fpath, code)

Static method for creating a dictionary with the provided file path and status code in the "response" object for returning in the Action.action() callback.

Args

fpath : string
The path to file to send in the response body.
code : integer
An integer containing the response status code to set.

This convenience method allows for the convenient sending of a file in the response of a request along with the specified status code.

Usage:

from amps import Endpoint
class my_endpoint(Endpoint):
    def action(self):
        # Perform Endpoint Logic Here
        if success:
            return Endpoint.send_resp_file("path/to/file", 200)
        else:
            return Endpoint.send_resp_data("File not found.", 404)
Expand source code
def send_resp_file(fpath, code):
    """Static method for creating a dictionary with the provided file path and status code in the "response" object for returning in the `Action.action` callback.

    Args:
        fpath (string): The path to file to send in the response body.
        code (integer): An integer containing the response status code to set.

    This convenience method allows for the convenient sending of a file in the response of a request along with the specified status code.

    Usage:
    ```
    from amps import Endpoint
    class my_endpoint(Endpoint):
        def action(self):
            # Perform Endpoint Logic Here
            if success:
                return Endpoint.send_resp_file("path/to/file", 200)
            else:
                return Endpoint.send_resp_data("File not found.", 404)
    ```
    """
    return {"status": "completed", "response": {"fpath": fpath, "code": code}}

Inherited members

class Logger (sid='', service=None)

The Logger class from AMPS is a utility class for logging events back to AMPS. This class should not be used directly, but rather instances of this class within Actions, Endpoints, and Services should be used.

Usage:

from amps import Action
class my_action(Action):
    def action(self):
        self.logger.info("Action Execution Started")
        # Perform Action Logic Here
Expand source code
class Logger:
    """The `Logger` class from AMPS is a utility class for logging events back to AMPS. This class should not be used directly, but rather instances of this class within Actions, Endpoints, and Services should be used.

    Usage:
    ```
    from amps import Action
    class my_action(Action):
        def action(self):
            self.logger.info("Action Execution Started")
            # Perform Action Logic Here
    ```
    """

    def __init__(self, sid="", service=None):
        self.__sid__ = sid
        self.__service__ = service

    def log(self, level: str, message: str):
        """Instance method that logs a given message with the given level.

        Args:
            level (string): The level to use when logging the given message. All valid "level" values are available in the docs for the [Logger](https://hexdocs.pm/logger/1.13/Logger.html#module-levels) Elixir library underlying this class.
            message (string): The message to log.

        Usage:
        ```
        from amps import Action
        class my_action(Action):
            def action(self):
                self.logger.log("info", "Action Execution Started")
                # Perform Action Logic Here
        ```
        """
        if self.__service__:
            self.__service__.__log__(Atom(
                bytes(level, "utf-8")), message)
        else:
            call(Atom(b'Elixir.Amps.PyHandler'), Atom(b'log'), [Atom(
                bytes(level, "utf-8")), message, [(Atom(b'sid'), self.__sid__)]])

    def info(self, message: str):
        """Instance method that logs a given message with the "info" level.

        Args:
            message (string): The message to log.

        Usage:
        ```
        from amps import Action
        class my_action(Action):
            def action(self):
                self.logger.info("Action Execution Started")
                # Perform Action Logic Here
        ```
        """
        self.log("info", message)

    def debug(self, message: str):
        """Instance method that logs a given message with the "debug" level.

        Args:
            message (string): The message to log.

        Usage:
        ```
        from amps import Action
        class my_action(Action):
            def action(self):
                # Perform Action Logic Here
                self.logger.debug("Received message")
        ```
        """
        self.log("debug", message)

    def warning(self, message: str):
        """Instance method that logs a given message with the "warning" level.

        Args:
            message (string): The message to log.

        Usage:
        ```
        from amps import Action
        class my_action(Action):
            def action(self):
                # Perform Action Logic Here
                self.logger.warning("Something minor went wrong.")
        ```
        """
        self.log("warning", message)

    def error(self, message: str):
        """Instance method that logs a given message with the "error" level.

        Args:
            message (string): The message to log.

        Usage:
        ```
        from amps import Action
        class my_action(Action):
            def action(self):
                # Perform Action Logic Here
                self.logger.error("Something major went wrong.")
        ```
        """
        self.log("error", message)

Methods

def debug(self, message: str)

Instance method that logs a given message with the "debug" level.

Args

message : string
The message to log.

Usage:

from amps import Action
class my_action(Action):
    def action(self):
        # Perform Action Logic Here
        self.logger.debug("Received message")
Expand source code
def debug(self, message: str):
    """Instance method that logs a given message with the "debug" level.

    Args:
        message (string): The message to log.

    Usage:
    ```
    from amps import Action
    class my_action(Action):
        def action(self):
            # Perform Action Logic Here
            self.logger.debug("Received message")
    ```
    """
    self.log("debug", message)
def error(self, message: str)

Instance method that logs a given message with the "error" level.

Args

message : string
The message to log.

Usage:

from amps import Action
class my_action(Action):
    def action(self):
        # Perform Action Logic Here
        self.logger.error("Something major went wrong.")
Expand source code
def error(self, message: str):
    """Instance method that logs a given message with the "error" level.

    Args:
        message (string): The message to log.

    Usage:
    ```
    from amps import Action
    class my_action(Action):
        def action(self):
            # Perform Action Logic Here
            self.logger.error("Something major went wrong.")
    ```
    """
    self.log("error", message)
def info(self, message: str)

Instance method that logs a given message with the "info" level.

Args

message : string
The message to log.

Usage:

from amps import Action
class my_action(Action):
    def action(self):
        self.logger.info("Action Execution Started")
        # Perform Action Logic Here
Expand source code
def info(self, message: str):
    """Instance method that logs a given message with the "info" level.

    Args:
        message (string): The message to log.

    Usage:
    ```
    from amps import Action
    class my_action(Action):
        def action(self):
            self.logger.info("Action Execution Started")
            # Perform Action Logic Here
    ```
    """
    self.log("info", message)
def log(self, level: str, message: str)

Instance method that logs a given message with the given level.

Args

level : string
The level to use when logging the given message. All valid "level" values are available in the docs for the Logger Elixir library underlying this class.
message : string
The message to log.

Usage:

from amps import Action
class my_action(Action):
    def action(self):
        self.logger.log("info", "Action Execution Started")
        # Perform Action Logic Here
Expand source code
def log(self, level: str, message: str):
    """Instance method that logs a given message with the given level.

    Args:
        level (string): The level to use when logging the given message. All valid "level" values are available in the docs for the [Logger](https://hexdocs.pm/logger/1.13/Logger.html#module-levels) Elixir library underlying this class.
        message (string): The message to log.

    Usage:
    ```
    from amps import Action
    class my_action(Action):
        def action(self):
            self.logger.log("info", "Action Execution Started")
            # Perform Action Logic Here
    ```
    """
    if self.__service__:
        self.__service__.__log__(Atom(
            bytes(level, "utf-8")), message)
    else:
        call(Atom(b'Elixir.Amps.PyHandler'), Atom(b'log'), [Atom(
            bytes(level, "utf-8")), message, [(Atom(b'sid'), self.__sid__)]])
def warning(self, message: str)

Instance method that logs a given message with the "warning" level.

Args

message : string
The message to log.

Usage:

from amps import Action
class my_action(Action):
    def action(self):
        # Perform Action Logic Here
        self.logger.warning("Something minor went wrong.")
Expand source code
def warning(self, message: str):
    """Instance method that logs a given message with the "warning" level.

    Args:
        message (string): The message to log.

    Usage:
    ```
    from amps import Action
    class my_action(Action):
        def action(self):
            # Perform Action Logic Here
            self.logger.warning("Something minor went wrong.")
    ```
    """
    self.log("warning", message)
class Service (parms, sysparms, pid, env, lhandler)

The Service class from AMPS provides a base class for custom python services that can be managed by AMPS, and act as both consumers of messages and producers of new messages.

Attributes

parms : dict
The parms attribute contains all the parameters of the configured service.
sysparms : dict
The sysparms attribute contains all useful system configuration parameters for use in services. Currently, sysparms only contains the AMPS temporary directory under the "tempdir" key.
config : dict
The config attribute contains all the custom configuration provided when creating the service. All config is also available in the parms attribute under the "config" key.
logger : Logger
The logger attribute exposes a Logger object for logging events back to AMPS. Any messages logged using this object during the message handling or creation will appear in the corresponding session logs.
env : string
The name of the AMPS environment in which this service is running.

The Service class provides even more flexibility than the Action class in extending the functionality of AMPS. Unlike actions which are run and stopped, services can manage processes such as consumers/subscribers, web servers and sockets, etc, allowing the service to consume messages from and produce messages to AMPS.

The class exposes two callback methods, Service.initialize() and Service.handle_message(). - initialize can be used to perform any initialization actions and start any subprocesses. Note that any subprocesses should be started in a separate thread in order to not block the main thread. - handle_message can be used to receive messages from the topic specified in the service configuration.

The class additionally contains two methods for creating messages, Service.send_message() and Service.send_new(). Service.send_message() should generally be used in handle_message in order to indicate that the message being created is stemming from the received message. Conversely, Service.send_new() should be used to create new messages originating from an external source, such as a web server or consumer.

Usage:

from amps import Service
class my_service(Service):
    def initialize(self):
        # Start my subprocess here, potentially passing service reference to subprocess so that it can leverage `send_new`.
        # Perform any other startup logic.

    def handle_message(self, msg, logger):
        # Maybe deliver this message to my subprocess or use it to process/transform the message.
        # Send a new message stemming from this message.
        self.send_message(
            msg, {"data": "New Message Data Here", "my_custom": "metadata"})


Expand source code
class Service:
    """The `Service` class from AMPS provides a base class for custom python services that can be managed by AMPS, and act as both consumers of messages and producers of new messages.

    Attributes:
        parms (dict): The parms attribute contains all the parameters of the configured service.
        sysparms (dict): The sysparms attribute contains all useful system configuration parameters for use in services. Currently, sysparms only contains the AMPS temporary directory under the "tempdir" key.
        config (dict): The config attribute contains all the custom configuration provided when creating the service. All config is also available in the parms attribute under the "config" key.
        logger (Logger): The logger attribute exposes a Logger object for logging events back to AMPS. Any messages logged using this object during the message handling or creation will appear in the corresponding session logs.
        env (string): The name of the AMPS environment in which this service is running.

    The `Service` class provides even more flexibility than the `Action` class in extending the functionality of AMPS. Unlike actions which are run and stopped, services can manage processes such as consumers/subscribers, web servers and sockets, etc, allowing the service to consume messages from and produce messages to AMPS.

    The class exposes two callback methods, `Service.initialize` and `Service.handle_message`.
    - `initialize` can be used to perform any initialization actions and start any subprocesses. Note that any subprocesses should be started in a separate thread in order to not block the main thread.
    - `handle_message` can be used to receive messages from the topic specified in the service configuration.

    The class additionally contains two methods for creating messages, `Service.send_message` and `Service.send_new`.
    `Service.send_message` should generally be used in handle_message in order to indicate that the message being created is stemming from the received message.
    Conversely, `Service.send_new` should be used to create new messages originating from an external source, such as a web server or consumer.

    Usage:
    ```
    from amps import Service
    class my_service(Service):
        def initialize(self):
            # Start my subprocess here, potentially passing service reference to subprocess so that it can leverage `send_new`.
            # Perform any other startup logic.

        def handle_message(self, msg, logger):
            # Maybe deliver this message to my subprocess or use it to process/transform the message.
            # Send a new message stemming from this message.
            self.send_message(
                msg, {"data": "New Message Data Here", "my_custom": "metadata"})


    ```
    """

    def __init__(self, parms, sysparms, pid, env, lhandler):
        self.parms = json.loads(parms)
        self.sysparms = json.loads(sysparms)
        self.config = self.parms["config"]
        self.env = env
        self.logger = Logger(service=self)
        self.__pid__ = pid
        self.__lhandler__ = lhandler
        self.initialize()

    def __receive__(self, data):
        try:
            msg = json.loads(data)
            logger = Logger(sid=msg["sid"])

            logger.info(
                f'Message received by Custom Service {self.parms["name"]}')
            resp = self.handle_message(msg, logger)
            return (Atom(b'ok'), resp)
        except Exception as e:
            return (Atom(b'error'), str(e))

    def __response__(self, resp, id):
        global responses
        responses[id.encode("ascii")] = resp
        # f = open(id, "w").write(resp)

    def __await_response__(self, id):
        global responses
        resp = responses.get(id)
        while not resp:
            resp = responses.get(id)
        return responses.pop(id)

    def __send__(self, msg):
        cast(self.__pid__, msg)

    def __send_and_receive__(self, msg):
        id = Util.get_id().encode("ascii")
        global responses
        cast(self.__pid__, (msg, id))
        resp = self.__await_response__(id)
        return json.loads(resp)

    def __log__(self, level, msg):
        cast(self.__lhandler__, (Atom(b'log'), (level, msg)))

    def initialize(self):
        """Instance method for performing any initialization logic and starting any subprocesses.  
        Usage:
        ```
        from amps import Service
        class my_service(Service):
            def initialize(self):
                # Start my subprocess here, potentially passing service reference to subprocess so that it can leverage `send_new`.
                # Perform any other startup logic.
        ```
        """
        pass

    def send_message(self, msg: dict, newmsg: dict):
        """Instance method for sending messages transformed by the `handle_message` callback. 
        The method accepts the original message which should be provided as is, as well as the new message with any new metadata to merge with the original message. If a "data" or "fpath" value is provided, it will overwrite the inline "data" or "fpath" on the current message. New message IDs are automatically generated and added by and retuned from this method for convenience.



        Args:
            msg (dict): The original message currently being processed in `handle_message`.
            newmsg (dict): A dictionary containing any new metadata to overwrite in the outgoing message.
        Returns:
            msgid: Message ID of the newly sent message.


        Usage:
        ```
        from amps import Service
        class my_service(Service):
            def initialize(self):
                # Start my subprocess here, potentially passing service reference to subprocess so that it can leverage `send_new`.
                # Perform any other startup logic.

            def handle_message(self, msg, logger):
                # Maybe deliver this message to my subprocess or use it to process/transform the message.
                # Send a new message stemming from this message.
                self.send_message(msg, {"data": "New Message Data Here", "my_custom": "metadata"})
        ```
        """
        msgid = Util.get_id()
        newmsg['parent'] = msg['msgid']
        newmsg['msgid'] = msgid
        if "data" in newmsg:
            del msg["fpath"]
            newmsg["fsize"] = len(newmsg["data"])
        elif "fpath" in newmsg:
            del msg["data"]
            newmsg["fsize"] = os.path.getsize(newmsg["fpath"])
        call(Atom(b'Elixir.Amps.PyHandler'), Atom(b'send_message'),
             [json.dumps({**msg, **newmsg}), json.dumps(self.parms), self.env])
        return msgid

    def send_new(self, newmsg: dict, action: str, response=False, timeout=15000):
        """Instance method for sending new messages generated from an external source. 

        Args:
            newmsg (dict): A dictionary containing the new outgoing message to send. 
            action (str): The output map action to use when sending this message.
            response (boolean): Whether to wait for and receive all the outputs or results of the processing of new message.
            timeout (int): When response is True, this indicates how long to wait for a response from message processing. 

        Returns:
            response: If response is True, will return result of message processing, otherwise will return message ID of the newly sent message.

        The method accepts a new message with any additional metadata. If a "data" of "fpath" is provided, associated metadata is also generated. In order to get a result from the topic-based message processing of the new message, the response parameter can be passed as True.

        Usage:
        ```
        from amps import Service
        class my_service(Service):
            def initialize(self):
                # Start my subprocess here, potentially passing service reference to subprocess so that it can leverage `send_new`.
                # Perform any other startup logic.

            def my_custom_function(self):
                # Any additional method in which are you generating a new message using an external source.
                self.send_new({"data": "New Message Data Here", "my_custom": "metadata"}, "myaction_name")
        ```
        """
        msgid = Util.get_id()
        newmsg['msgid'] = msgid
        if "data" in newmsg:
            newmsg["fsize"] = len(newmsg["data"])
        else:
            newmsg["fsize"] = os.path.getsize(newmsg["fpath"])
        resp = self.__send_and_receive__((Atom(
            b'new'), json.dumps(newmsg), action, response, timeout))
        return resp

    def create_session(self, user={}):
        """Instance method for authenticating and creating a session for an AMPS user.  

        Args:
            user (dict): A dictionary with a username and password key to use for authentication.

        Returns:
            success: A boolean indicating whether the operation was succesful.
            response: Either an error message when the operation fails or a dictionary with the following keys:
                access_token (str): The access token to use to authenticate the user.
                renewal_token (str): The renewal token when renewing the user's session. 
                user (dict): A dictionary containing all the user details. 
        Usage:
        ```
        from amps import Service
        class my_service(Service):
            def my_custom_login(self, user):
                response = self.create_session(user)
                if response["success"]:
                    # Handle Success
                else:
                    # Handle Failure

        ```
        """
        resp = self.__send_and_receive__((Atom(
            b'authenticate'), json.dumps(user)))
        return resp

    def renew_session(self, renewal_token: str):
        """Instance method for renewing an AMPS user's session.

        Args:
            renewal_token (str): A string containing the renewal token received when creating or renewing the session.

        Returns:
            success: A boolean indicating whether the operation was succesful.
            response: Either an error message when the operation fails or a dictionary with the following keys:
                access_token (str): The access token to use to authenticate the user.
                renewal_token (str): The renewal token when renewing the user's session. 
                user (dict): A dictionary containing all the user details. 
        Usage:
        ```
        from amps import Service
        class my_service(Service):
            def my_renewal(self, renewal_token):
                response = self.renew_session(renewal_token)
                if response["success"]:
                    # Handle Success
                else:
                    # Handle Failure

        ```
        """
        resp = self.__send_and_receive__((Atom(
            b'renew'), renewal_token))
        return resp

    def verify(self, access_token: str):
        """Instance method for verifying an AMPS user's session. 

        Args:
            access_token (str): A string containing the current access token for the user's session

        Returns:
            user: Either a dictionary with the user data on successful verification or False for failed verification. 
        Usage:
        ```
        from amps import Service
        class my_service(Service):
            def post(self, *args):
                access_token = self.request.headers.get('Authorization')
                verified = self.verify(access_token)
                if verified:
                    # Perform Logic Here
                else:
                    # Send Error
                    self.send_error(401)

        ```
        """
        resp = self.__send_and_receive__((Atom(
            b'fetch'), access_token))
        return resp

    def delete_session(self, access_token: str):
        """Instance method for deleting an AMPS user's session. 

        Args:
            access_token (str): A string containing the current access token for the user's session

        Returns:
            success: A boolean indicating whether the operation was succesful.

        Usage:
        ```
        from amps import Service
        class my_service(Service):
            def delete(self, *args):
                access_token = self.request.headers.get('Authorization')
                resp = self.delete_session(access_token)

        ```
        """
        resp = self.__send_and_receive__((Atom(
            b'delete'), access_token))
        return resp

    def handle_message(self, msg: dict, logger: Logger):
        """Callback method for receiving messages on the configured topic. 

        Args:
            msg (dict): The msg args contains a python dictionary of the message with all of its metadata. Message data can be accessed from the msg attribute, either using the "data" key for inline data, or the "fpath" key for a path to the file containing the data. 
            logger (Logger): A logger corresponding to the currently received message. Logging event using this object will accordingly render them in the corresponding message event sessions.
        Usage:
        ```
        from amps import Service
        class my_service(Service):
            def initialize(self):
                # Start my subprocess here, potentially passing service reference to subprocess so that it can leverage `send_new`.
                # Perform any other startup logic.
        ```
        """
        return "completed"

Methods

def create_session(self, user={})

Instance method for authenticating and creating a session for an AMPS user.

Args

user : dict
A dictionary with a username and password key to use for authentication.

Returns

success
A boolean indicating whether the operation was succesful.
response
Either an error message when the operation fails or a dictionary with the following keys: access_token (str): The access token to use to authenticate the user. renewal_token (str): The renewal token when renewing the user's session. user (dict): A dictionary containing all the user details.

Usage:

from amps import Service
class my_service(Service):
    def my_custom_login(self, user):
        response = self.create_session(user)
        if response["success"]:
            # Handle Success
        else:
            # Handle Failure

Expand source code
def create_session(self, user={}):
    """Instance method for authenticating and creating a session for an AMPS user.  

    Args:
        user (dict): A dictionary with a username and password key to use for authentication.

    Returns:
        success: A boolean indicating whether the operation was succesful.
        response: Either an error message when the operation fails or a dictionary with the following keys:
            access_token (str): The access token to use to authenticate the user.
            renewal_token (str): The renewal token when renewing the user's session. 
            user (dict): A dictionary containing all the user details. 
    Usage:
    ```
    from amps import Service
    class my_service(Service):
        def my_custom_login(self, user):
            response = self.create_session(user)
            if response["success"]:
                # Handle Success
            else:
                # Handle Failure

    ```
    """
    resp = self.__send_and_receive__((Atom(
        b'authenticate'), json.dumps(user)))
    return resp
def delete_session(self, access_token: str)

Instance method for deleting an AMPS user's session.

Args

access_token : str
A string containing the current access token for the user's session

Returns

success
A boolean indicating whether the operation was succesful.

Usage:

from amps import Service
class my_service(Service):
    def delete(self, *args):
        access_token = self.request.headers.get('Authorization')
        resp = self.delete_session(access_token)

Expand source code
def delete_session(self, access_token: str):
    """Instance method for deleting an AMPS user's session. 

    Args:
        access_token (str): A string containing the current access token for the user's session

    Returns:
        success: A boolean indicating whether the operation was succesful.

    Usage:
    ```
    from amps import Service
    class my_service(Service):
        def delete(self, *args):
            access_token = self.request.headers.get('Authorization')
            resp = self.delete_session(access_token)

    ```
    """
    resp = self.__send_and_receive__((Atom(
        b'delete'), access_token))
    return resp
def handle_message(self, msg: dict, logger: Logger)

Callback method for receiving messages on the configured topic.

Args

msg : dict
The msg args contains a python dictionary of the message with all of its metadata. Message data can be accessed from the msg attribute, either using the "data" key for inline data, or the "fpath" key for a path to the file containing the data.
logger : Logger
A logger corresponding to the currently received message. Logging event using this object will accordingly render them in the corresponding message event sessions.

Usage:

from amps import Service
class my_service(Service):
    def initialize(self):
        # Start my subprocess here, potentially passing service reference to subprocess so that it can leverage `send_new`.
        # Perform any other startup logic.
Expand source code
def handle_message(self, msg: dict, logger: Logger):
    """Callback method for receiving messages on the configured topic. 

    Args:
        msg (dict): The msg args contains a python dictionary of the message with all of its metadata. Message data can be accessed from the msg attribute, either using the "data" key for inline data, or the "fpath" key for a path to the file containing the data. 
        logger (Logger): A logger corresponding to the currently received message. Logging event using this object will accordingly render them in the corresponding message event sessions.
    Usage:
    ```
    from amps import Service
    class my_service(Service):
        def initialize(self):
            # Start my subprocess here, potentially passing service reference to subprocess so that it can leverage `send_new`.
            # Perform any other startup logic.
    ```
    """
    return "completed"
def initialize(self)

Instance method for performing any initialization logic and starting any subprocesses.
Usage:

from amps import Service
class my_service(Service):
    def initialize(self):
        # Start my subprocess here, potentially passing service reference to subprocess so that it can leverage `send_new`.
        # Perform any other startup logic.
Expand source code
def initialize(self):
    """Instance method for performing any initialization logic and starting any subprocesses.  
    Usage:
    ```
    from amps import Service
    class my_service(Service):
        def initialize(self):
            # Start my subprocess here, potentially passing service reference to subprocess so that it can leverage `send_new`.
            # Perform any other startup logic.
    ```
    """
    pass
def renew_session(self, renewal_token: str)

Instance method for renewing an AMPS user's session.

Args

renewal_token : str
A string containing the renewal token received when creating or renewing the session.

Returns

success
A boolean indicating whether the operation was succesful.
response
Either an error message when the operation fails or a dictionary with the following keys: access_token (str): The access token to use to authenticate the user. renewal_token (str): The renewal token when renewing the user's session. user (dict): A dictionary containing all the user details.

Usage:

from amps import Service
class my_service(Service):
    def my_renewal(self, renewal_token):
        response = self.renew_session(renewal_token)
        if response["success"]:
            # Handle Success
        else:
            # Handle Failure

Expand source code
def renew_session(self, renewal_token: str):
    """Instance method for renewing an AMPS user's session.

    Args:
        renewal_token (str): A string containing the renewal token received when creating or renewing the session.

    Returns:
        success: A boolean indicating whether the operation was succesful.
        response: Either an error message when the operation fails or a dictionary with the following keys:
            access_token (str): The access token to use to authenticate the user.
            renewal_token (str): The renewal token when renewing the user's session. 
            user (dict): A dictionary containing all the user details. 
    Usage:
    ```
    from amps import Service
    class my_service(Service):
        def my_renewal(self, renewal_token):
            response = self.renew_session(renewal_token)
            if response["success"]:
                # Handle Success
            else:
                # Handle Failure

    ```
    """
    resp = self.__send_and_receive__((Atom(
        b'renew'), renewal_token))
    return resp
def send_message(self, msg: dict, newmsg: dict)

Instance method for sending messages transformed by the handle_message callback. The method accepts the original message which should be provided as is, as well as the new message with any new metadata to merge with the original message. If a "data" or "fpath" value is provided, it will overwrite the inline "data" or "fpath" on the current message. New message IDs are automatically generated and added by and retuned from this method for convenience.

Args

msg : dict
The original message currently being processed in handle_message.
newmsg : dict
A dictionary containing any new metadata to overwrite in the outgoing message.

Returns

msgid
Message ID of the newly sent message.

Usage:

from amps import Service
class my_service(Service):
    def initialize(self):
        # Start my subprocess here, potentially passing service reference to subprocess so that it can leverage `send_new`.
        # Perform any other startup logic.

    def handle_message(self, msg, logger):
        # Maybe deliver this message to my subprocess or use it to process/transform the message.
        # Send a new message stemming from this message.
        self.send_message(msg, {"data": "New Message Data Here", "my_custom": "metadata"})
Expand source code
def send_message(self, msg: dict, newmsg: dict):
    """Instance method for sending messages transformed by the `handle_message` callback. 
    The method accepts the original message which should be provided as is, as well as the new message with any new metadata to merge with the original message. If a "data" or "fpath" value is provided, it will overwrite the inline "data" or "fpath" on the current message. New message IDs are automatically generated and added by and retuned from this method for convenience.



    Args:
        msg (dict): The original message currently being processed in `handle_message`.
        newmsg (dict): A dictionary containing any new metadata to overwrite in the outgoing message.
    Returns:
        msgid: Message ID of the newly sent message.


    Usage:
    ```
    from amps import Service
    class my_service(Service):
        def initialize(self):
            # Start my subprocess here, potentially passing service reference to subprocess so that it can leverage `send_new`.
            # Perform any other startup logic.

        def handle_message(self, msg, logger):
            # Maybe deliver this message to my subprocess or use it to process/transform the message.
            # Send a new message stemming from this message.
            self.send_message(msg, {"data": "New Message Data Here", "my_custom": "metadata"})
    ```
    """
    msgid = Util.get_id()
    newmsg['parent'] = msg['msgid']
    newmsg['msgid'] = msgid
    if "data" in newmsg:
        del msg["fpath"]
        newmsg["fsize"] = len(newmsg["data"])
    elif "fpath" in newmsg:
        del msg["data"]
        newmsg["fsize"] = os.path.getsize(newmsg["fpath"])
    call(Atom(b'Elixir.Amps.PyHandler'), Atom(b'send_message'),
         [json.dumps({**msg, **newmsg}), json.dumps(self.parms), self.env])
    return msgid
def send_new(self, newmsg: dict, action: str, response=False, timeout=15000)

Instance method for sending new messages generated from an external source.

Args

newmsg : dict
A dictionary containing the new outgoing message to send.
action : str
The output map action to use when sending this message.
response : boolean
Whether to wait for and receive all the outputs or results of the processing of new message.
timeout : int
When response is True, this indicates how long to wait for a response from message processing.

Returns

response
If response is True, will return result of message processing, otherwise will return message ID of the newly sent message.

The method accepts a new message with any additional metadata. If a "data" of "fpath" is provided, associated metadata is also generated. In order to get a result from the topic-based message processing of the new message, the response parameter can be passed as True.

Usage:

from amps import Service
class my_service(Service):
    def initialize(self):
        # Start my subprocess here, potentially passing service reference to subprocess so that it can leverage `send_new`.
        # Perform any other startup logic.

    def my_custom_function(self):
        # Any additional method in which are you generating a new message using an external source.
        self.send_new({"data": "New Message Data Here", "my_custom": "metadata"}, "myaction_name")
Expand source code
def send_new(self, newmsg: dict, action: str, response=False, timeout=15000):
    """Instance method for sending new messages generated from an external source. 

    Args:
        newmsg (dict): A dictionary containing the new outgoing message to send. 
        action (str): The output map action to use when sending this message.
        response (boolean): Whether to wait for and receive all the outputs or results of the processing of new message.
        timeout (int): When response is True, this indicates how long to wait for a response from message processing. 

    Returns:
        response: If response is True, will return result of message processing, otherwise will return message ID of the newly sent message.

    The method accepts a new message with any additional metadata. If a "data" of "fpath" is provided, associated metadata is also generated. In order to get a result from the topic-based message processing of the new message, the response parameter can be passed as True.

    Usage:
    ```
    from amps import Service
    class my_service(Service):
        def initialize(self):
            # Start my subprocess here, potentially passing service reference to subprocess so that it can leverage `send_new`.
            # Perform any other startup logic.

        def my_custom_function(self):
            # Any additional method in which are you generating a new message using an external source.
            self.send_new({"data": "New Message Data Here", "my_custom": "metadata"}, "myaction_name")
    ```
    """
    msgid = Util.get_id()
    newmsg['msgid'] = msgid
    if "data" in newmsg:
        newmsg["fsize"] = len(newmsg["data"])
    else:
        newmsg["fsize"] = os.path.getsize(newmsg["fpath"])
    resp = self.__send_and_receive__((Atom(
        b'new'), json.dumps(newmsg), action, response, timeout))
    return resp
def verify(self, access_token: str)

Instance method for verifying an AMPS user's session.

Args

access_token : str
A string containing the current access token for the user's session

Returns

user
Either a dictionary with the user data on successful verification or False for failed verification.

Usage:

from amps import Service
class my_service(Service):
    def post(self, *args):
        access_token = self.request.headers.get('Authorization')
        verified = self.verify(access_token)
        if verified:
            # Perform Logic Here
        else:
            # Send Error
            self.send_error(401)

Expand source code
def verify(self, access_token: str):
    """Instance method for verifying an AMPS user's session. 

    Args:
        access_token (str): A string containing the current access token for the user's session

    Returns:
        user: Either a dictionary with the user data on successful verification or False for failed verification. 
    Usage:
    ```
    from amps import Service
    class my_service(Service):
        def post(self, *args):
            access_token = self.request.headers.get('Authorization')
            verified = self.verify(access_token)
            if verified:
                # Perform Logic Here
            else:
                # Send Error
                self.send_error(401)

    ```
    """
    resp = self.__send_and_receive__((Atom(
        b'fetch'), access_token))
    return resp
class Users (env)
Expand source code
class Users:
    def __init__(self, env):
        self.env = env

    def find(self, env, clauses={}, opts={}):
        coll = bytes(collection, "utf-8")
        collection = call(Atom(b'Elixir.AmpsUtil'), Atom(
            b'index'), [bytes(self.env, "utf-8"), coll])
        clauses = Map(clauses)
        opts = Map(opts)
        result = call(Atom(b'Elixir.Amps.PyService'),
                      Atom(b'find'), [collection, clauses, opts])
        return Util.unravel_erlport_object(result)

    def find_one(self, env, clauses={}, opts={}):
        coll = bytes(collection, "utf-8")
        collection = call(Atom(b'Elixir.AmpsUtil'), Atom(
            b'index'), [bytes(self.env, "utf-8"), coll])
        clauses = Map(clauses)
        opts = Map(opts)
        result = call(Atom(b'Elixir.Amps.PyService'),
                      Atom(b'find_one'), [collection, clauses, opts])
        return Util.unravel_erlport_object(result)

    def create(self, user):
        user = Map(user)
        result = call(Atom(b'Elixir.Amps.PyService.Users'),
                      Atom(b'create'), [user, self.env])
        return Util.unravel_erlport_object(result)

    def update(self, id, body):
        user = Map(body)
        result = call(Atom(b'Elixir.Amps.PyService.Users'),
                      Atom(b'update'), [id, user, self.env])
        return Util.unravel_erlport_object(result)

    def delete(self, id):
        result = call(Atom(b'Elixir.Amps.PyService.Users'),
                      Atom(b'delete'), [id, self.env])
        return Util.unravel_erlport_object(result)

    def create_session(self, user):
        user = Map(user)
        result = call(Atom(b'Elixir.Amps.PyService.Users'),
                      Atom(b'create_session'), [user, self.env])
        return Util.unravel_erlport_object(result)

    def authenticate(self, access_token):
        result = call(Atom(b'Elixir.Amps.PyService.Users'),
                      Atom(b'authenticate'), [access_token, self.env])
        return Util.unravel_erlport_object(result)

    def renew_session(self, renewal_token):
        result = call(Atom(b'Elixir.Amps.PyService.Users'),
                      Atom(b'renew_session'), [renewal_token, self.env])
        return Util.unravel_erlport_object(result)

    def delete_session(self, access_token):
        result = call(Atom(b'Elixir.Amps.PyService.Users'),
                      Atom(b'delete_session'), [access_token, self.env])
        return Util.unravel_erlport_object(result)

Methods

def authenticate(self, access_token)
Expand source code
def authenticate(self, access_token):
    result = call(Atom(b'Elixir.Amps.PyService.Users'),
                  Atom(b'authenticate'), [access_token, self.env])
    return Util.unravel_erlport_object(result)
def create(self, user)
Expand source code
def create(self, user):
    user = Map(user)
    result = call(Atom(b'Elixir.Amps.PyService.Users'),
                  Atom(b'create'), [user, self.env])
    return Util.unravel_erlport_object(result)
def create_session(self, user)
Expand source code
def create_session(self, user):
    user = Map(user)
    result = call(Atom(b'Elixir.Amps.PyService.Users'),
                  Atom(b'create_session'), [user, self.env])
    return Util.unravel_erlport_object(result)
def delete(self, id)
Expand source code
def delete(self, id):
    result = call(Atom(b'Elixir.Amps.PyService.Users'),
                  Atom(b'delete'), [id, self.env])
    return Util.unravel_erlport_object(result)
def delete_session(self, access_token)
Expand source code
def delete_session(self, access_token):
    result = call(Atom(b'Elixir.Amps.PyService.Users'),
                  Atom(b'delete_session'), [access_token, self.env])
    return Util.unravel_erlport_object(result)
def find(self, env, clauses={}, opts={})
Expand source code
def find(self, env, clauses={}, opts={}):
    coll = bytes(collection, "utf-8")
    collection = call(Atom(b'Elixir.AmpsUtil'), Atom(
        b'index'), [bytes(self.env, "utf-8"), coll])
    clauses = Map(clauses)
    opts = Map(opts)
    result = call(Atom(b'Elixir.Amps.PyService'),
                  Atom(b'find'), [collection, clauses, opts])
    return Util.unravel_erlport_object(result)
def find_one(self, env, clauses={}, opts={})
Expand source code
def find_one(self, env, clauses={}, opts={}):
    coll = bytes(collection, "utf-8")
    collection = call(Atom(b'Elixir.AmpsUtil'), Atom(
        b'index'), [bytes(self.env, "utf-8"), coll])
    clauses = Map(clauses)
    opts = Map(opts)
    result = call(Atom(b'Elixir.Amps.PyService'),
                  Atom(b'find_one'), [collection, clauses, opts])
    return Util.unravel_erlport_object(result)
def renew_session(self, renewal_token)
Expand source code
def renew_session(self, renewal_token):
    result = call(Atom(b'Elixir.Amps.PyService.Users'),
                  Atom(b'renew_session'), [renewal_token, self.env])
    return Util.unravel_erlport_object(result)
def update(self, id, body)
Expand source code
def update(self, id, body):
    user = Map(body)
    result = call(Atom(b'Elixir.Amps.PyService.Users'),
                  Atom(b'update'), [id, user, self.env])
    return Util.unravel_erlport_object(result)
class Util

The Util class provides utility methods that may be useful during action and service execution.

Expand source code
class Util:
    """The `Util` class provides utility methods that may be useful during action and service execution.


    """
    def get_id():
        """Utility method that returns a unique ID in the format used by AMPS.
        """
        return uuid.UUID(str(uuid.uuid4())).hex

    @staticmethod
    def unravel_erlport_object(result):
        if isinstance(result, List):
            return [Util.unravel_erlport_object(x) for x in result]
        elif isinstance(result, Map):
            return {k.decode(): Util.unravel_erlport_object(v) for k, v in result.items()}
        elif isinstance(result, Atom):
            return result.decode()
        elif isinstance(result, bytes):
            return result.decode()
        else:
            return result

Static methods

def unravel_erlport_object(result)
Expand source code
@staticmethod
def unravel_erlport_object(result):
    if isinstance(result, List):
        return [Util.unravel_erlport_object(x) for x in result]
    elif isinstance(result, Map):
        return {k.decode(): Util.unravel_erlport_object(v) for k, v in result.items()}
    elif isinstance(result, Atom):
        return result.decode()
    elif isinstance(result, bytes):
        return result.decode()
    else:
        return result

Methods

def get_id()

Utility method that returns a unique ID in the format used by AMPS.

Expand source code
def get_id():
    """Utility method that returns a unique ID in the format used by AMPS.
    """
    return uuid.UUID(str(uuid.uuid4())).hex