Configuration Reference

There are various configuration parameters that affect the behaviour of Canopy. These include both standard LightDB parameters and Canopy specific parameters. To learn more about LightDB configuration parameters, you can visit the run time configuration section of LightDB documentation.

The rest of this reference aims at discussing Canopy specific configuration parameters. These parameters can be set similar to LightDB parameters by modifying lightdb.conf or by using the SET command.

As an example you can update a setting with:

ALTER DATABASE canopy SET canopy.multi_task_query_log_level = 'log';

General configuration

canopy.max_worker_nodes_tracked (integer)

Canopy tracks worker nodes’ locations and their membership in a shared hash table on the coordinator node. This configuration value limits the size of the hash table, and consequently the number of worker nodes that can be tracked. The default for this setting is 2048. This parameter can only be set at server start and is effective on the coordinator node.

canopy.use_secondary_nodes (enum)

Sets the policy to use when choosing nodes for SELECT queries. If this is set to ‘always’, then the planner will query only nodes which are marked as ‘secondary’ noderole in pg_dist_node.

The supported values for this enum are:

  • never: (default) All reads happen on primary nodes.

  • always: Reads run against secondary nodes instead, and insert/update statements are disabled.

canopy.cluster_name (text)

Informs the coordinator node planner which cluster it coordinates. Once cluster_name is set, the planner will query worker nodes in that cluster alone.

canopy.enable_version_checks (boolean)

Upgrading Canopy version requires a server restart (to pick up the new shared-library), as well as running an ALTER EXTENSION UPDATE command. The failure to execute both steps could potentially cause errors or crashes. Canopy thus validates the version of the code and that of the extension match, and errors out if they don’t.

This value defaults to true, and is effective on the coordinator. In rare cases, complex upgrade processes may require setting this parameter to false, thus disabling the check.

canopy.log_distributed_deadlock_detection (boolean)

Whether to log distributed deadlock detection related processing in the server log. It defaults to false.

canopy.distributed_deadlock_detection_factor (floating point)

Sets the time to wait before checking for distributed deadlocks. In particular the time to wait will be this value multiplied by LightDB’s deadlock_timeout setting. The default value is 2. A value of -1 disables distributed deadlock detection.

canopy.node_connection_timeout (integer)

The canopy.node_connection_timeout GUC sets the maximum duration (in milliseconds) to wait for connection establishment. Canopy raises an error if the timeout elapses before at least one worker connection is established. This GUC affects connections from the coordinator to workers, and workers to each other.

  • Default: thirty seconds

  • Minimum: ten milliseconds

  • Maximum: one hour

-- set to 60 seconds
ALTER DATABASE foo
SET canopy.node_connection_timeout = 60000;

canopy.node_conninfo (text)

The canopy.node_conninfo GUC sets non-sensitive libpq connection parameters used for all inter-node connections.

-- key=value pairs separated by spaces.
-- For example, ssl options:

ALTER DATABASE foo
SET canopy.node_conninfo =
  'sslrootcert=/path/to/canopy.crt sslmode=verify-full';

Canopy honors only a specific subset of the allowed options, namely:

  • application_name

  • connect_timeout

  • gsslib†

  • keepalives

  • keepalives_count

  • keepalives_idle

  • keepalives_interval

  • krbsrvname†

  • sslcompression

  • sslcrl

  • sslmode (defaults to “prefer”)

  • sslrootcert

  • tcp_user_timeout

(† = subject to the runtime presence of optional LightDB features)

The node_conninfo setting takes effect only on newly opened connections. To force all connections to use the new settings, make sure to reload the lightdb configuration:

SELECT pg_reload_conf();

canopy.local_hostname (text)

Canopy nodes need occasionally to connect to themselves for systems operations. By default, they use the address localhost to refer to themselves, but this can cause problems. For instance, when a host requires sslmode=verify-full for incoming connections, adding localhost as an alternative hostname on the SSL certificate isn’t always desirable – or even feasible.

canopy.local_hostname selects the hostname a node uses to connect to itself. The default value is localhost.

ALTER SYSTEM SET canopy.local_hostname TO 'mynode.example.com';

canopy.show_shards_for_app_name_prefixes (text)

By default, Canopy hides shards from the list of tables LightDB gives to SQL clients. It does this because there are multiple shards per distributed table, and the shards can be distracting to the SQL client.

The canopy.show_shards_for_app_name_prefixes GUC allows shards to be displayed for selected clients that want to see them. Its default value is ''.

-- show shards to ltsql only (hide in other clients, like pgAdmin)

SET canopy.show_shards_for_app_name_prefixes TO 'ltsql';

-- also accepts a comma separated list

SET canopy.show_shards_for_app_name_prefixes TO 'ltsql,lt_dump';

Query Statistics

canopy.stat_statements_purge_interval (integer)

Sets the frequency at which the maintenance daemon removes records from canopy_stat_statements that are unmatched in lt_stat_statements. This configuration value sets the time interval between purges in seconds, with a default value of 10. A value of 0 disables the purges.

SET canopy.stat_statements_purge_interval TO 5;

This parameter is effective on the coordinator and can be changed at runtime.

canopy.stat_statements_max (integer)

The maximum number of rows to store in canopy_stat_statements. Defaults to 50000, and may be changed to any value in the range 1000 - 10000000. Note that each row requires 140 bytes of storage, so setting stat_statements_max to its maximum value of 10M would consume 1.4GB of memory.

Changing this GUC will not take effect until LightDB is restarted.

canopy.stat_statements_track (enum)

Recording statistics for canopy_stat_statements requires extra CPU resources. When the database is experiencing load, the administrator may wish to disable statement tracking. The canopy.stat_statements_track GUC can turn tracking on and off.

  • all: (default) Track all statements.

  • none: Disable tracking.

Data Loading

canopy.multi_shard_commit_protocol (enum)

Sets the commit protocol to use when performing COPY on a hash distributed table. On each individual shard placement, the COPY is performed in a transaction block to ensure that no data is ingested if an error occurs during the COPY. However, there is a particular failure case in which the COPY succeeds on all placements, but a (hardware) failure occurs before all transactions commit. This parameter can be used to prevent data loss in that case by choosing between the following commit protocols:

  • 2pc: (default) The transactions in which COPY is performed on the shard placements are first prepared using LightDB’s two-phase commit and then committed. Failed commits can be manually recovered or aborted using COMMIT PREPARED or ROLLBACK PREPARED, respectively. When using 2pc, max_prepared_transactions should be increased on all the workers, typically to the same value as max_connections.

  • 1pc: The transactions in which COPY is performed on the shard placements are committed in a single round. Data may be lost if a commit fails after COPY succeeds on all placements (rare).

canopy.shard_count (integer)

Sets the shard count for hash-partitioned tables and defaults to 32. This value is used by the create_distributed_table UDF when creating hash-partitioned tables. This parameter can be set at run-time and is effective on the coordinator.

canopy.shard_max_size (integer)

Sets the maximum size to which a shard will grow before it gets split and defaults to 1GB. When the source file’s size (which is used for staging) for one shard exceeds this configuration value, the database ensures that a new shard gets created. This parameter can be set at run-time and is effective on the coordinator.

canopy.replicate_reference_tables_on_activate (boolean)

Reference table shards must be placed on all nodes which have distributed tables. By default, reference table shards are copied to a node at node activation time, that is, when such functions as canopy_add_node or canopy_activate_node are called. However, node activation might be an inconvenient time to copy the placements, because it can take a long time when there are large reference tables.

You can defer reference table replication by setting the canopy.replicate_reference_tables_on_activate GUC to ‘off’. Reference table replication will then happen when we create new shards on the node. For instance, when calling create_distributed_table, create_reference_table, or when the shard rebalancer moves shards to the new node.

The default value for this GUC is ‘on’.

Planner Configuration

canopy.local_table_join_policy (enum)

This GUC determines how Canopy moves data when doing a join between local and distributed tables. Customizing the join policy can help reduce the amount of data sent between worker nodes.

Canopy will send either the local or distributed tables to nodes as necessary to support the join. Copying table data is referred to as a “conversion.” If a local table is converted, then it will be sent to any workers that need its data to perform the join. If a distributed table is converted, then it will be collected in the coordinator to support the join. The canopy planner will send only the necessary rows doing a conversion.

There are four modes available to express conversion preference:

  • auto: (Default) Canopy will convert either all local or all distributed tables to support local and distributed table joins. Canopy decides which to convert using a heuristic. It will convert distributed tables if they are joined using a constant filter on a unique index (such as a primary key). This ensures less data gets moved between workers.

  • never: Canopy will not allow joins between local and distributed tables.

  • prefer-local: Canopy will prefer converting local tables to support local and distributed table joins.

  • prefer-distributed: Canopy will prefer converting distributed tables to support local and distributed table joins. If the distributed tables are huge, using this option might result in moving lots of data between workers.

For example, assume canopy_table is a distributed table distributed by the column x, and that postgres_table is a local table:

CREATE TABLE canopy_table(x int primary key, y int);
SELECT create_distributed_table('canopy_table', 'x');

CREATE TABLE postgres_table(x int, y int);

-- even though the join is on primary key, there isn't a constant filter
-- hence postgres_table will be sent to worker nodes to support the join
SELECT * FROM canopy_table JOIN postgres_table USING (x);

-- there is a constant filter on a primary key, hence the filtered row
-- from the distributed table will be pulled to coordinator to support the join
SELECT * FROM canopy_table JOIN postgres_table USING (x) WHERE canopy_table.x = 10;

SET canopy.local_table_join_policy to 'prefer-distributed';
-- since we prefer distributed tables, canopy_table will be pulled to coordinator
-- to support the join. Note that canopy_table can be huge.
SELECT * FROM canopy_table JOIN postgres_table USING (x);

SET canopy.local_table_join_policy to 'prefer-local';
-- even though there is a constant filter on primary key for canopy_table
-- postgres_table will be sent to necessary workers because we are using 'prefer-local'.
SELECT * FROM canopy_table JOIN postgres_table USING (x) WHERE canopy_table.x = 10;

canopy.limit_clause_row_fetch_count (integer)

Sets the number of rows to fetch per task for limit clause optimization. In some cases, select queries with limit clauses may need to fetch all rows from each task to generate results. In those cases, and where an approximation would produce meaningful results, this configuration value sets the number of rows to fetch from each shard. Limit approximations are disabled by default and this parameter is set to -1. This value can be set at run-time and is effective on the coordinator.

canopy.count_distinct_error_rate (floating point)

Canopy can calculate count(distinct) approximates using the lightdb-hll extension. This configuration entry sets the desired error rate when calculating count(distinct). 0.0, which is the default, disables approximations for count(distinct); and 1.0 provides no guarantees about the accuracy of results. We recommend setting this parameter to 0.005 for best results. This value can be set at run-time and is effective on the coordinator.

canopy.task_assignment_policy (enum)

备注

This GUC is applicable for queries against Reference Tables.

Sets the policy to use when assigning tasks to workers. The coordinator assigns tasks to workers based on shard locations. This configuration value specifies the policy to use when making these assignments. Currently, there are three possible task assignment policies which can be used.

  • greedy: The greedy policy is the default and aims to evenly distribute tasks across workers.

  • round-robin: The round-robin policy assigns tasks to workers in a round-robin fashion alternating between different replicas. This enables much better cluster utilization when the shard count for a table is low compared to the number of workers.

  • first-replica: The first-replica policy assigns tasks on the basis of the insertion order of placements (replicas) for the shards. In other words, the fragment query for a shard is simply assigned to the worker which has the first replica of that shard. This method allows you to have strong guarantees about which shards will be used on which nodes (i.e. stronger memory residency guarantees).

This parameter can be set at run-time and is effective on the coordinator.

Intermediate Data Transfer

canopy.binary_worker_copy_format (boolean)

Use the binary copy format to transfer intermediate data between workers. During large table joins, Canopy may have to dynamically repartition and shuffle data between different workers. For LightDB, the default for this setting is false, which means text encoding is used to transfer this data. The parameter is effective on the workers and needs to be changed in the lightdb.conf file. After editing the config file, users can send a SIGHUP signal or restart the server for this change to take effect.

canopy.max_intermediate_result_size (integer)

The maximum size in KB of intermediate results for CTEs that are unable to be pushed down to worker nodes for execution, and for complex subqueries. The default is 1GB, and a value of -1 means no limit. Queries exceeding the limit will be canceled and produce an error message.

DDL

canopy.enable_ddl_propagation (boolean)

Specifies whether to automatically propagate DDL changes from the coordinator to all workers. The default value is true. Because some schema changes require an access exclusive lock on tables and because the automatic propagation applies to all workers sequentially it can make a Canopy cluster temporarily less responsive. You may choose to disable this setting and propagate changes manually.

备注

For a list of DDL propagation support, see Modifying Tables.

canopy.enable_local_reference_table_foreign_keys (boolean)

This setting, enabled by default, allows foreign keys to be created between reference and local tables. For the feature to work, the coordinator node must be registered with itself, using canopy_add_node.

Note that foreign keys between reference tables and local tables come at a slight cost. When you create the foreign key, Canopy must add the plain table to Canopy’ metadata, and track it in Partition table. Local tables that are added to metadata inherit the same limitations as reference tables (see Creating and Modifying Distributed Objects (DDL) and SQL Support and Workarounds)..

If you drop the foreign keys, Canopy will automatically remove such local tables from metadata, which eliminates such limitations on those tables.

Executor Configuration

General

canopy.all_modifications_commutative

Canopy enforces commutativity rules and acquires appropriate locks for modify operations in order to guarantee correctness of behavior. For example, it assumes that an INSERT statement commutes with another INSERT statement, but not with an UPDATE or DELETE statement. Similarly, it assumes that an UPDATE or DELETE statement does not commute with another UPDATE or DELETE statement. This means that UPDATEs and DELETEs require Canopy to acquire stronger locks.

If you have UPDATE statements that are commutative with your INSERTs or other UPDATEs, then you can relax these commutativity assumptions by setting this parameter to true. When this parameter is set to true, all commands are considered commutative and claim a shared lock, which can improve overall throughput. This parameter can be set at runtime and is effective on the coordinator.

canopy.multi_task_query_log_level (enum)

Sets a log-level for any query which generates more than one task (i.e. which hits more than one shard). This is useful during a multi-tenant application migration, as you can choose to error or warn for such queries, to find them and add a tenant_id filter to them. This parameter can be set at runtime and is effective on the coordinator. The default value for this parameter is ‘off’.

The supported values for this enum are:

  • off: Turn off logging any queries which generate multiple tasks (i.e. span multiple shards)

  • debug: Logs statement at DEBUG severity level.

  • log: Logs statement at LOG severity level. The log line will include the SQL query that was run.

  • notice: Logs statement at NOTICE severity level.

  • warning: Logs statement at WARNING severity level.

  • error: Logs statement at ERROR severity level.

Note that it may be useful to use error during development testing, and a lower log-level like log during actual production deployment. Choosing log will cause multi-task queries to appear in the database logs with the query itself shown after “STATEMENT.”

LOG:  multi-task query about to be executed
HINT:  Queries are split to multiple tasks if they have to be split into several queries on the workers.
STATEMENT:  select * from foo;

canopy.propagate_set_commands (enum)

Determines which SET commands are propagated from the coordinator to workers. The default value for this parameter is ‘none’.

The supported values are:

  • none: no SET commands are propagated.

  • local: only SET LOCAL commands are propagated.

canopy.enable_repartition_joins (boolean)

Ordinarily, attempting to perform Repartition joins with the adaptive executor will fail with an error message. However, setting canopy.enable_repartition_joins to true allows Canopy to perform the join. The default value is false.

canopy.enable_repartitioned_insert_select (boolean)

By default, an INSERT INTO … SELECT statement that cannot be pushed down will attempt to repartition rows from the SELECT statement and transfer them between workers for insertion. However, if the target table has too many shards then repartitioning will probably not perform well. The overhead of processing the shard intervals when determining how to partition the results is too great. Repartitioning can be disabled manually by setting canopy.enable_repartitioned_insert_select to false.

canopy.enable_binary_protocol (boolean)

Setting this parameter to true instructs the coordinator node to use LightDB’s binary serialization format (when applicable) to transfer data with workers. Some column types do not support binary serialization.

Enabling this parameter is mostly useful when the workers must return large amounts of data. Examples are when a lot of rows are requested, the rows have many columns, or they use big types such as hll from the lightdb-hll extension.

The default value is false, which means all results are encoded and transferred in text format.

canopy.max_shared_pool_size (integer)

Specifies the maximum number of connections that the coordinator node, across all simultaneous sessions, is allowed to make per worker node. LightDB must allocate fixed resources for every connection and this GUC helps ease connection pressure on workers.

Without connection throttling, every multi-shard query creates connections on each worker proportional to the number of shards it accesses (in particular, up to #shards/#workers). Running dozens of multi-shard queries at once can easily hit worker nodes’ max_connections limit, causing queries to fail.

By default, the value is automatically set equal to the coordinator’s own max_connections, which isn’t guaranteed to match that of the workers (see the note below). The value -1 disables throttling.

备注

There are certain operations that do not obey canopy.max_shared_pool_size, most importantly repartition joins. That’s why it can be prudent to increase the max_connections on the workers a bit higher than max_connections on the coordinator. This gives extra space for connections required for repartition queries on the workers.

canopy.max_adaptive_executor_pool_size (integer)

Whereas canopy.max_shared_pool_size (integer) limits worker connections across all sessions, max_adaptive_executor_pool_size limits worker connections from just the current session. This GUC is useful for:

  • Preventing a single backend from getting all the worker resources

  • Providing priority management: designate low priority sessions with low max_adaptive_executor_pool_size, and high priority sessions with higher values

The default value is 16.

canopy.executor_slow_start_interval (integer)

Time to wait in milliseconds between opening connections to the same worker node.

When the individual tasks of a multi-shard query take very little time, they can often be finished over a single (often already cached) connection. To avoid redundantly opening additional connections, the executor waits between connection attempts for the configured number of milliseconds. At the end of the interval, it increases the number of connections it is allowed to open next time.

For long queries (those taking >500ms), slow start might add latency, but for short queries it’s faster. The default value is 10ms.

canopy.max_cached_conns_per_worker (integer)

Each backend opens connections to the workers to query the shards. At the end of the transaction, the configured number of connections is kept open to speed up subsequent commands. Increasing this value will reduce the latency of multi-shard queries, but will also increase overhead on the workers.

The default value is 1. A larger value such as 2 might be helpful for clusters that use a small number of concurrent sessions, but it’s not wise to go much further (e.g. 16 would be too high).

canopy.force_max_query_parallelization (boolean)

Simulates the deprecated and now nonexistent real-time executor. This is used to open as many connections as possible to maximize query parallelization.

When this GUC is enabled, Canopy will force the adaptive executor to use as many connections as possible while executing a parallel distributed query. If not enabled, the executor might choose to use fewer connections to optimize overall query execution throughput. Internally, setting this true will end up using one connection per task.

One place where this is useful is in a transaction whose first query is lightweight and requires few connections, while a subsequent query would benefit from more connections. Canopy decides how many connections to use in a transaction based on the first statement, which can throttle other queries unless we use the GUC to provide a hint.

BEGIN;
-- add this hint
SET canopy.force_max_query_parallelization TO ON;

-- a lightweight query that doesn't require many connections
SELECT count(*) FROM table WHERE filter = x;

-- a query that benefits from more connections, and can obtain
-- them since we forced max parallelization above
SELECT ... very .. complex .. SQL;
COMMIT;

The default value is false.

Explain output

canopy.explain_all_tasks (boolean)

By default, Canopy shows the output of a single, arbitrary task when running EXPLAIN on a distributed query. In most cases, the explain output will be similar across tasks. Occasionally, some of the tasks will be planned differently or have much higher execution times. In those cases, it can be useful to enable this parameter, after which the EXPLAIN output will include all tasks. This may cause the EXPLAIN to take longer.

canopy.explain_analyze_sort_method (enum)

Determines the sort method of the tasks in the output of EXPLAIN ANALYZE. The default value of canopy.explain_analyze_sort_method is execution-time.

The supported values are:

  • execution-time: sort by execution time.

  • taskId: sort by task id.