Session Management
chDB provides stateful session management: a session keeps state across multiple queries, so you can create temporary tables and views and run complex multi-step workflows.
Overview
The Session class allows you to:
Maintain state between queries (tables, views, databases persist within the session)
Use temporary or persistent storage
Stream large query results efficiently
Execute complex SQL workflows with multiple steps
Session API
- class chdb.session.Session(path=None)
Bases: object
Session keeps the state of your queries. If path is None, it creates a temporary directory, uses it as the database path, and removes that directory when the session is closed. You can also pass in a path to create a database at that location, where your data will be kept.
You can also use a connection string to pass in the path and other parameters.
Examples
“:memory:” (for in-memory database)
“test.db” (for relative path)
“file:test.db” (same as above)
“/path/to/test.db” (for absolute path)
“file:/path/to/test.db” (same as above)
“file:test.db?param1=value1&param2=value2” (for relative path with query params)
“file::memory:?verbose&log-level=test” (for in-memory database with query params)
“///path/to/test.db?param1=value1&param2=value2” (for absolute path)
- Connection string args handling:
A connection string can contain query params, as in “file:test.db?param1=value1&param2=value2”; each param such as “param1=value1” is passed to the ClickHouse engine as a startup arg.
For more details, see clickhouse local --help --verbose. Some args are handled specially:
- “mode=ro” becomes “--readonly=1” for ClickHouse (read-only mode)
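To make the connection-string rules above concrete, here is a minimal, stdlib-only sketch of how such a string could be split into a database path and startup args. The helper split_connection_string is hypothetical and is not chDB's own parser. Rendering ordinary params as --key=value (and bare params such as verbose as --key) is an assumption; the mode=ro to --readonly=1 mapping comes from the rules above.

```python
from typing import List, Tuple
from urllib.parse import parse_qsl


def split_connection_string(conn: str) -> Tuple[str, List[str]]:
    """Hypothetical helper: split a chDB-style connection string into a
    database path and startup args. An illustration, not chDB's parser."""
    path, _, query = conn.partition("?")
    if path.startswith("file:"):
        path = path[len("file:"):]  # "file:test.db" means the same as "test.db"
    if path.startswith("///"):
        path = path[2:]  # "///path/to/test.db" denotes "/path/to/test.db"
    args = []
    for key, value in parse_qsl(query, keep_blank_values=True):
        if key == "mode" and value == "ro":
            args.append("--readonly=1")  # documented special case: read-only mode
        elif value:
            args.append(f"--{key}={value}")  # assumed mapping for ordinary params
        else:
            args.append(f"--{key}")  # assumed mapping for bare flags like "verbose"
    return path, args


for example in ("test.db", "file::memory:?verbose&log-level=test", "test.db?mode=ro"):
    print(example, "->", split_connection_string(example))
```

The sketch only shows how path and params separate; the actual translation is done inside chDB when the session starts.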
Important
Only one session can be active at a time; close the existing session before creating a new one.
If you do not, creating a new session closes the existing one automatically.
- cleanup()
Clean up session resources with exception handling.
This method attempts to close the session while suppressing any exceptions that might occur during the cleanup process. It’s particularly useful in error handling scenarios or when you need to ensure cleanup happens regardless of the session state.
Note
This method will never raise an exception, making it safe to call in finally blocks or destructors.
See also
close() - For explicit session closing with error propagation
Examples
>>> session = Session("test.db")
>>> try:
...     session.query("INVALID SQL")
... finally:
...     session.cleanup()  # Safe cleanup regardless of errors
- close()
Close the session and clean up resources.
This method closes the underlying connection and resets the global session state. After calling this method, the session becomes invalid and cannot be used for further queries.
Note
This method is automatically called when the session is used as a context manager or when the session object is destroyed.
Warning
Any attempt to use the session after calling close() will result in an error.
Examples
>>> session = Session("test.db")
>>> session.query("SELECT 1")
>>> session.close()  # Explicitly close the session
- query(sql, fmt='CSV', udf_path='')
Execute a SQL query and return the results.
This method executes a SQL query against the session’s database and returns the results in the specified format. The method supports various output formats and maintains session state between queries.
- Parameters:
sql (str) – SQL query string to execute
fmt (str, optional) – Output format for results. Defaults to “CSV”. Available formats include:
“CSV” - Comma-separated values
“JSON” - JSON format
“TabSeparated” - Tab-separated values
“Pretty” - Pretty-printed table format
“JSONCompact” - Compact JSON format
“Arrow” - Apache Arrow format
“Parquet” - Parquet format
udf_path (str, optional) – Path to user-defined functions. Defaults to “”. If not specified, uses the UDF path from session initialization.
- Returns:
Query results in the specified format. The exact return type depends on the format parameter:
String formats (CSV, JSON, etc.) return str
Binary formats (Arrow, Parquet) return bytes
- Raises:
RuntimeError – If the session is closed or invalid
ValueError – If the SQL query is malformed
Note
The “Debug” format is not supported and will be automatically converted to “CSV” with a warning. For debugging, use connection string parameters instead.
Warning
This method executes the query synchronously and loads all results into memory. For large result sets, consider using send_query() for streaming results.
Examples
>>> session = Session("test.db")
>>>
>>> # Basic query with default CSV format (CSV output has no header row)
>>> result = session.query("SELECT 1 as number")
>>> print(result)
1
>>> # Query with JSON format: the output is a JSON document whose "data" array holds {"number": 1}
>>> result = session.query("SELECT 1 as number", fmt="JSON")
>>> # Complex query with table creation
>>> session.query("CREATE TABLE test (id INT, name String)")
>>> session.query("INSERT INTO test VALUES (1, 'Alice'), (2, 'Bob')")
>>> result = session.query("SELECT * FROM test ORDER BY id")
>>> print(result)
1,"Alice"
2,"Bob"
See also
send_query() - For streaming query execution
sql() - Alias for this method
- send_query(sql, fmt='CSV') → StreamingResult
Execute a SQL query and return a streaming result iterator.
This method executes a SQL query against the session’s database and returns a streaming result object that allows you to iterate over the results without loading everything into memory at once. This is particularly useful for large result sets.
- Parameters:
sql (str) – SQL query string to execute
fmt (str, optional) – Output format for results. Defaults to “CSV”. Available formats include:
“CSV” - Comma-separated values
“JSON” - JSON format
“TabSeparated” - Tab-separated values
“JSONCompact” - Compact JSON format
“Arrow” - Apache Arrow format
“Parquet” - Parquet format
- Returns:
StreamingResult – A streaming result iterator that yields query results incrementally. The iterator can be used in for loops or converted to other data structures.
- Raises:
RuntimeError – If the session is closed or invalid
ValueError – If the SQL query is malformed
Note
The “Debug” format is not supported and will be automatically converted to “CSV” with a warning. For debugging, use connection string parameters instead.
Warning
The returned StreamingResult object should be consumed promptly or stored appropriately, as it maintains a connection to the database.
Examples
>>> session = Session("test.db")
>>> session.query("CREATE TABLE big_table (id INT, data String)")
>>>
>>> # Insert large dataset
>>> for i in range(1000):
...     session.query(f"INSERT INTO big_table VALUES ({i}, 'data_{i}')")
>>>
>>> # Stream results to avoid memory issues
>>> streaming_result = session.send_query("SELECT * FROM big_table ORDER BY id")
>>> for chunk in streaming_result:
...     print(f"Processing chunk: {len(chunk)} bytes")
...     # Process chunk without loading entire result set
>>> # Using with context manager
>>> with session.send_query("SELECT COUNT(*) FROM big_table") as stream:
...     for result in stream:
...         print(f"Count result: {result}")
See also
query() - For non-streaming query execution
chdb.state.sqlitelike.StreamingResult - Streaming result iterator
- sql(sql, fmt='CSV', udf_path='')
Alias for query(). It accepts the same parameters, returns the same results, and raises the same exceptions.
Basic Usage
Creating and Using Sessions
from chdb import session as chs
# Create a temporary session (auto-cleanup)
sess = chs.Session()
# Execute queries with persistent state
sess.query("CREATE DATABASE IF NOT EXISTS db_xxx ENGINE = Atomic")
sess.query("CREATE TABLE IF NOT EXISTS db_xxx.log_table_xxx (x String, y Int) ENGINE = Log;")
sess.query("INSERT INTO db_xxx.log_table_xxx VALUES ('a', 1), ('b', 3), ('c', 2), ('d', 5);")
sess.query("CREATE VIEW db_xxx.view_xxx AS SELECT * FROM db_xxx.log_table_xxx LIMIT 4;")
print("Select from view:")
print(sess.query("SELECT * FROM db_xxx.view_xxx", "Pretty"))
# Session automatically cleaned up when object is destroyed
Session with File-based Storage
# Create persistent session with file storage
sess = chs.Session("my_database.db")
# Create persistent tables
sess.query("""
CREATE TABLE users (
id UInt32,
name String,
created_date Date
) ENGINE = MergeTree() ORDER BY id
""")
# Insert data
sess.query("INSERT INTO users VALUES (1, 'Alice', '2024-01-01'), (2, 'Bob', '2024-01-02')")
# Query data
result = sess.query("SELECT * FROM users ORDER BY id", "JSONEachRow")
print(result)
# Close session (data persists in file)
sess.close()
Connection String Support
Sessions support flexible connection strings for configuration:
# In-memory database
sess = chs.Session(":memory:")
# File-based with relative path
sess = chs.Session("test.db")
# Absolute path
sess = chs.Session("/path/to/database.db")
# URI format with parameters
sess = chs.Session("file:test.db?param1=value1&param2=value2")
# Read-only mode
sess = chs.Session("test.db?mode=ro")
# With verbose logging
sess = chs.Session("test.db?verbose&log-level=debug")
Streaming Queries
For processing large datasets efficiently, use streaming queries that don’t load all results into memory:
Basic Streaming Example
from chdb import session as chs
sess = chs.Session()
# Stream large result set
rows_cnt = 0
with sess.send_query("SELECT * FROM numbers(200000)", "CSV") as stream_result:
    for chunk in stream_result:
        rows_cnt += chunk.rows_read()
print(f"Processed {rows_cnt} rows") # 200000
Manual Streaming Control
# Manual iteration with fetch()
rows_cnt = 0
stream_result = sess.send_query("SELECT * FROM numbers(200000)", "CSV")
while True:
    chunk = stream_result.fetch()
    if chunk is None:
        break
    rows_cnt += chunk.rows_read()
print(f"Processed {rows_cnt} rows") # 200000
Early Termination
# Early cancellation example
rows_cnt = 0
stream_result = sess.send_query("SELECT * FROM numbers(200000)", "CSV")
while True:
    chunk = stream_result.fetch()
    if chunk is None:
        break
    # Process some data then terminate early
    if rows_cnt > 0:
        stream_result.close()  # Important: close to free resources
        break
    rows_cnt += chunk.rows_read()
print(f"Early termination after {rows_cnt} rows")
PyArrow Integration
import pyarrow as pa
# Stream results in Arrow format
stream_result = sess.send_query("SELECT * FROM numbers(100000)", "Arrow")
# Create RecordBatchReader with custom batch size
batch_reader = stream_result.record_batch(rows_per_batch=10000)
# Use with external libraries (example: Delta Lake)
# from deltalake import write_deltalake
# write_deltalake("./my_delta_table", data=batch_reader, mode="overwrite")
# Process batches manually
for batch in batch_reader:
    print(f"Batch shape: {batch.num_rows} rows, {batch.num_columns} columns")
stream_result.close()
Context Manager Support
Sessions support context manager protocol for automatic cleanup:
# Automatic cleanup with context manager
with chs.Session("temp_session.db") as sess:
    sess.query("CREATE TABLE test (id Int32, name String)")
    sess.query("INSERT INTO test VALUES (1, 'Alice'), (2, 'Bob')")
    result = sess.query("SELECT * FROM test", "Pretty")
    print(result)
# Session automatically closed and resources cleaned up
Advanced Usage
Complex Analytical Workflows
sess = chs.Session("analytics.db")
# Create analytical tables
sess.query("""
CREATE TABLE sales (
date Date,
product String,
revenue Decimal(10,2),
quantity UInt32
) ENGINE = MergeTree() ORDER BY date
""")
# Load data (example with CSV file)
sess.query("""
INSERT INTO sales
SELECT * FROM file('sales_data.csv', 'CSV',
'date Date, product String, revenue Decimal(10,2), quantity UInt32')
""")
# Create analytical views
sess.query("""
CREATE VIEW monthly_sales AS
SELECT
toYYYYMM(date) as month,
product,
sum(revenue) as total_revenue,
sum(quantity) as total_quantity
FROM sales
GROUP BY month, product
""")
# Run analysis
result = sess.query("""
SELECT
month,
product,
total_revenue,
total_revenue / lag(total_revenue) OVER (PARTITION BY product ORDER BY month) - 1 as growth_rate
FROM monthly_sales
ORDER BY month DESC, total_revenue DESC
""", "JSONEachRow")
print("Monthly growth analysis:")
print(result)
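The growth_rate column divides each month's revenue by the previous month's revenue for the same product, then subtracts 1. The plain-Python sketch below reproduces that arithmetic for a single product using made-up figures (an assumption, not real data). The first month has no predecessor, so the sketch reports None there instead of assuming what the SQL returns in that case.

```python
from decimal import Decimal
from typing import Dict, Optional


def month_over_month_growth(revenue_by_month: Dict[int, Decimal]) -> Dict[int, Optional[Decimal]]:
    """growth = revenue / previous month's revenue - 1, for a single product."""
    growth: Dict[int, Optional[Decimal]] = {}
    previous: Optional[Decimal] = None
    for month in sorted(revenue_by_month):  # YYYYMM keys sort chronologically
        current = revenue_by_month[month]
        if previous is None or previous == 0:
            growth[month] = None  # no usable previous month to compare against
        else:
            growth[month] = current / previous - 1
        previous = current
    return growth


# Hypothetical revenue figures for one product, keyed like toYYYYMM(date).
revenue = {202401: Decimal("100.00"), 202402: Decimal("125.00"), 202403: Decimal("100.00")}
print(month_over_month_growth(revenue))
```

A rise from 100.00 to 125.00 gives 0.25 (25% growth), and the drop back to 100.00 gives -0.2.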
Working with Multiple Databases
sess = chs.Session("multi_db.chdb")
# Create multiple databases
sess.query("CREATE DATABASE sales ENGINE = Atomic")
sess.query("CREATE DATABASE analytics ENGINE = Atomic")
# Create tables in different databases
sess.query("""
CREATE TABLE sales.transactions (
id UInt32,
customer_id UInt32,
amount Decimal(10,2),
timestamp DateTime
) ENGINE = MergeTree() ORDER BY timestamp
""")
sess.query("""
CREATE TABLE analytics.daily_summary AS
SELECT
toDate(timestamp) as date,
count(*) as transaction_count,
sum(amount) as total_amount
FROM sales.transactions
GROUP BY date
""")
Error Handling
Robust Session Management
from chdb import session as chs

def safe_session_query(session, sql, fmt="CSV"):
    """Execute a session query and return (result, error) instead of raising."""
    try:
        result = session.query(sql, fmt)
        return result, None
    except Exception as e:
        return None, str(e)

# Example usage
sess = None
try:
    sess = chs.Session("test.db")
    # Test table creation
    result, error = safe_session_query(sess,
        "CREATE TABLE test (id Int32, name String)")
    if error:
        print(f"Table creation failed: {error}")
    else:
        print("Table created successfully")
    # Test data insertion
    result, error = safe_session_query(sess,
        "INSERT INTO test VALUES (1, 'Alice'), (2, 'Bob')")
    if error:
        print(f"Data insertion failed: {error}")
    else:
        # Query data
        result, error = safe_session_query(sess,
            "SELECT * FROM test", "Pretty")
        if not error:
            print("Query results:")
            print(result)
finally:
    if sess is not None:
        sess.close()
Best Practices
Resource Management: Always close sessions when done, or use context managers
Memory Usage: Use streaming queries for large datasets
Persistence: Choose appropriate storage (memory vs file) based on needs
Error Handling: Wrap session operations in try/except blocks
Connection Strings: Use connection string parameters for configuration
Note
Only one session can be active at a time per process
Creating a new session will automatically close any existing session
Temporary sessions are automatically cleaned up when the session object is destroyed
File-based sessions persist data across Python interpreter restarts
Warning
Always call StreamingResult.close() when terminating streaming queries early
Large result sets should use streaming queries to avoid memory issues
Session state is not shared between different Python processes
See Also
DB-API 2.0 Interface - Database API for connection-based queries
Examples - More comprehensive usage examples
API Reference - Complete API reference