Source code for chdb.dbapi.connections

from . import err
from .cursors import Cursor
from . import converters
from ..state import sqlitelike as chdb_stateful

DEBUG = False
VERBOSE = False


[docs] class Connection(object): """DB-API 2.0 compliant connection to chDB database. This class provides a standard DB-API interface for connecting to and interacting with chDB databases. It supports both in-memory and file-based databases. The connection manages the underlying chDB engine and provides methods for executing queries, managing transactions (no-op for ClickHouse), and creating cursors. Args: path (str, optional): Database file path. If None, uses in-memory database. Can be a file path like 'database.db' or None for ':memory:' Attributes: encoding (str): Character encoding for queries, defaults to 'utf8' open (bool): True if connection is open, False if closed Examples: >>> # In-memory database >>> conn = Connection() >>> cursor = conn.cursor() >>> cursor.execute("SELECT 1") >>> result = cursor.fetchall() >>> conn.close() >>> # File-based database >>> conn = Connection('mydata.db') >>> with conn.cursor() as cur: ... cur.execute("CREATE TABLE users (id INT, name STRING)") ... cur.execute("INSERT INTO users VALUES (1, 'Alice')") >>> conn.close() >>> # Context manager usage >>> with Connection() as cur: ... cur.execute("SELECT version()") ... version = cur.fetchone() Note: ClickHouse does not support traditional transactions, so commit() and rollback() operations are no-ops but provided for DB-API compliance. """ def __init__(self, path=None): """Initialize a new database connection. Args: path (str, optional): Database file path. None for in-memory database. Raises: err.Error: If connection cannot be established """ self._closed = False self.encoding = "utf8" self._affected_rows = 0 self._resp = None # Initialize sqlitelike connection connection_string = ":memory:" if path is None else f"file:{path}" self._conn = chdb_stateful.Connection(connection_string) # Test connection with a simple query cursor = self._conn.cursor() cursor.execute("SELECT 1") cursor.close()
[docs] def close(self): """Close the database connection. Closes the underlying chDB connection and marks this connection as closed. Subsequent operations on this connection will raise an Error. Raises: err.Error: If connection is already closed """ if self._closed: raise err.Error("Already closed") self._closed = True self._conn.close()
@property def open(self): """Check if the connection is open. Returns: bool: True if connection is open, False if closed """ return not self._closed
[docs] def commit(self): """Commit the current transaction. Note: This is a no-op for chDB/ClickHouse as it doesn't support traditional transactions. Provided for DB-API 2.0 compliance. """ # No-op for ClickHouse pass
[docs] def rollback(self): """Roll back the current transaction. Note: This is a no-op for chDB/ClickHouse as it doesn't support traditional transactions. Provided for DB-API 2.0 compliance. """ # No-op for ClickHouse pass
[docs] def cursor(self, cursor=None): """Create a new cursor for executing queries. Args: cursor: Ignored, provided for compatibility Returns: Cursor: New cursor object for this connection Raises: err.Error: If connection is closed Example: >>> conn = Connection() >>> cur = conn.cursor() >>> cur.execute("SELECT 1") >>> result = cur.fetchone() """ if self._closed: raise err.Error("Connection closed") if cursor: return Cursor(self) return Cursor(self)
[docs] def query(self, sql, fmt="CSV"): """Execute a SQL query directly and return raw results. This method bypasses the cursor interface and executes queries directly. For standard DB-API usage, prefer using cursor() method. Args: sql (str or bytes): SQL query to execute fmt (str, optional): Output format. Defaults to "CSV". Supported formats include "CSV", "JSON", "Arrow", "Parquet", etc. Returns: Query result in the specified format Raises: err.InterfaceError: If connection is closed or query fails Example: >>> conn = Connection() >>> result = conn.query("SELECT 1, 'hello'", "CSV") >>> print(result) "1,hello\\n" """ if self._closed: raise err.InterfaceError("Connection closed") if isinstance(sql, str): sql = sql.encode(self.encoding, "surrogateescape") try: result = self._conn.query(sql.decode(), fmt) self._resp = result return result except Exception as error: raise err.InterfaceError(f"Query error: {error}")
[docs] def escape(self, obj, mapping=None): """Escape a value for safe inclusion in SQL queries. Args: obj: Value to escape (string, bytes, number, etc.) mapping: Optional character mapping for escaping Returns: Escaped version of the input suitable for SQL queries Example: >>> conn = Connection() >>> safe_value = conn.escape("O'Reilly") >>> query = f"SELECT * FROM users WHERE name = {safe_value}" """ return converters.escape_item(obj, mapping)
[docs] def escape_string(self, s): """Escape a string value for SQL queries. Args: s (str): String to escape Returns: str: Escaped string safe for SQL inclusion """ return converters.escape_string(s)
def _quote_bytes(self, s): """Quote and escape bytes data for SQL queries. Args: s (bytes): Bytes data to quote Returns: str: Quoted and escaped bytes representation """ return converters.escape_bytes(s) def __enter__(self): """Enter context manager and return a cursor. Returns: Cursor: New cursor for this connection Example: >>> with Connection() as cur: ... cur.execute("SELECT 1") ... result = cur.fetchone() """ return self.cursor() def __exit__(self, exc, value, traceback): """Exit context manager with proper cleanup. Commits on successful exit, rolls back on exception, and always closes connection. Args: exc: Exception type (if any) value: Exception value (if any) traceback: Exception traceback (if any) """ if exc: self.rollback() else: self.commit() self.close() @property def resp(self): """Get the last query response. Returns: The raw response from the last query() call Note: This property is updated each time query() is called directly. It does not reflect queries executed through cursors. """ return self._resp