March Docs

PubSub

PubSub module: topic-based publish-subscribe backbone for Channels.

PubSub maintains a mapping from topic strings to sets of subscriber pids. Channels subscribe on join and unsubscribe on leave; broadcasts fan out to all subscribers of a topic.

Two layers: Pure layer — PubSubState + subscribe_state/unsubscribe_state/get_subscribers. Fully testable without the actor runtime. Runtime API — subscribe/unsubscribe/broadcast/broadcast_from. Requires the actor system (spawn, actor_cast).

Sharding: topics are routed to a shard by hashing the topic string. topic_shard(topic, n) → 0..n-1 uses the built-in hash() for simplicity. Replace with an explicit FNV-1a loop if byte-level control is needed.

Wire message sent to subscribers on broadcast: PubSubBroadcast(topic, event, payload_string)

Usage (pure): let s = PubSub.new_state() let s1 = PubSub.subscribe_state(s, "room:lobby", my_pid) let subs = PubSub.get_subscribers(s1, "room:lobby") -- [my_pid] let s2 = PubSub.unsubscribe_state(s1, "room:lobby", my_pid) let subs2 = PubSub.get_subscribers(s2, "room:lobby") -- []

Types

typePubSubStatePubSubState = PubSubState(Map(String, List(Int)))#

Functions

fnnew_statenew_state()#

Create an empty PubSub state.

fnsubscribe_statesubscribe_state(ps, topic, pid)#

Subscribe pid to topic. Returns the updated state. If the pid is already subscribed, adds a duplicate (no-op semantically since broadcasts are once-per-pid after deduplication at send time).

fnunsubscribe_stateunsubscribe_state(ps, topic, pid)#

Unsubscribe pid from topic. Removes the first matching entry.

fnget_subscribersget_subscribers(ps, topic)#

Return the list of subscriber pids for topic. Empty list if none.

fnhas_subscribershas_subscribers(ps, topic)#

True when at least one pid is subscribed to topic.

fnsubscriber_countsubscriber_count(ps, topic)#

Number of subscribers currently registered for topic.

fntopic_shardtopic_shard(topic, num_shards)#

Hash-based shard index for topic given num_shards shards. Uses the built-in hash() function (same primitive the HAMT uses). Returns a value in [0, num_shards).

fntopic_matchestopic_matches(pattern, topic)#

True when pattern matches topic. A trailing * in pattern is a wildcard prefix match. Examples: topic_matches("room:", "room:lobby") -> true topic_matches("room:", "game:42") -> false topic_matches("room:lobby", "room:lobby") -> true

fnbroadcast_tobroadcast_to(pids, excluded_pid, event, payload)#

Send a broadcast message to all subscribers in pids except excluded_pid. When excluded_pid is None, all subscribers receive the message. Uses actor_cast to deliver BroadcastMsg to each pid. Requires actor runtime.

fnsubscribe_in_shardsubscribe_in_shard(shard_pid, topic, subscriber_pid)#

Subscribe subscriber_pid to topic in the shard shard_pid. Sends a PubSubSubscribe message to the shard actor. Requires actor runtime.

fnunsubscribe_in_shardunsubscribe_in_shard(shard_pid, topic, subscriber_pid)#

Unsubscribe subscriber_pid from topic in the shard shard_pid. Sends a PubSubUnsubscribe message to the shard actor. Requires actor runtime.

fnbroadcast_in_shardbroadcast_in_shard(shard_pid, topic, excluded_pid, event, payload)#

Broadcast event + payload to all subscribers of topic in shard shard_pid. Pass Some(sender_pid) to exclude the sender (broadcast_from behaviour). Requires actor runtime.