Extend plugins with API features and state management
The Processing Engine includes a shared API that your plugins can use to interact with data, write new records in line protocol format, and maintain state between executions. These capabilities let you build plugins that transform, analyze, and respond to time series data as it flows through your database.
The plugin API lets you:
- Write data
- Query data
- Log messages for monitoring and debugging
- Maintain state with the in-memory cache
This section also covers guidelines for in-memory caching.
Get started with the shared API
Each plugin automatically has access to the shared API through the `influxdb3_local` object. You don't need to import any libraries; the API becomes available as soon as your plugin runs.
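For example, in a data-write plugin the engine passes `influxdb3_local` to your trigger function. This is a minimal sketch that assumes the data-write trigger entry point `process_writes(influxdb3_local, table_batches, args)`; other trigger types use different signatures:

```python
# Minimal data-write plugin sketch. The engine supplies influxdb3_local;
# no imports are required to use it.
def process_writes(influxdb3_local, table_batches, args=None):
    influxdb3_local.info("Plugin invoked with", len(table_batches), "table batches")
```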
Write data
To write data into your database, use the `LineBuilder` API to create line protocol data:
```python
# Create a line protocol entry
line = LineBuilder("weather")
line.tag("location", "us-midwest")
line.float64_field("temperature", 82.5)
line.time_ns(1627680000000000000)

# Write the data to the database
influxdb3_local.write(line)
```
InfluxDB 3 buffers your writes while the plugin runs and flushes them when the plugin completes.
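For example, a plugin can build and write several lines in a loop; each call to `write()` is buffered, and everything is flushed together when the plugin finishes. A minimal sketch, where `readings` is a hypothetical in-memory list:

```python
# Hypothetical input data for illustration
readings = [("us-midwest", 82.5), ("us-east", 75.0)]

for location, temperature in readings:
    line = LineBuilder("weather")
    line.tag("location", location)
    line.float64_field("temperature", temperature)
    influxdb3_local.write(line)  # buffered until the plugin completes
```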
Query data
Your plugins can execute SQL queries and process results directly:
```python
# Simple query
results = influxdb3_local.query("SELECT * FROM metrics WHERE time > now() - INTERVAL '1 hour'")

# Parameterized query for safer execution
# (parameters bind values, not identifiers such as table names)
params = {"threshold": 90}
results = influxdb3_local.query("SELECT * FROM metrics WHERE value > $threshold", params)
```
Query results are a `List` of `Dict[str, Any]`, where each dictionary represents a row. Column names are keys, and column values are the corresponding values.
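For example, a plugin can iterate over the rows and act on individual column values. A sketch that assumes a `weather` table with `location` and `temperature` columns:

```python
results = influxdb3_local.query("SELECT location, temperature FROM weather")

for row in results:
    # Each row is a dictionary keyed by column name
    if row["temperature"] > 90.0:
        influxdb3_local.warn("High temperature at", row["location"])
```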
Log messages for monitoring and debugging
Use the shared API's `info`, `warn`, and `error` functions to log messages from your plugin. Each function accepts one or more arguments, converts them to strings, and logs them as a single space-separated message.
Add logging to monitor plugin execution and assist with debugging:
influxdb3_local.info("Starting data processing")
influxdb3_local.warn("Could not process some records")
influxdb3_local.error("Failed to connect to external API")
# Log structured data
obj_to_log = {"records": 157, "errors": 3}
influxdb3_local.info("Processing complete", obj_to_log)
The system writes all log messages to the server logs and stores them in system tables, where you can query them using SQL.
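Because logs land in system tables, you can read them back with SQL, including from within a plugin. A sketch; the system table name used here is an assumption, so check the system-table reference for your server version:

```python
# The table name below is an assumption; verify it for your InfluxDB 3 version
recent_logs = influxdb3_local.query(
    "SELECT * FROM system.processing_engine_logs LIMIT 10"
)
influxdb3_local.info("Fetched", len(recent_logs), "log rows")
```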
Maintain state with the in-memory cache
The Processing Engine provides an in-memory cache that enables your plugins to persist and retrieve data between executions.
Access the cache using the `cache` property of the shared API:

```python
# Basic usage pattern
influxdb3_local.cache.METHOD(PARAMETERS)
```
`cache` provides the following methods to retrieve and manage cached values:
| Method | Parameters | Returns | Description |
|---|---|---|---|
| `put` | `key` (str): The key to store the value under<br>`value` (Any): Any Python object to cache<br>`ttl` (Optional[float], default=None): Time in seconds before expiration<br>`use_global` (bool, default=False): If True, uses the global namespace | None | Stores a value in the cache with an optional time-to-live |
| `get` | `key` (str): The key to retrieve<br>`default` (Any, default=None): Value to return if the key is not found<br>`use_global` (bool, default=False): If True, uses the global namespace | Any | Retrieves a value from the cache, or `default` if the key is not found |
| `delete` | `key` (str): The key to delete<br>`use_global` (bool, default=False): If True, uses the global namespace | bool | Deletes a value from the cache. Returns True if deleted, False if not found |
Understanding cache namespaces
The cache system offers two distinct namespaces:

| Namespace | Scope | Best for |
|---|---|---|
| Trigger-specific (default) | Isolated to a single trigger | Plugin state, counters, and timestamps specific to one plugin |
| Global | Shared across all triggers | Configuration, lookup tables, and service states that should be available to all plugins |
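The two namespaces are independent, so the same key can hold a different value in each. A minimal sketch:

```python
# The same key refers to separate values in each namespace
influxdb3_local.cache.put("state", "trigger-local value")
influxdb3_local.cache.put("state", "shared value", use_global=True)

influxdb3_local.cache.get("state")                   # returns "trigger-local value"
influxdb3_local.cache.get("state", use_global=True)  # returns "shared value"
```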
Common cache operations
- Store and retrieve cached data
- Store cached data with expiration
- Share data across plugins
- Build a counter
Store and retrieve cached data
```python
import time

# Store a value
influxdb3_local.cache.put("last_run_time", time.time())

# Retrieve a value with a default if not found
last_time = influxdb3_local.cache.get("last_run_time", default=0)

# Delete a cached value
influxdb3_local.cache.delete("temporary_data")
```
Store cached data with expiration
```python
# Cache with a 5-minute TTL (time-to-live)
influxdb3_local.cache.put("api_response", response_data, ttl=300)
```
Share data across plugins
```python
# Store in the global namespace
influxdb3_local.cache.put("config", {"version": "1.0"}, use_global=True)

# Retrieve from the global namespace
config = influxdb3_local.cache.get("config", use_global=True)
```
Build a counter
You can track how many times a plugin has run:
```python
# Get the current counter or default to 0
counter = influxdb3_local.cache.get("execution_count", default=0)

# Increment the counter
counter += 1

# Store the updated value
influxdb3_local.cache.put("execution_count", counter)

influxdb3_local.info(f"This plugin has run {counter} times")
```
Guidelines for in-memory caching
To get the most out of the in-memory cache, follow these guidelines:
- Use the trigger-specific namespace
- Use TTL appropriately
- Cache computation results
- Warm the cache
- Consider cache limitations
Use the trigger-specific namespace
The Processing Engine provides a cache that supports stateful operations while maintaining isolation between different triggers. For most use cases, use the trigger-specific namespace to keep plugin state isolated. Use the global namespace only when you need to share data across triggers.
Use TTL appropriately
Set appropriate expiration times based on how frequently your data changes:
```python
# Cache external API responses for 5 minutes
influxdb3_local.cache.put("weather_data", api_response, ttl=300)
```
Cache computation results
Store the results of expensive calculations that you use frequently:
```python
# Cache aggregated statistics for an hour
influxdb3_local.cache.put("daily_stats", calculate_statistics(data), ttl=3600)
```
Warm the cache
For critical data, prime the cache at startup. This is especially useful for data in the global namespace that multiple triggers need:
```python
# Check whether the cache needs to be initialized
if not influxdb3_local.cache.get("lookup_table"):
    influxdb3_local.cache.put("lookup_table", load_lookup_data())
```
Consider cache limitations
- Memory usage: The cache lives in memory, so monitor memory use when caching large datasets.
- Server restarts: The server clears the cache on restart, so design your plugins to handle cache initialization (as shown in the cache-warming example above).
- Concurrency: When multiple trigger instances update the same cache key at the same time, a plugin can read stale or inconsistent data; see the sketch below.
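For example, a read-modify-write sequence on a shared key is not atomic, so two triggers can interleave and one update can be lost. A sketch of the hazard:

```python
# Two triggers running this concurrently can both read the same value,
# and the slower writer silently overwrites the faster one's update.
count = influxdb3_local.cache.get("shared_count", default=0, use_global=True)
influxdb3_local.cache.put("shared_count", count + 1, use_global=True)

# Prefer trigger-specific keys for counters, or treat shared values as approximate.
```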
Next steps
With an understanding of the InfluxDB 3 Shared Plugin API, you can start building data workflows that transform, analyze, and respond to your time series data.
To find example plugins you can extend, visit the influxdb3_plugins repository on GitHub.