
Extend plugins with API features and state management

The Processing Engine includes a shared API that your plugins can use to interact with data, write new records in line protocol format, and maintain state between executions. These capabilities let you build plugins that transform, analyze, and respond to time series data as it flows through your database.

The plugin API lets you:

  • Write data using the LineBuilder API
  • Query data with SQL
  • Log messages for monitoring and debugging
  • Maintain state between executions with the in-memory cache

Get started with the shared API

Each plugin automatically has access to the shared API through the influxdb3_local object. You don’t need to import any libraries. The API becomes available as soon as your plugin runs.
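
For example, a data-write plugin receives the API object as the first argument of its entry point. This sketch follows the signature used by the data-write examples in the influxdb3_plugins repository:

# Minimal sketch of a data-write plugin entry point.
# The engine passes the shared API object (influxdb3_local) as the
# first argument; no imports are required to use it.
def process_writes(influxdb3_local, table_batches, args=None):
    influxdb3_local.info("Plugin invoked")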

Write data

To write data into your database, use the LineBuilder API to create line protocol data:

# Create a line protocol entry
line = LineBuilder("weather")
line.tag("location", "us-midwest")
line.float64_field("temperature", 82.5)
line.time_ns(1627680000000000000)

# Write the data to the database
influxdb3_local.write(line)

InfluxDB 3 buffers your writes while the plugin runs and flushes them when the plugin completes.
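
Because writes are buffered, a plugin can emit many lines in one run and InfluxDB flushes them together at the end. A minimal sketch (the readings list and field values are hypothetical):

# Hypothetical readings to illustrate buffered writes
readings = [("us-midwest", 82.5), ("us-east", 78.1)]

for location, temperature in readings:
    line = LineBuilder("weather")
    line.tag("location", location)
    line.float64_field("temperature", temperature)
    influxdb3_local.write(line)  # buffered, not yet persisted

# All buffered lines flush together when the plugin completes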


Query data

Your plugins can execute SQL queries and process results directly:

# Simple query
results = influxdb3_local.query("SELECT * FROM metrics WHERE time > now() - INTERVAL '1 hour'")

# Parameterized query for safer execution
params = {"table": "metrics", "threshold": 90}
results = influxdb3_local.query("SELECT * FROM $table WHERE value > $threshold", params)

Query results are returned as a List[Dict[str, Any]], where each dictionary represents a row: column names are the keys and column values are the corresponding values.
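
For example, you can iterate over the rows directly. The time and value column names below are assumptions about the queried table:

for row in results:
    # Each row is a dictionary keyed by column name
    influxdb3_local.info(f"time={row['time']} value={row['value']}")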

Log messages for monitoring and debugging

Use the shared API’s info, warn, and error functions to log messages from your plugin. Each function accepts one or more arguments, converts them to strings, and logs them as a space-separated message.

Add logging to monitor plugin execution and assist with debugging:

influxdb3_local.info("Starting data processing")
influxdb3_local.warn("Could not process some records")
influxdb3_local.error("Failed to connect to external API")

# Log structured data
obj_to_log = {"records": 157, "errors": 3}
influxdb3_local.info("Processing complete", obj_to_log)

The system writes all log messages to the server logs and stores them in system tables, where you can query them using SQL.
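
For example, a plugin can read recent log lines back with a query. The system table name below is an assumption; verify the exact name on your server:

# Assumed system table name; verify it against your server's system tables
recent_logs = influxdb3_local.query(
    "SELECT * FROM system.processing_engine_logs LIMIT 10"
)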

Maintain state with the in-memory cache

The Processing Engine provides an in-memory cache that enables your plugins to persist and retrieve data between executions.

Access the cache using the cache property of the shared API:

# Basic usage pattern  
influxdb3_local.cache.METHOD(PARAMETERS)

The cache property provides the following methods for storing, retrieving, and deleting cached values:

put(key, value, ttl=None, use_global=False) -> None
  Stores a value in the cache with an optional time-to-live.
    key (str): The key to store the value under
    value (Any): Any Python object to cache
    ttl (Optional[float], default=None): Time in seconds before expiration
    use_global (bool, default=False): If True, uses the global namespace

get(key, default=None, use_global=False) -> Any
  Retrieves a value from the cache, or returns default if the key is not found.
    key (str): The key to retrieve
    default (Any, default=None): Value to return if the key is not found
    use_global (bool, default=False): If True, uses the global namespace

delete(key, use_global=False) -> bool
  Deletes a value from the cache. Returns True if the key was deleted, or False if it was not found.
    key (str): The key to delete
    use_global (bool, default=False): If True, uses the global namespace

Understanding cache namespaces

The cache system offers two distinct namespaces:

Trigger-specific (default)
  Scope: Isolated to a single trigger
  Best for: Plugin state, counters, and timestamps specific to one plugin

Global
  Scope: Shared across all triggers
  Best for: Configuration, lookup tables, and service states that should be available to all plugins

Common cache operations

Store and retrieve cached data

# Store a value
influxdb3_local.cache.put("last_run_time", time.time())

# Retrieve a value with a default if not found
last_time = influxdb3_local.cache.get("last_run_time", default=0)

# Delete a cached value
influxdb3_local.cache.delete("temporary_data")

Store cached data with expiration

# Cache with a 5-minute TTL (time-to-live)
influxdb3_local.cache.put("api_response", response_data, ttl=300)

Share data across plugins

# Store in the global namespace
influxdb3_local.cache.put("config", {"version": "1.0"}, use_global=True)

# Retrieve from the global namespace
config = influxdb3_local.cache.get("config", use_global=True)

Building a counter

You can track how many times a plugin has run:

# Get current counter or default to 0
counter = influxdb3_local.cache.get("execution_count", default=0)

# Increment counter
counter += 1

# Store the updated value
influxdb3_local.cache.put("execution_count", counter)

influxdb3_local.info(f"This plugin has run {counter} times")

Guidelines for in-memory caching

To get the most out of the in-memory cache, follow these guidelines:

Use the trigger-specific namespace

The Processing Engine provides a cache that supports stateful operations while maintaining isolation between different triggers. For most use cases, use the trigger-specific namespace to keep plugin state isolated. Use the global namespace only when you need to share data across triggers.

Use TTL appropriately

Set appropriate expiration times based on how frequently your data changes:

# Cache external API responses for 5 minutes  
influxdb3_local.cache.put("weather_data", api_response, ttl=300)

Cache computation results

Store the results of expensive calculations that you use frequently:

# Cache aggregated statistics  
influxdb3_local.cache.put("daily_stats", calculate_statistics(data), ttl=3600)

Warm the cache

For critical data, prime the cache at startup. This can be especially useful for global namespace data where multiple triggers need the data:

# Initialize the lookup table if the cache is empty
if influxdb3_local.cache.get("lookup_table") is None:
    influxdb3_local.cache.put("lookup_table", load_lookup_data())

Consider cache limitations

  • Memory usage: The system stores cache contents in memory, so monitor memory consumption when caching large datasets.
  • Server restarts: The server clears the cache on restart, so design your plugins to initialize the cache when it is empty (as shown above).
  • Concurrency: When multiple trigger instances update the same cache key at the same time, reads can return stale data and concurrent updates can overwrite each other (see the sketch after this list).
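
For example, a read-modify-write sequence on a shared key is not atomic, so concurrent triggers can lose updates:

# Sketch of a lost-update race on a shared (global) key
count = influxdb3_local.cache.get("shared_count", default=0, use_global=True)
# ...if another trigger increments the key here, its update is overwritten...
influxdb3_local.cache.put("shared_count", count + 1, use_global=True)
# Prefer trigger-specific keys for counters, or tolerate approximate values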

Next steps

With an understanding of the InfluxDB 3 Shared Plugin API, you can start building data workflows that transform, analyze, and respond to your time series data.

To find example plugins you can extend, visit the influxdb3_plugins repository on GitHub.

