Session Management

chDB provides stateful session management for maintaining state across multiple queries, creating temporary tables, views, and executing complex workflows.

Overview

The Session class allows you to:

  • Maintain state between queries (tables, views, databases persist within the session)

  • Use temporary or persistent storage

  • Stream large query results efficiently

  • Execute complex SQL workflows with multiple steps

Session API

class chdb.session.Session(path=None)[source]

Bases: object

Session will keep the state of query. If path is None, it will create a temporary directory and use it as the database path and the temporary directory will be removed when the session is closed. You can also pass in a path to create a database at that path where will keep your data.

You can also use a connection string to pass in the path and other parameters. .. rubric:: Examples

  • “:memory:” (for in-memory database)

  • “test.db” (for relative path)

  • file:test.db” (same as above)

  • “/path/to/test.db” (for absolute path)

  • file:/path/to/test.db” (same as above)

  • file:test.db?param1=value1&param2=value2” (for relative path with query params)

  • file::memory:?verbose&log-level=test” (for in-memory database with query params)

  • “///path/to/test.db?param1=value1&param2=value2” (for absolute path)

Connection string args handling:

Connection string can contain query params like “file:test.db?param1=value1&param2=value2” “param1=value1” will be passed to ClickHouse engine as start up args.

For more details, see clickhouse local –help –verbose Some special args handling: - “mode=ro” would be “–readonly=1” for clickhouse (read-only mode)

Important

  • There can be only one session at a time. If you want to create a new session, you need to close the existing one.

  • Creating a new session will close the existing one.

cleanup()[source]

Cleanup session resources with exception handling.

This method attempts to close the session while suppressing any exceptions that might occur during the cleanup process. It’s particularly useful in error handling scenarios or when you need to ensure cleanup happens regardless of the session state.

Note

This method will never raise an exception, making it safe to call in finally blocks or destructors.

See also

close() - For explicit session closing with error propagation

Examples

>>> session = Session("test.db")
>>> try:
...     session.query("INVALID SQL")
... finally:
...     session.cleanup()  # Safe cleanup regardless of errors
close()[source]

Close the session and cleanup resources.

This method closes the underlying connection and resets the global session state. After calling this method, the session becomes invalid and cannot be used for further queries.

Note

This method is automatically called when the session is used as a context manager or when the session object is destroyed.

Warning

Any attempt to use the session after calling close() will result in an error.

Examples

>>> session = Session("test.db")
>>> session.query("SELECT 1")
>>> session.close()  # Explicitly close the session
query(sql, fmt='CSV', udf_path='')[source]

Execute a SQL query and return the results.

This method executes a SQL query against the session’s database and returns the results in the specified format. The method supports various output formats and maintains session state between queries.

Parameters:
  • sql (str) – SQL query string to execute

  • fmt (str, optional) – Output format for results. Defaults to “CSV”. Available formats include:

    • “CSV” - Comma-separated values

    • “JSON” - JSON format

    • “TabSeparated” - Tab-separated values

    • “Pretty” - Pretty-printed table format

    • “JSONCompact” - Compact JSON format

    • “Arrow” - Apache Arrow format

    • “Parquet” - Parquet format

  • udf_path (str, optional) – Path to user-defined functions. Defaults to “”. If not specified, uses the UDF path from session initialization.

Returns:

Query results in the specified format. The exact return type depends on the format parameter:

  • String formats (CSV, JSON, etc.) return str

  • Binary formats (Arrow, Parquet) return bytes

Raises:
  • RuntimeError – If the session is closed or invalid

  • ValueError – If the SQL query is malformed

Note

The “Debug” format is not supported and will be automatically converted to “CSV” with a warning. For debugging, use connection string parameters instead.

Warning

This method executes the query synchronously and loads all results into memory. For large result sets, consider using send_query() for streaming results.

Examples

>>> session = Session("test.db")
>>>
>>> # Basic query with default CSV format
>>> result = session.query("SELECT 1 as number")
>>> print(result)
number
1
>>> # Query with JSON format
>>> result = session.query("SELECT 1 as number", fmt="JSON")
>>> print(result)
{"number": "1"}
>>> # Complex query with table creation
>>> session.query("CREATE TABLE test (id INT, name String)")
>>> session.query("INSERT INTO test VALUES (1, 'Alice'), (2, 'Bob')")
>>> result = session.query("SELECT * FROM test ORDER BY id")
>>> print(result)
id,name
1,Alice
2,Bob

See also

send_query() - For streaming query execution sql - Alias for this method

send_query(sql, fmt='CSV') StreamingResult[source]

Execute a SQL query and return a streaming result iterator.

This method executes a SQL query against the session’s database and returns a streaming result object that allows you to iterate over the results without loading everything into memory at once. This is particularly useful for large result sets.

Parameters:
  • sql (str) – SQL query string to execute

  • fmt (str, optional) – Output format for results. Defaults to “CSV”. Available formats include:

    • “CSV” - Comma-separated values

    • “JSON” - JSON format

    • “TabSeparated” - Tab-separated values

    • “JSONCompact” - Compact JSON format

    • “Arrow” - Apache Arrow format

    • “Parquet” - Parquet format

Returns:

StreamingResult – A streaming result iterator that yields query results incrementally. The iterator can be used in for loops or converted to other data structures.

Raises:
  • RuntimeError – If the session is closed or invalid

  • ValueError – If the SQL query is malformed

Note

The “Debug” format is not supported and will be automatically converted to “CSV” with a warning. For debugging, use connection string parameters instead.

Warning

The returned StreamingResult object should be consumed promptly or stored appropriately, as it maintains a connection to the database.

Examples

>>> session = Session("test.db")
>>> session.query("CREATE TABLE big_table (id INT, data String)")
>>>
>>> # Insert large dataset
>>> for i in range(1000):
...     session.query(f"INSERT INTO big_table VALUES ({i}, 'data_{i}')")
>>>
>>> # Stream results to avoid memory issues
>>> streaming_result = session.send_query("SELECT * FROM big_table ORDER BY id")
>>> for chunk in streaming_result:
...     print(f"Processing chunk: {len(chunk)} bytes")
...     # Process chunk without loading entire result set
>>> # Using with context manager
>>> with session.send_query("SELECT COUNT(*) FROM big_table") as stream:
...     for result in stream:
...         print(f"Count result: {result}")

See also

query() - For non-streaming query execution chdb.state.sqlitelike.StreamingResult - Streaming result iterator

sql(sql, fmt='CSV', udf_path='')

Execute a SQL query and return the results.

This method executes a SQL query against the session’s database and returns the results in the specified format. The method supports various output formats and maintains session state between queries.

Parameters:
  • sql (str) – SQL query string to execute

  • fmt (str, optional) – Output format for results. Defaults to “CSV”. Available formats include:

    • “CSV” - Comma-separated values

    • “JSON” - JSON format

    • “TabSeparated” - Tab-separated values

    • “Pretty” - Pretty-printed table format

    • “JSONCompact” - Compact JSON format

    • “Arrow” - Apache Arrow format

    • “Parquet” - Parquet format

  • udf_path (str, optional) – Path to user-defined functions. Defaults to “”. If not specified, uses the UDF path from session initialization.

Returns:

Query results in the specified format. The exact return type depends on the format parameter:

  • String formats (CSV, JSON, etc.) return str

  • Binary formats (Arrow, Parquet) return bytes

Raises:
  • RuntimeError – If the session is closed or invalid

  • ValueError – If the SQL query is malformed

Note

The “Debug” format is not supported and will be automatically converted to “CSV” with a warning. For debugging, use connection string parameters instead.

Warning

This method executes the query synchronously and loads all results into memory. For large result sets, consider using send_query() for streaming results.

Examples

>>> session = Session("test.db")
>>>
>>> # Basic query with default CSV format
>>> result = session.query("SELECT 1 as number")
>>> print(result)
number
1
>>> # Query with JSON format
>>> result = session.query("SELECT 1 as number", fmt="JSON")
>>> print(result)
{"number": "1"}
>>> # Complex query with table creation
>>> session.query("CREATE TABLE test (id INT, name String)")
>>> session.query("INSERT INTO test VALUES (1, 'Alice'), (2, 'Bob')")
>>> result = session.query("SELECT * FROM test ORDER BY id")
>>> print(result)
id,name
1,Alice
2,Bob

See also

send_query() - For streaming query execution sql - Alias for this method

Basic Usage

Creating and Using Sessions

from chdb import session as chs

# Create a temporary session (auto-cleanup)
sess = chs.Session()

# Execute queries with persistent state
sess.query("CREATE DATABASE IF NOT EXISTS db_xxx ENGINE = Atomic")
sess.query("CREATE TABLE IF NOT EXISTS db_xxx.log_table_xxx (x String, y Int) ENGINE = Log;")
sess.query("INSERT INTO db_xxx.log_table_xxx VALUES ('a', 1), ('b', 3), ('c', 2), ('d', 5);")
sess.query("CREATE VIEW db_xxx.view_xxx AS SELECT * FROM db_xxx.log_table_xxx LIMIT 4;")

print("Select from view:")
print(sess.query("SELECT * FROM db_xxx.view_xxx", "Pretty"))

# Session automatically cleaned up when object is destroyed

Session with File-based Storage

# Create persistent session with file storage
sess = chs.Session("my_database.db")

# Create persistent tables
sess.query("""
    CREATE TABLE users (
        id UInt32,
        name String,
        created_date Date
    ) ENGINE = MergeTree() ORDER BY id
""")

# Insert data
sess.query("INSERT INTO users VALUES (1, 'Alice', '2024-01-01'), (2, 'Bob', '2024-01-02')")

# Query data
result = sess.query("SELECT * FROM users ORDER BY id", "JSONEachRow")
print(result)

# Close session (data persists in file)
sess.close()

Connection String Support

Sessions support flexible connection strings for configuration:

# In-memory database
sess = chs.Session(":memory:")

# File-based with relative path
sess = chs.Session("test.db")

# Absolute path
sess = chs.Session("/path/to/database.db")

# URI format with parameters
sess = chs.Session("file:test.db?param1=value1&param2=value2")

# Read-only mode
sess = chs.Session("test.db?mode=ro")

# With verbose logging
sess = chs.Session("test.db?verbose&log-level=debug")

Streaming Queries

For processing large datasets efficiently, use streaming queries that don’t load all results into memory:

Basic Streaming Example

from chdb import session as chs

sess = chs.Session()

# Stream large result set
rows_cnt = 0
with sess.send_query("SELECT * FROM numbers(200000)", "CSV") as stream_result:
    for chunk in stream_result:
        rows_cnt += chunk.rows_read()

print(f"Processed {rows_cnt} rows")  # 200000

Manual Streaming Control

# Manual iteration with fetch()
rows_cnt = 0
stream_result = sess.send_query("SELECT * FROM numbers(200000)", "CSV")

while True:
    chunk = stream_result.fetch()
    if chunk is None:
        break
    rows_cnt += chunk.rows_read()

print(f"Processed {rows_cnt} rows")  # 200000

Early Termination

# Early cancellation example
rows_cnt = 0
stream_result = sess.send_query("SELECT * FROM numbers(200000)", "CSV")

while True:
    chunk = stream_result.fetch()
    if chunk is None:
        break

    # Process some data then terminate early
    if rows_cnt > 0:
        stream_result.close()  # Important: close to free resources
        break

    rows_cnt += chunk.rows_read()

print(f"Early termination after {rows_cnt} rows")

PyArrow Integration

import pyarrow as pa

# Stream results in Arrow format
stream_result = sess.send_query("SELECT * FROM numbers(100000)", "Arrow")

# Create RecordBatchReader with custom batch size
batch_reader = stream_result.record_batch(rows_per_batch=10000)

# Use with external libraries (example: Delta Lake)
# from deltalake import write_deltalake
# write_deltalake("./my_delta_table", data=batch_reader, mode="overwrite")

# Process batches manually
for batch in batch_reader:
    print(f"Batch shape: {batch.num_rows} rows, {batch.num_columns} columns")

stream_result.close()

Context Manager Support

Sessions support context manager protocol for automatic cleanup:

# Automatic cleanup with context manager
with chs.Session("temp_session.db") as sess:
    sess.query("CREATE TABLE test (id Int32, name String)")
    sess.query("INSERT INTO test VALUES (1, 'Alice'), (2, 'Bob')")

    result = sess.query("SELECT * FROM test", "Pretty")
    print(result)

# Session automatically closed and resources cleaned up

Advanced Usage

Complex Analytical Workflows

sess = chs.Session("analytics.db")

# Create analytical tables
sess.query("""
    CREATE TABLE sales (
        date Date,
        product String,
        revenue Decimal(10,2),
        quantity UInt32
    ) ENGINE = MergeTree() ORDER BY date
""")

# Load data (example with CSV file)
sess.query("""
    INSERT INTO sales
    SELECT * FROM file('sales_data.csv', 'CSV',
                      'date Date, product String, revenue Decimal(10,2), quantity UInt32')
""")

# Create analytical views
sess.query("""
    CREATE VIEW monthly_sales AS
    SELECT
        toYYYYMM(date) as month,
        product,
        sum(revenue) as total_revenue,
        sum(quantity) as total_quantity
    FROM sales
    GROUP BY month, product
""")

# Run analysis
result = sess.query("""
    SELECT
        month,
        product,
        total_revenue,
        total_revenue / lag(total_revenue) OVER (PARTITION BY product ORDER BY month) - 1 as growth_rate
    FROM monthly_sales
    ORDER BY month DESC, total_revenue DESC
""", "JSONEachRow")

print("Monthly growth analysis:")
print(result)

Working with Multiple Databases

sess = chs.Session("multi_db.chdb")

# Create multiple databases
sess.query("CREATE DATABASE sales ENGINE = Atomic")
sess.query("CREATE DATABASE analytics ENGINE = Atomic")

# Create tables in different databases
sess.query("""
    CREATE TABLE sales.transactions (
        id UInt32,
        customer_id UInt32,
        amount Decimal(10,2),
        timestamp DateTime
    ) ENGINE = MergeTree() ORDER BY timestamp
""")

sess.query("""
    CREATE TABLE analytics.daily_summary AS
    SELECT
        toDate(timestamp) as date,
        count(*) as transaction_count,
        sum(amount) as total_amount
    FROM sales.transactions
    GROUP BY date
""")

Error Handling

Robust Session Management

import chdb

def safe_session_query(session, sql, fmt="CSV"):
    """Execute session query with proper error handling"""
    try:
        result = session.query(sql, fmt)
        return result, None
    except Exception as e:
        return None, str(e)

# Example usage
try:
    sess = chs.Session("test.db")

    # Test table creation
    result, error = safe_session_query(sess,
        "CREATE TABLE test (id Int32, name String)")

    if error:
        print(f"Table creation failed: {error}")
    else:
        print("Table created successfully")

    # Test data insertion
    result, error = safe_session_query(sess,
        "INSERT INTO test VALUES (1, 'Alice'), (2, 'Bob')")

    if error:
        print(f"Data insertion failed: {error}")
    else:
        # Query data
        result, error = safe_session_query(sess,
            "SELECT * FROM test", "Pretty")
        if not error:
            print("Query results:")
            print(result)

finally:
    if 'sess' in locals():
        sess.close()

Best Practices

  1. Resource Management: Always close sessions when done, or use context managers

  2. Memory Usage: Use streaming queries for large datasets

  3. Persistence: Choose appropriate storage (memory vs file) based on needs

  4. Error Handling: Wrap session operations in try-catch blocks

  5. Connection Strings: Use connection string parameters for configuration

Note

  • Only one session can be active at a time per process

  • Creating a new session will automatically close any existing session

  • Temporary sessions are automatically cleaned up when the session object is destroyed

  • File-based sessions persist data across Python interpreter restarts

Warning

  • Always call StreamingResult.close() when terminating streaming queries early

  • Large result sets should use streaming queries to avoid memory issues

  • Session state is not shared between different Python processes

See Also