# Source code for livy.client

import logging
from typing import Any, Union, Dict, List, Tuple, Optional

import requests

from livy.models import Version, Session, SessionKind, Statement, StatementKind


# Type alias for the ``auth`` argument accepted by the clients in this module:
# either a requests ``AuthBase`` instance or a 2-tuple of strings
# (presumably ``(username, password)`` as in requests' basic auth -- confirm).
Auth = Union[requests.auth.AuthBase, Tuple[str, str]]


# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)


# Session kinds accepted by ``LivyClient.create_session`` when the server is
# a legacy one (``LivyClient.legacy_server()`` returns True).
VALID_LEGACY_SESSION_KINDS = {
    SessionKind.SPARK,
    SessionKind.PYSPARK,
    SessionKind.PYSPARK3,
    SessionKind.SPARKR,
}
# Session kinds accepted by ``LivyClient.create_session`` on non-legacy
# servers. Note PYSPARK3 is absent here while SQL and SHARED are added.
VALID_SESSION_KINDS = {
    SessionKind.SPARK,
    SessionKind.PYSPARK,
    SessionKind.SPARKR,
    SessionKind.SQL,
    SessionKind.SHARED,
}


class JsonClient:
    """A wrapper for a requests session for JSON formatted requests.

    This client handles appending endpoints on to a common hostname,
    deserialising the response as JSON and raising an exception when an error
    HTTP code is received.

    :param url: The base URL that every endpoint is appended to.
    :param auth: A requests-compatible auth object to install on the
        underlying session, or ``None`` to leave the session's auth untouched.
    """

    def __init__(self, url: str, auth: Optional[Auth] = None) -> None:
        self.url = url
        self.session = requests.Session()
        if auth is not None:
            self.session.auth = auth

    def close(self) -> None:
        """Close the underlying requests session."""
        self.session.close()

    def get(self, endpoint: str = "") -> dict:
        """Send a GET request to ``endpoint`` and return the decoded JSON.

        :param endpoint: The path to append to the base URL.
        """
        return self._request("GET", endpoint)

    def post(self, endpoint: str, data: Optional[dict] = None) -> dict:
        """Send a POST request to ``endpoint`` and return the decoded JSON.

        :param endpoint: The path to append to the base URL.
        :param data: The request body, sent JSON-encoded (``None`` sends no
            body).
        """
        return self._request("POST", endpoint, data)

    def delete(self, endpoint: str = "") -> dict:
        """Send a DELETE request to ``endpoint`` and return the decoded JSON.

        :param endpoint: The path to append to the base URL.
        """
        return self._request("DELETE", endpoint)

    def _request(
        self, method: str, endpoint: str, data: Optional[dict] = None
    ) -> dict:
        """Perform a request and decode the JSON response.

        :param method: The HTTP method name, e.g. ``"GET"``.
        :param endpoint: The path to append to the base URL.
        :param data: Optional request body, passed to requests as ``json=``.
        :raises requests.HTTPError: If the response has an error status code.
        """
        # Drop trailing slashes from the base URL so that joining it with an
        # endpoint that starts with "/" does not produce a double slash.
        url = self.url.rstrip("/") + endpoint
        response = self.session.request(method, url, json=data)
        response.raise_for_status()
        return response.json()


class LivyClient:
    """A client for sending requests to a Livy server.

    :param url: The URL of the Livy server.
    :param auth: A requests-compatible auth object to use when making
        requests.
    """

    def __init__(self, url: str, auth: Optional[Auth] = None) -> None:
        self._client = JsonClient(url, auth)
        # Populated on the first call to server_version() and reused after.
        self._server_version_cache: Optional[Version] = None

    def close(self) -> None:
        """Close the underlying requests session."""
        self._client.close()

    def server_version(self) -> Version:
        """Get the version of Livy running on the server.

        The version is requested from ``/version`` on first use and cached on
        this client, so later calls do not contact the server again.
        """
        if self._server_version_cache is None:
            data = self._client.get("/version")
            self._server_version_cache = Version(data["version"])
        return self._server_version_cache

    def legacy_server(self) -> bool:
        """Determine if the server is running a legacy version.

        Legacy versions support different session kinds than newer versions
        of Livy. A server is treated as legacy when its version is lower than
        ``0.5.0-incubating``.
        """
        version = self.server_version()
        return version < Version("0.5.0-incubating")

    def list_sessions(self) -> List[Session]:
        """List all the active sessions in Livy."""
        data = self._client.get("/sessions")
        return [Session.from_json(item) for item in data["sessions"]]

    def create_session(
        self,
        kind: SessionKind,
        proxy_user: Optional[str] = None,
        spark_conf: Optional[Dict[str, Any]] = None,
    ) -> Session:
        """Create a new session in Livy.

        :param kind: The kind of session to create.
        :param proxy_user: User to impersonate when starting the session.
        :param spark_conf: Spark configuration properties.
        :raises ValueError: If ``kind`` is not in the set of session kinds
            allowed for the server's version.
        """
        if self.legacy_server():
            valid_kinds = VALID_LEGACY_SESSION_KINDS
        else:
            valid_kinds = VALID_SESSION_KINDS

        if kind not in valid_kinds:
            raise ValueError(
                f"{kind} is not a valid session kind for a Livy server of "
                f"this version (should be one of {valid_kinds})"
            )

        # Optional fields are only included in the body when provided.
        body: Dict[str, Any] = {"kind": kind.value}
        if proxy_user is not None:
            body["proxyUser"] = proxy_user
        if spark_conf is not None:
            body["conf"] = spark_conf

        data = self._client.post("/sessions", data=body)
        return Session.from_json(data)

    def get_session(self, session_id: int) -> Optional[Session]:
        """Get information about a session.

        :param session_id: The ID of the session.
        :return: The session, or ``None`` if the server responds with a 404.
        :raises requests.HTTPError: For any error response other than 404.
        """
        try:
            data = self._client.get(f"/sessions/{session_id}")
        except requests.HTTPError as e:
            # A 404 is reported as "no such session"; anything else is
            # re-raised unchanged.
            if e.response.status_code == 404:
                return None
            raise
        return Session.from_json(data)

    def delete_session(self, session_id: int) -> None:
        """Kill a session.

        :param session_id: The ID of the session.
        """
        self._client.delete(f"/sessions/{session_id}")

    def list_statements(self, session_id: int) -> List[Statement]:
        """Get all the statements in a session.

        :param session_id: The ID of the session.
        """
        response = self._client.get(f"/sessions/{session_id}/statements")
        return [
            Statement.from_json(session_id, data)
            for data in response["statements"]
        ]

    def create_statement(
        self,
        session_id: int,
        code: str,
        kind: Optional[StatementKind] = None,
    ) -> Statement:
        """Run a statement in a session.

        :param session_id: The ID of the session.
        :param code: The code to execute.
        :param kind: The kind of code to execute.
        """
        data: Dict[str, Any] = {"code": code}
        if kind is not None:
            # On a legacy server a warning is logged, but the kind is still
            # included in the request body.
            if self.legacy_server():
                LOGGER.warning("statement kind ignored on Livy<0.5.0")
            data["kind"] = kind.value

        response = self._client.post(
            f"/sessions/{session_id}/statements", data=data
        )
        return Statement.from_json(session_id, response)

    def get_statement(self, session_id: int, statement_id: int) -> Statement:
        """Get information about a statement in a session.

        :param session_id: The ID of the session.
        :param statement_id: The ID of the statement.
        """
        response = self._client.get(
            f"/sessions/{session_id}/statements/{statement_id}"
        )
        return Statement.from_json(session_id, response)