Skip to main content

Sending Messages to a Pipewire Node

·2406 words·12 mins
Development Threads Pipewire Filter-Chain
Table of Contents

Pipewire is a low-latency, graph-based processing engine for audio and video for Linux. Among its numerous features it also provides a built-in filter-chain module that allows us to inject arbitrary processing graphs of plugins, either built-in or in the form of LV2 or LADSPA plugins. These can be injected in the processing graph without any additional plugin host. The parameters of the plugins can be changed with the pipewire command line tool pw-cli.

For example, if we have a plugin named Compressor in the filter-chain with the node id 88, and the plugin has a parameter named ratio, we can change that parameter as shown:

pw-cli set-param 88 Props '{params = ["Compressor:ratio" "64.000000"]}'

But how do we do that in code? Enter pw_node_set_param, this little function allows us to send the command to set the parameters we want to change. It takes a pw_node *, a pointer for the proxy of the node we want to send to, an id for the property we want to change, some flags, and finally the pod we want to send. A pod is basically a message in the format of the plugin API spa that is used to implement Pipewire.

Putting together the correct pod is its own little puzzle which we’ll have to solve later, because before that, we’ll have to solve the little challenge that calling pw_node_set_param where I initially planned on doing so results in this informative error message: impl_ext_end_proxy called from wrong context, check thread and locking: Not in loop.

The error message itself points to the root of the problem, and that is that the communication between the pipewire server and client can only happen in the main loop, except when triggered by callbacks from pipewire itself, and because I wasn’t calling from within the main loop, it rejected my tries and answered with this helpful error message.

There are several ways to solve this problem, luckily pv[m] in #pipewire on the OFTC server pointed me to the easiest way, if we also want to pass data along in a thread safe manner, using pw_loop_invoke. And in addition to all of that, we also have to get the proxy to the node we want call using registry events.

In short, in the following we’ll:

  • Create the main loop
  • Get the registry and register the events we need to find the node
  • Do work on the main loop using pw_node_invoke
  • Build the message with SPA Pods
  • Send the set param command with pw_node_set_param

The Main Loop
#

The Main Loop is where Pipewire clients communicate with the server. When interacting with the Pipewire server we normally initialize Pipewire, setup the Main Loop, and then run the Main Loop to start processing. We initialize Pipewire calling pw_init, where we pass the command line arguments, we connect event handler for SIGINT and SIGTERM, so that we will be able to end the application gracefully, and then run the Main Loop.

Finally we clean up by destroying everything (much like my children do with their toys).

A quick side note, a common pattern in Pipewire applications that need to share global state, is to store the global state in a struct on the stack of the main function. The functions to register callback functions provided by the Pipewire API provide void * typed data pointers that can be used to give the event handler access to that data. The example application we’re discussing here stores global data in the following struct:

struct data {
  struct pw_main_loop *loop;
  struct pw_registry *registry;
  struct port *port;
  utils::NodeRegistry node_registry;
};

So the skeleton for the Pipewire application is:

int main(int argc, char *argv[]) {
  struct data data = {};

  pw_init(&argc, &argv);

  data.loop = pw_main_loop_new(NULL);

  pw_loop_add_signal(pw_main_loop_get_loop(data.loop), SIGINT, do_quit, &data);
  pw_loop_add_signal(pw_main_loop_get_loop(data.loop), SIGTERM, do_quit, &data);

  pw_main_loop_run(data.loop);

  pw_main_loop_destroy(data.loop);
  pw_deinit();

  return 0;
}

Note that we do not provide any function for the Main Loop to run, what the Main Loop does is completely defined by the Pipewire API at this point.

The Registry
#

The registry allows us to retrieve data about all the objects pipewire is currently aware of, and to listen to changes to the objects tracked by Pipewire. The registry itself doesn’t give us the means of directly asking for information, so we have to listen and remember ourself. When we initially create the registry, and the main loop is running, it will send an event for every object it knows.

To communicate with and use the registry we have to first create a proxy, a local representation of the registry in the server. We can obtain that proxy from the Core, so we also need to create the proxy to the Core. We create the proxy to the Core as follows:

auto context = pw_context_new(pw_main_loop_get_loop(data.loop), nullptr, 0);
auto core = pw_context_connect(context, nullptr, 0);

Once we have access to the core, we can create the proxy for the registry and register our event handler.

data.registry = pw_core_get_registry(core, PW_VERSION_REGISTRY, 0);
struct spa_hook registry_listener;
spa_zero(registry_listener);
pw_registry_add_listener(data.registry, &registry_listener, &registry_events,
                         &data);

The actual magic happens in the event handler which are defined as functions and referenced as function pointers in a struct of type pw_registry_events, which in the applications is called registry_events. The handler itself checks the type for the object in every event, and if it sees a node it binds a proxy to it using pw_registry_bind and adds it to the node registry, a custom container built for the application which can be found in the example source, but isn’t described here further.

static void registry_event_global(void *data, uint32_t id, uint32_t permissions,
                                  const char *c_type, uint32_t version,
                                  const struct spa_dict *props) {
  struct data *my_data = static_cast<struct data *>(data);

  std::string type(c_type);
  if (type == "PipeWire:Interface:Node") {
    auto client = static_cast<pw_client *>(
        pw_registry_bind(my_data->registry, id, c_type, PW_VERSION_CLIENT, 0));
    my_data->node_registry.add_node(id, client);
    std::cout << "Registry event global" << std::endl
              << "type: " << type << std::endl
              << "id: " << id << std::endl;
  }
}

static const struct pw_registry_events registry_events = {
    .version = PW_VERSION_REGISTRY_EVENTS,
    .global = registry_event_global,
};

So once we’re finished with processing all the events, we’ll have a list of all nodes and a bound proxy. A real application would also listen to the removal of nodes, but this is not a real application.

Doing Work on the Main Loop
#

As mentioned before, in order to communicate with the Pipewire server, we’ll have to instruct the Main Loop to do the communication for us, otherwise the call will be rejected. The Pipewire API provides pw_loop_invoke for that exact reason. The signature is as follows:

pw_loop_invoke(
  pw_loop * loop, // pointer to the loop
  spa_invoke_func_t func, // function to execute on main loop
  uint32_t seq, // sequence number
  const void *data, // arbitrary block of data
  size_t size, // size of the arbitary block of data
  bool block, // should block until func is executed
  void *user_data // opaque pointer, passed unmodified
)

The Main Loop Pointer
#

The first argument is a pointer to the loop itself. This is pretty self- explanatory, the only caveat is that we can’t pass the pointer we use to store our Main Loop directly, but we have to get the correct loop with pw_main_loop_get beforehand.

The Callback Function
#

The second argument is a function pointer to the function we want to execute on the Main Loop. The type is defined by spa_invoke_func_t which is defined like this:

typedef 
  int(* spa_invoke_func_t)(struct spa_loop *loop, bool async, uint32_t seq,
                           const void *data, size_t size, void *user_data)

Looking at the arguments reveals that the Main Loop will call that function with all of the arguments we pass into pw_loop_invoke, except for the function itself.

If we’re willing to use c++ we can use a lambda, as long as we don’t capture anything.

Control Arguments
#

This is followed by a sequence number, which is passed along to the callback function, various ways to pass along data, which will be explained in the next section, and the argument block, which signals if we want to wait until the Main Loop finishes with the execution.

Passing Data
#

The function pw_loop_invoke provides two ways to pass data into the function run on the Main Loop. The more straightforward one is the user_data of type void * which can be used to pass a pointer unprocessed directly into the function, exactly as Pipewire allows for the event handler for example. This method has the benefit that we can pass arbitrary data, but it is not inherently thread-safe, thread-safety has to be implemented by the application.

The second option is implemented using another void * pointer named data, and the associated argument size of type size_t. The pointer here is handled very differently though. Pipewire copies size bytes of the data stored at the address data points to, and stores that in a new location in memory. Our callback is then given a pointer to the new location.

This has several implications, because the data is copied, it cannot be changed or change anything else, it is basically immutable, and therefore thread-safe. But it also means that we have to be somewhat careful about what data we try to pass along. A std::string for example would not work, because the string allocates the memory for the characters on the heap, and will therefore not be copied with a simple memcpy or similar.

We can pass pointers though, because the pointer will be copied, and therefore point to the same location afterwards, but accessing that memory isn’t inherently thread-safe anymore because it is still the same location.

With all the above in mind, we can now define the data structure that will contain the data to pass along:

struct pw_invoke_set_param_data {
  unsigned int target_node_id;
  pw_client *target_node;
  double value;
};

Of the fields in the structure, target_node_id and value have a primitive type and can easily be copied, and target_node is a pointer to the proxy of our node, so we need it to point to the same location in memory, and copying should be therefore no problem.

In order for us to have a second thread that we can use to call the Main Loop, we spawn a new thread with an std::thread. In the thread we wait for a second to make sure we processed the event for the node we’re looking for, which could be improved by listening for an event, but this is left as an exercise to the reader. The final code, leaving out the code in the lambda, which we will be discussing later, looks as follows:

std::thread message_sender([&data]() {
  std::this_thread::sleep_for(std::chrono::seconds(1));

  unsigned int target_node_id = 82;
  auto target_node = data.node_registry.get_node_by_id(target_node_id);
  pw_invoke_set_param_data invoke_data{target_node_id, target_node->client,
                                       13.4};

  pw_loop_invoke(
      pw_main_loop_get_loop(data.loop),
      [](struct spa_loop *loop, bool async, u_int32_t seq, const void *data,
         size_t size, void *user_data) {
        /* [...] */
      },
      0, &invoke_data, sizeof(pw_invoke_set_param_data), true, nullptr);
});

Building the SPA Pod
#

The final step that remains to be implemented now, is sending the set param command to the node using the proxy. The function to use for this is pw_node_set_param, which takes a pointer to the node, a target parameter type, flags, which we will ignore, and the message to send, in form of a SPA Pod.

We got the pointer to the target node earlier, using the registry listener, the only challenge now is constructing the message to send. Messages exchanged via Pipewire are encoded into Pods from the Simple Plugin API (SPA). SPA Pod is a container for data exchange. A Pod can store primitive values like integers, more complex type like strings, arrays, objects and structures.

To be able to set the parameter, we have to construct a Pod with the correct format. The easiest way to understand how the Pod needs to look like, is to use the command line utility pw-cli which prints out the structure and content of the Pod before sending it, using the debug utility spa_debug_pod.

pw-cli set-param 88 Props '{params = ["Compressor:ratio" 64.000000]}

Object: size 72, type Spa:Pod:Object:Param:Props (262146), id Spa:Enum:ParamId:Props (2)
  Prop: key Spa:Pod:Object:Param:Props:params (524289), flags 00000000
    Struct: size 48
      String "Compressor:ratio"
      Float 64.00000

The output tells us that the object size is 72, has the type Spa:Pod:Object:Param:Props and an id of Spa:Enum:ParamId:Props. The object has one property, for the key Spa:Pod:Object:Param:Props:params, and the value is a struct with one entry. Pod structs are somewhat different than general programming language structs, and can the easiest be described as arrays, where even indices are the struct names, and the odd indices contain the values. In our case the struct contains an attribute with the name of “Compressor:ratio” and the float value 64.0.

The method we use to build the SPA Pod is shown below:

static spa_pod *build_set_params_message(u_int8_t *buffer, size_t buffer_size,
                                         std::string param_name,
                                         double param_value) {

  struct spa_pod_builder builder;
  spa_pod_builder_init(&builder, buffer, buffer_size);

  struct spa_pod_frame object_frame;
  spa_pod_builder_push_object(&builder, &object_frame, SPA_TYPE_OBJECT_Props,
                              SPA_PARAM_Props);
  spa_pod_builder_prop(&builder, SPA_PROP_params, 0);

  struct spa_pod_frame struct_frame;
  spa_pod_builder_push_struct(&builder, &struct_frame);
  spa_pod_builder_string(&builder, param_name.c_str());
  spa_pod_builder_double(&builder, param_value);
  spa_pod_builder_pop(&builder, &struct_frame);

  return static_cast<spa_pod *>(spa_pod_builder_pop(&builder, &object_frame));
}

First we create a spa_pod_builder, by initializing with a call to spa_pob_builder_init, to which we pass a pointer to the builder itself and memory it can use to build the Pod. To create the object, we need to push it on the stack, the work area on the builder. In order for us to be able to retrieve it later, we need to store the frame, basically the position of the object on the stack. We push the object on the stack with correct type and id, and store the frame using the following snippet:

struct spa_pod_frame object_frame;
spa_pod_builder_push_object(&builder, &object_frame, SPA_TYPE_OBJECT_Props,
                            SPA_PARAM_Props);

We then add a property to the object as follows:

spa_pod_builder_prop(&builder, SPA_PROP_params, 0);

This does only add the property, but no value yet. The value needs to be a struct, which, as a complex type, is also constructed on the stack, so we have to push the struct on the stack, and then add the data. As discussed above, the data in a struct is a list of interleaved keys and values, so we add the key and the value now.

struct spa_pod_frame struct_frame;
spa_pod_builder_push_struct(&builder, &struct_frame);
spa_pod_builder_string(&builder, param_name.c_str());
spa_pod_builder_double(&builder, param_value);

Finally we need to first pop the struct and then the object from the stack (which we will return here, because we’re in a function), using the previously stored frames.

spa_pod_builder_pop(&builder, &struct_frame);
return static_cast<spa_pod *>(spa_pod_builder_pop(&builder, &object_frame));

Send the command
#

After building the Pod, we’re ready so send the command to set the parameter value, which we can do by calling pw_node_set_param.

pw_node_set_param(
    reinterpret_cast<struct pw_node *>(param_data->target_node),
    SPA_PARAM_Props, 0, pod);

null

comments powered by Disqus