
Technical Notes

Design-level notes about how the runtime actually behaves — lifecycles, channels, timers, and the small decisions that aren’t obvious from any single file. Useful before changing behavior, and equally useful if you just want a deeper picture than the high-level docs offer. The public API reference lives in rust-api.md / python-api.md; the layering overview is in architecture.md.


1. Runtime

src/runtime.rs holds a process-wide, lazily initialized multi-thread tokio runtime (OnceLock<Runtime>). Every background task in the library is spawned on this runtime — crate::runtime::spawn, not tokio::spawn.

Why a dedicated runtime? Spawning on try_current previously attached scanner / device tasks to the consumer application’s runtime. That made clean shutdown of the consumer’s runtime kill our background tasks at arbitrary points (mid-handshake, mid-reconnect), and made Device::new silently fail when called from outside any async context.

Consequences:

runtime::maximize_fd_limit() raises RLIMIT_NOFILE to the hard limit on Unix. Called by the Python bindings on module init; not called automatically from the Rust side (consumers may have their own fd policy).


2. Scanner

src/scanner.rs. The whole scanner is a process-wide singleton: static GLOBAL_SCANNER: OnceLock<Scanner>. Scanner::get() / scanner::get() initialize on first access and return a &'static Scanner. There is no public constructor and no builder: every consumer goes through the singleton, and every configurable knob (set_timeout, set_ports, set_bind_address) takes &self and mutates the singleton’s interior state.
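The singleton shape described above can be sketched with the standard library alone. This is a minimal illustration, not rustuya's actual struct: the `timeout` field, the `timeout()` getter, the 18s default, and the exact `set_timeout` signature are assumptions made for the example.

```rust
use std::sync::{Mutex, OnceLock};
use std::time::Duration;

/// Illustrative stand-in for the real scanner; the field is an assumption.
pub struct Scanner {
    timeout: Mutex<Duration>,
}

static GLOBAL_SCANNER: OnceLock<Scanner> = OnceLock::new();

impl Scanner {
    /// Initializes on first access and returns a `&'static` reference.
    pub fn get() -> &'static Scanner {
        GLOBAL_SCANNER.get_or_init(|| Scanner {
            timeout: Mutex::new(Duration::from_secs(18)),
        })
    }

    /// Takes `&self`: the singleton is never handed out mutably,
    /// so every knob mutates interior state behind a lock.
    pub fn set_timeout(&self, timeout: Duration) {
        *self.timeout.lock().unwrap() = timeout;
    }

    /// Hypothetical getter, only here so the example can observe the change.
    pub fn timeout(&self) -> Duration {
        *self.timeout.lock().unwrap()
    }
}

fn main() {
    Scanner::get().set_timeout(Duration::from_secs(30));
    // A second `get()` returns the same instance, so the change is visible.
    assert_eq!(Scanner::get().timeout(), Duration::from_secs(30));
    assert!(std::ptr::eq(Scanner::get(), Scanner::get()));
}
```

Because there is only one instance, a setting changed by one consumer is visible to every other consumer in the process.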

Passive vs active scan — design intent

Tuya devices on a LAN do two things autonomously on UDP:

  1. They periodically advertise themselves via short broadcast packets on ports 6666 (v3.1/3.2/3.3), 6667 (v3.4), and 7000 (v3.5).
  2. They reply to discovery probes sent to those same ports.

rustuya’s scanner is built around that asymmetry:

The reason they share infrastructure: a “response to our active broadcast” and “an unsolicited periodic advertisement” are the same packet shape on the wire. There’s no point running two separate UDP read paths — a single dispatcher folds both into the same cache.

      (1) periodic advertise           (2) reply to probe
                │                                │
                ▼                                ▼
        ┌────────────────────────────────────────────────┐
        │         UDP receiver tasks (per-port)          │
        │       always-on; bound at singleton init       │
        └───────────────────────┬────────────────────────┘
                                ▼
                ┌────────────────────────────────┐
                │   shared mpsc (1024 capacity)  │
                └───────────────┬────────────────┘
                                ▼
                ┌────────────────────────────────┐
                │   dispatcher task (singular)   │
                │ parse_packet → cache.write →   │
                │ publish_discovery (watch++)    │
                └───────────────┬────────────────┘
                                ▼
                    discovery_version watch channel
                                │
           ┌────────────────────┼────────────────────┐
           ▼                    ▼                    ▼
  scan_stream consumer   discover_device   Device::wait_for_backoff
  (yields cache items)    (waits for ID)    (scanner-bypass logic)

   (the box on the right also has, alongside it, the *active scan*
    discovery loop which writes broadcasts back into the UDP receiver
    tasks' sockets — those sockets are full-duplex)

Singleton lifecycle

The shared packet channel

One long-lived mpsc<(Vec<u8>, SocketAddr)> is shared across all receiver tasks. Capacity PACKET_CHANNEL_CAPACITY = 1024 — sized to absorb a sub-millisecond burst of a few thousand simultaneous device replies (each dispatch_packet is microsecond-scale: parse + cache write + retain + publish). If the channel ever filled, the receiver’s tx.send().await would park and back-pressure the UDP socket, which in turn causes the kernel buffer to drop packets — so the capacity is deliberately generous.

When set_ports() adds a port mid-flight, the new receiver task clones the existing sender — so its packets reach the same dispatcher. Before M1.2 each call to spawn_receiver_tasks created a fresh channel whose rx was never polled, silently dropping every packet on the newly-added port.

ensure_passive_listener is serialized by a startup_guard: Mutex<()>. Without it, two concurrent callers (set_ports racing the discovery-loop fallback) could end up spawning two dispatchers on disjoint mpsc channels — silently forking the cache-write path. stop_passive_listener and set_bind_address both take the same guard so they can’t interleave with a reconcile in flight.

Port reconciliation

set_ports(new_list) is diff-based: compute_port_diff figures out adds vs removes against the currently-bound socket map, and reconcile_listener_locked applies both. A port dropped from the list has its per-port receiver_tasks JoinHandle aborted and its socket removed from state.sockets. Per-port receiver task tracking (HashMap<u16, JoinHandle>) is the structural change that makes this possible; before it, receiver tasks lived in a flat Vec with no per-port handle.
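As a rough sketch of the diff step, assuming `compute_port_diff` takes the currently bound ports and the requested ports as sets (the real signature is not shown in these notes and may differ):

```rust
use std::collections::BTreeSet;

/// Returns `(to_add, to_remove)`: ports wanted but not yet bound, and
/// ports bound but no longer wanted. The signature is an assumption.
fn compute_port_diff(bound: &BTreeSet<u16>, wanted: &BTreeSet<u16>) -> (Vec<u16>, Vec<u16>) {
    let to_add = wanted.difference(bound).copied().collect();
    let to_remove = bound.difference(wanted).copied().collect();
    (to_add, to_remove)
}

fn main() {
    let bound = BTreeSet::from([6666, 6667]);
    let wanted = BTreeSet::from([6667, 7000]);
    let (to_add, to_remove) = compute_port_diff(&bound, &wanted);
    assert_eq!(to_add, vec![7000]); // bind a socket, spawn a receiver
    assert_eq!(to_remove, vec![6666]); // abort the receiver, drop the socket
}
```

Ports present in both sets are left untouched, which is what lets an unchanged port keep its socket and receiver task across a `set_ports` call.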

set_bind_address on an already-running listener aborts all per-port receivers and clears the socket map, but keeps the dispatcher and packet_tx alive — so the splice path in reconcile_listener_locked reuses them. Practical effect: in-flight UDP packets on the old sockets may be dropped during the rebind (μs window), but the discovery cache and all watch-channel subscribers see no discontinuity.

Discovery notifications use watch, not Notify

Notify::notified() had a narrow re-subscribe race: a publish that fired between two notified().await calls could be lost. The fix (ScannerState::discovery_version) is a tokio::sync::watch::channel::<u64>. Each cache update bumps the counter; subscribers changed().await and re-read the cache. The carried value is opaque — the change itself is the signal.

Subscribers are pre-marked-seen (mark_unchanged()) so only future updates resolve changed(). This pattern shows up in subscribe_discoveries, scan_stream_instance, and wait_for_backoff.
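To see why a version counter cannot lose an update, here is a std-only stand-in built from a Mutex and a Condvar. It is not tokio's watch channel and not the scanner's code; the `DiscoveryVersion` type and its method names are hypothetical. The point it illustrates is that a subscriber compares against the last version it saw, so a publish that lands between two waits is still observed.

```rust
use std::sync::{Condvar, Mutex};

/// Std-only stand-in for a `watch::channel::<u64>`: a counter plus a condvar.
#[derive(Default)]
struct DiscoveryVersion {
    version: Mutex<u64>,
    changed: Condvar,
}

impl DiscoveryVersion {
    /// Bump the counter and wake every waiter.
    fn publish(&self) {
        *self.version.lock().unwrap() += 1;
        self.changed.notify_all();
    }

    /// Current version; a new subscriber stores this as "already seen".
    fn current(&self) -> u64 {
        *self.version.lock().unwrap()
    }

    /// Block until the version differs from `seen`, then return the new one.
    /// Returns immediately if a publish already happened since `seen`.
    fn wait_changed(&self, seen: u64) -> u64 {
        let guard = self.version.lock().unwrap();
        let guard = self.changed.wait_while(guard, |v| *v == seen).unwrap();
        *guard
    }
}

fn main() {
    let dv = DiscoveryVersion::default();
    let seen = dv.current(); // pre-mark as seen: only future updates count

    // Publish *before* the subscriber starts waiting.
    dv.publish();

    // The update is not lost: the stored version no longer matches.
    let seen = dv.wait_changed(seen);
    assert_eq!(seen, 1);
}
```

Several publishes between two waits collapse into a single wake-up, which is fine here because subscribers re-read the whole cache rather than consuming one event per update.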

Timing invariants

scanner.rs defines the discovery active-scan schedule near the top of the file:

t=0       broadcast #1   (interval fires immediately on first tick)
t=6s      broadcast #2
t=12s     broadcast #3   (MAX_BROADCASTS reached)
t=18s     timeout        (RECEIVE_MARGIN of 6s after the last broadcast)

DEFAULT_SCAN_TIMEOUT is derived from BROADCAST_INTERVAL, MAX_BROADCASTS, and RECEIVE_MARGIN. A unit test (scan_timeout_leaves_room_after_last_broadcast) pins the invariant so a future tweak to one constant can’t silently destroy the receive window for late replies.
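The arithmetic behind the schedule can be written out as a small check. The constant names and values come from the text; the exact formula is an assumption that reproduces the 18s figure, and `default_scan_timeout` is a hypothetical helper rather than the crate's constant.

```rust
use std::time::Duration;

const BROADCAST_INTERVAL: Duration = Duration::from_secs(6);
const MAX_BROADCASTS: u32 = 3;
const RECEIVE_MARGIN: Duration = Duration::from_secs(6);

/// The last broadcast goes out at (MAX_BROADCASTS - 1) * interval because
/// the first one fires at t=0; the margin is added on top. Assumed formula.
fn default_scan_timeout() -> Duration {
    BROADCAST_INTERVAL * (MAX_BROADCASTS - 1) + RECEIVE_MARGIN
}

fn main() {
    let last_broadcast = BROADCAST_INTERVAL * (MAX_BROADCASTS - 1);
    assert_eq!(last_broadcast, Duration::from_secs(12));
    assert_eq!(default_scan_timeout(), Duration::from_secs(18));
    // The invariant the unit test pins: late replies still get a full margin.
    assert!(default_scan_timeout() - last_broadcast >= RECEIVE_MARGIN);
}
```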

Other timers:

A consequence worth knowing: after a single scan_stream call, the next 30 minutes of scan_stream calls return only cached devices without sending new broadcasts. If you specifically want a fresh sweep inside that window, force it by clearing the cache for the device of interest (invalidate_cache(id)) or by triggering discover_device, which has the shorter throttle.

Active-scan single-winner

Multiple concurrent scan_stream() callers must not both start a discovery loop. The “I am the active scanner” claim is a compare_exchange on active_scanning: AtomicBool. Losers fall through to subscribe to the discovery channel and yield results from the cache as they arrive. Regression test: active_scanning_compare_exchange_is_single_winner.
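A minimal sketch of the single-winner claim, using threads instead of async tasks. `try_claim` and `count_winners` are hypothetical helpers, and the memory orderings are an assumption; only the `compare_exchange` on an `AtomicBool` comes from the text.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Returns true for exactly one caller until the flag is reset.
fn try_claim(active_scanning: &AtomicBool) -> bool {
    active_scanning
        .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
        .is_ok()
}

/// Races `n` threads for the claim and returns how many won.
fn count_winners(n: usize) -> usize {
    let flag = Arc::new(AtomicBool::new(false));
    let winners = Arc::new(AtomicUsize::new(0));
    let handles: Vec<_> = (0..n)
        .map(|_| {
            let (flag, winners) = (Arc::clone(&flag), Arc::clone(&winners));
            thread::spawn(move || {
                if try_claim(&flag) {
                    winners.fetch_add(1, Ordering::Relaxed);
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    winners.load(Ordering::Relaxed)
}

fn main() {
    assert_eq!(count_winners(8), 1);
}
```

A plain load followed by a store would let two callers both observe `false` and both start a loop; the compare-and-swap makes the check and the claim one atomic step.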

Active-scan broadcast loop

perform_discovery_loop is a small state machine:

  1. Snapshot the configured ports and pick up the sockets already bound by the passive listener. If for some reason a configured port is unbound (race window), trigger ensure_passive_listener once as a fallback and try again.
  2. Drive a tokio::time::interval(6s). Tokio’s interval completes its first tick immediately (this is how interval always behaves, independent of the missed-tick behavior), so broadcasts land at t=0, 6s, 12s.
  3. After MAX_BROADCASTS = 3 sends, the loop switches to a plain sleep(remaining) for the rest of the timeout window (rc.3 simplification — previously it kept spinning the select! and skipping the tick, harmless but wasteful).
  4. The whole loop runs for DEFAULT_SCAN_TIMEOUT = 18s. The last 6s (t=12s..18s) is the receive margin during which the passive receivers are still picking up late device replies into the cache, and scan_stream_instance consumers are still yielding them. When the loop exits at t=18s, the spawning task sets active_scanning=false and publish_discovery() — the wake-up that lets consumers do one last cache drain before their stream ends.

The 6s receive margin is the load-bearing invariant — scan_timeout_leaves_room_after_last_broadcast pins it so a future tweak to BROADCAST_INTERVAL / MAX_BROADCASTS / RECEIVE_MARGIN can’t silently destroy it.

Local-IP discovery

discover_local_ip_blocking tries 8.8.8.8, 255.255.255.255, and 203.0.113.1 (TEST-NET-3). UDP connect() only does a kernel route lookup — no packet is actually sent. Works on air-gapped LANs where 8.8.8.8 is unreachable.
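The route-lookup trick can be sketched with `std::net::UdpSocket`. This is an illustration under assumptions, not the crate's `discover_local_ip_blocking`: the function name, the port number (arbitrary, since nothing is sent), and the `set_broadcast` call are choices made for the example.

```rust
use std::net::{IpAddr, UdpSocket};

/// Probe targets from the text; the port is an arbitrary assumption.
const PROBES: [&str; 3] = ["8.8.8.8:80", "255.255.255.255:80", "203.0.113.1:80"];

fn discover_local_ip() -> Option<IpAddr> {
    for target in PROBES {
        let Ok(socket) = UdpSocket::bind("0.0.0.0:0") else { continue };
        // Some platforms require this before connecting to a broadcast address.
        let _ = socket.set_broadcast(true);
        if socket.connect(target).is_err() {
            continue; // no route for this target; try the next one
        }
        // After connect(), the kernel has picked the outgoing interface.
        match socket.local_addr() {
            Ok(addr) if !addr.ip().is_unspecified() => return Some(addr.ip()),
            _ => continue,
        }
    }
    None
}

fn main() {
    match discover_local_ip() {
        Some(ip) => println!("local ip: {ip}"),
        None => println!("no route found"),
    }
}
```

Trying several targets in order is what covers the air-gapped case: if there is no default route for the first address, a later probe may still resolve to the LAN interface.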

Intentionally not cached. A previous OnceLock<Option<String>> froze the IP at first call, so a host whose IP later changed (DHCP renewal, WiFi switch, VPN up/down, container restart, …) would forever stamp the stale address into v3.5 discovery broadcasts. The lookup is sub-millisecond and only runs from send_discovery_broadcast, which fires at most a few times per active scan and is itself throttled by the 30-minute global scan cooldown — so the saved cost was negligible compared to the staleness risk. Only v3.5 (port 7000) discovery payloads carry the local IP; v3.1–v3.4 broadcasts omit it.

parse_packet is brute-force by design

Tuya UDP discovery payloads can be: raw JSON, AES-ECB encrypted with one of three different keys (UDP_KEY_33 / _34 / _35), wrapped in 55AA or 6699 envelopes, with or without a retcode field. There is no single in-packet hint of which combination to try. parse_packet iterates all key × retcode × envelope permutations and returns the first one whose output parses as JSON with a gwId / devId. The brute-force is bounded (~12 attempts per packet) and only runs on packets that already passed the cheap raw-JSON attempt.
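The search space is small enough to enumerate. The sketch below only models the iteration order and the first-match rule; decryption and JSON validation are replaced by a caller-supplied closure, and every type and function name here is hypothetical.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum Key { Udp33, Udp34, Udp35 }

#[derive(Debug, Clone, Copy, PartialEq)]
enum Envelope { Prefix55AA, Prefix6699 }

/// One decode attempt: which key, whether to skip a retcode field,
/// and which envelope framing to assume.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Attempt { key: Key, has_retcode: bool, envelope: Envelope }

/// Enumerates every key × retcode × envelope combination in a fixed order.
fn attempts() -> Vec<Attempt> {
    let mut out = Vec::new();
    for key in [Key::Udp33, Key::Udp34, Key::Udp35] {
        for has_retcode in [false, true] {
            for envelope in [Envelope::Prefix55AA, Envelope::Prefix6699] {
                out.push(Attempt { key, has_retcode, envelope });
            }
        }
    }
    out
}

/// Returns the first attempt for which `try_decode` yields a payload.
/// `try_decode` is a placeholder for decrypt + JSON validation.
fn first_match<F>(try_decode: F) -> Option<(Attempt, String)>
where
    F: Fn(Attempt) -> Option<String>,
{
    attempts().into_iter().find_map(|a| try_decode(a).map(|json| (a, json)))
}

fn main() {
    assert_eq!(attempts().len(), 12); // 3 keys × 2 retcode × 2 envelopes
    let hit = first_match(|a| {
        (a.key == Key::Udp35 && a.envelope == Envelope::Prefix6699 && !a.has_retcode)
            .then(|| r#"{"gwId":"example"}"#.to_string())
    });
    assert_eq!(hit.unwrap().0.key, Key::Udp35);
}
```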


3. Device

Construction is eager

Device::new / DeviceBuilder::build immediately spawn a long-running background task on rustuya’s runtime — run_connection_task in src/device/actor.rs. The task performs reconnects, heartbeats, and IP rediscovery. It stays alive until:

The constructor itself returns quickly — real TCP work happens on the spawned task. Construct devices lazily if you may have hundreds-to-thousands of them; each one starts a task on construction.

Initial jitter

The connection task sleeps a conditional initial jitter before its first connect attempt (record_construction_and_compute_jitter in device/actor.rs):

Tests that measure Arc counts must account for this — see the note in unified_listener_cycle_does_not_leak_inner_arcs.

Arc<DeviceInner> ownership graph

Drop for DeviceInner calls cancel_token.cancel() — so a “leak-free” shutdown via dropping all handles also fires the cancel for any tasks that captured the token directly.

close vs stop — and why there’s a side-channel

| Method | State after | Reconnects? | Mechanism |
| --- | --- | --- | --- |
| close() / fire_close() | Disconnected | Yes, on next user request (if persist=true) | close_notify.notify_waiters() |
| stop() / fire_stop() | Stopped | Never | close_notify then cancel_token.cancel() |

Originally close was a DeviceCommand::Disconnect enqueued on the user mpsc. That made shutdown wait behind every queued user request. M2.1 replaced it with tokio::sync::Notify on DeviceInner. The actor’s select! in maintain_connection listens to close_notify.notified() alongside rx.recv() — so close() interrupts the current connection without draining the command queue.

stop fires the notify first (drop the connection promptly), then the cancel token (exit the outer loop). Ordering matters: cancelling first would race against any in-flight reconnect.

fire_close / fire_stop are the sync mirrors of close / stop — both implementations are actually sync (Notify + state lock are synchronously usable); the async flavors just exist so users in async contexts don’t have to remember to .await an immediate operation.

Heartbeats

Two constants, both at the top of src/device/mod.rs:

Net effect: heartbeats go out roughly every 10 seconds of idleness (every other 5s tick: the first tick after a send sees last_sent elapsed of 5s < 7s and skips; the second tick sees 10s ≥ 7s and fires, which updates last_sent — and the cycle repeats). Never back-to-back if the user is actively sending commands: any successful send updates last_sent via update_last_sent in send_raw_to_stream, so a busy device sees no heartbeat traffic at all.
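The every-other-tick cadence can be checked with a small simulation in whole seconds. `SLEEP_HEARTBEAT_DEFAULT` is named in these notes and its 7s value is inferred from the arithmetic above; `HEARTBEAT_TICK` is a hypothetical name for the 5s tick period.

```rust
/// Threshold named in the text; the 7s value is inferred from it.
const SLEEP_HEARTBEAT_DEFAULT: u64 = 7;
/// Hypothetical name for the 5s tick period.
const HEARTBEAT_TICK: u64 = 5;

/// Simulates an idle connection for `duration` seconds, starting with a
/// send at t=0, and returns the times (in seconds) at which heartbeats fire.
fn heartbeat_times(duration: u64) -> Vec<u64> {
    let mut last_sent = 0;
    let mut fired = Vec::new();
    let mut now = HEARTBEAT_TICK;
    while now <= duration {
        // Same short-circuit as described: skip if something was sent recently.
        if now - last_sent >= SLEEP_HEARTBEAT_DEFAULT {
            fired.push(now);
            last_sent = now; // a heartbeat is itself a send
        }
        now += HEARTBEAT_TICK;
    }
    fired
}

fn main() {
    // Ticks land at 5, 10, 15, ... but only every other tick passes the check.
    assert_eq!(heartbeat_times(30), vec![10, 20, 30]);
}
```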

process_heartbeat short-circuits if last_sent.elapsed() < SLEEP_HEARTBEAT_DEFAULT. Heartbeats are sent only when persist=true; persist=false devices are torn down at the end of a request anyway.

The heartbeat Interval uses MissedTickBehavior::Skip — if the actor was busy for several ticks, we get one catch-up tick, not a flood. (Burst is the tokio default; it would queue up missed ticks and fire them all at once.)

Inactivity timeout

SLEEP_INACTIVITY_TIMEOUT = 30s. The reader task wraps its read_u8() in a timeout(SLEEP_INACTIVITY_TIMEOUT, …). If 30 seconds pass with no byte arriving on the socket, we treat the connection as dead and send TuyaError::Timeout to the actor via internal_tx, which triggers a teardown + reconnect. This catches NAT timeouts and silent half-open sockets that wouldn’t otherwise raise an error.

The 30s vs 7s heartbeat gap: a healthy device replies to our heartbeat in under a second, so 30s of total silence means the path is broken even if our write_half thinks the TCP socket is still up.

Reconnect backoff

get_backoff_duration:

Scanner-triggered backoff bypass

While the actor is sleeping out a backoff in wait_for_backoff, it subscribes to scanner discovery events. If the scanner reports a fresh discovery (< 10s old) for this device, the actor consults decide_discovery_notify_action (src/device/decision.rs) to choose one of:

The bypass/report decision is in decision.rs as a pure function with its own test table, so the policy can be reasoned about without spinning up an actor.

persist=false mode

persist=false means “connect on-demand for a single request, then disconnect”. The connection task still exists, but it parks in rx.recv() until a Request arrives, then connects, runs the request, and drops the stream.

The subtle bit: rapid user requests against an unreachable persist=false device used to translate into a TCP SYN storm — every request immediately retried connect_and_handshake. M1.5 added an exponential backoff between user-triggered retries in the persist=false arm of try_connect_with_backoff, using the same get_backoff_duration. ConnectNow still bypasses the wait.

nowait mode

Device::set_nowait(true) makes status / set_value / request return immediately after queuing the command, without awaiting the device’s response. The oneshot resp_tx is still attached to the command, but the caller drops resp_rx immediately so the actor’s resp_tx.send(...) becomes a no-op.

This is per-device, atomic, and runtime-mutable (AtomicBool). Useful when you’re firing commands at a known-unresponsive bulb during boot and don’t care about confirmation.

Response matching: cmd + optional cid, not seqno

The Tuya LAN protocol is structurally fire-and-forget. Many firmwares echo seqno=0 or a device-side counter unrelated to what was sent, so seqno-based correlation is unreliable. match_response in decision.rs matches on cmd (+ optional cid for sub-devices). Consequences:

NO_RESPONSE_CMDS (in src/device/mod.rs): session-key negotiation commands and heartbeats. The actor doesn’t wait for these.

MANDATORY_DATA_CMDS (same file): currently just LanExtStream. Empty-ACK responses for these commands are ignored — the caller wants the actual payload, not the ACK.
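The matching rule (cmd plus optional cid, never seqno) can be sketched as a pure function. The `Pending` and `Incoming` structs are hypothetical stand-ins, the command value is arbitrary, and the real `match_response` in decision.rs may take different arguments or apply extra rules such as the two command lists above.

```rust
/// Minimal stand-ins; the real types carry more fields.
#[derive(Debug, Clone, PartialEq)]
struct Pending { cmd: u32, cid: Option<String> }

#[derive(Debug, Clone, PartialEq)]
struct Incoming { cmd: u32, cid: Option<String>, seqno: u32 }

/// A response matches when the command matches and, if the request
/// targeted a sub-device, the `cid` matches too. `seqno` is ignored.
fn match_response(pending: &Pending, msg: &Incoming) -> bool {
    if pending.cmd != msg.cmd {
        return false;
    }
    match &pending.cid {
        Some(cid) => msg.cid.as_deref() == Some(cid.as_str()),
        None => true,
    }
}

fn main() {
    let pending = Pending { cmd: 10, cid: Some("sub-1".into()) };
    let reply = Incoming { cmd: 10, cid: Some("sub-1".into()), seqno: 0 };
    let other = Incoming { cmd: 10, cid: Some("sub-2".into()), seqno: 0 };
    assert!(match_response(&pending, &reply)); // a seqno=0 echo still matches
    assert!(!match_response(&pending, &other)); // different sub-device
}
```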

Seqno is informational on the wire

*seqno = seqno.wrapping_add(1) (in build_message) — the field is informational (see above), so wrapping is benign and avoids a debug-build panic on a long-lived connection that crosses u32::MAX.
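A short demonstration of the wrap at the boundary. `next_seqno` is a hypothetical helper wrapping the one-line increment quoted above; whether `build_message` uses the pre- or post-increment value is not stated here.

```rust
/// Advances the sequence counter and returns the new value.
fn next_seqno(seqno: &mut u32) -> u32 {
    *seqno = seqno.wrapping_add(1);
    *seqno
}

fn main() {
    let mut seqno = u32::MAX - 1;
    assert_eq!(next_seqno(&mut seqno), u32::MAX);
    assert_eq!(next_seqno(&mut seqno), 0); // wraps instead of panicking
    // A plain `+ 1` at this point would overflow, which panics in debug builds.
    assert_eq!(u32::MAX.checked_add(1), None);
}
```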

get_cipher read-then-upgrade

Cipher construction is amortized: get_cipher reads first (parking_lot::RwLock::read) and only upgrades to write if the cipher needs to be (re)created. Before M2.3 this was a write-lock unconditionally, which serialized the reader and writer halves of every connection through cipher access.
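A sketch of the read-first pattern, using `std::sync::RwLock` in place of parking_lot (std has no in-place upgrade, so the read guard is dropped before the write lock is taken). The `Cipher` and `CipherSlot` types and the key-comparison rule are assumptions for illustration only.

```rust
use std::sync::{Arc, RwLock};

/// Placeholder for the real cipher type.
#[derive(Debug)]
struct Cipher { key: Vec<u8> }

struct CipherSlot { inner: RwLock<Option<Arc<Cipher>>> }

impl CipherSlot {
    fn get_cipher(&self, key: &[u8]) -> Arc<Cipher> {
        {
            // Fast path: shared read lock, so readers do not serialize.
            let guard = self.inner.read().unwrap();
            if let Some(c) = guard.as_ref() {
                if c.key == key {
                    return Arc::clone(c);
                }
            }
        } // read guard released here

        // Slow path: take the write lock, then re-check, because another
        // thread may have built the cipher between the two acquisitions.
        let mut slot = self.inner.write().unwrap();
        if let Some(c) = slot.as_ref() {
            if c.key == key {
                return Arc::clone(c);
            }
        }
        let fresh = Arc::new(Cipher { key: key.to_vec() });
        *slot = Some(Arc::clone(&fresh));
        fresh
    }
}

fn main() {
    let slot = CipherSlot { inner: RwLock::new(None) };
    let a = slot.get_cipher(b"0123456789abcdef");
    let b = slot.get_cipher(b"0123456789abcdef");
    assert!(Arc::ptr_eq(&a, &b)); // second call reused the cached cipher
}
```

In the common case only the read lock is touched, so the reader and writer halves of a connection can fetch the cipher concurrently.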


4. Channels

| Channel | Owner | Capacity | Purpose |
| --- | --- | --- | --- |
| mpsc<DeviceCommand> | per DeviceInner | CHAN_MPSC_CAPACITY = 64 | user → actor commands (Request, ConnectNow) |
| broadcast<TuyaMessage> | per DeviceInner | CHAN_BROADCAST_CAPACITY = 128 | actor → listeners + response matcher |
| Notify (close_notify) | per DeviceInner | n/a | close/stop side-channel that bypasses the command queue |
| CancellationToken | per DeviceInner | n/a | shutdown signal for all tasks holding a clone |
| oneshot<Result<Option<TuyaMessage>>> | per Request | 1 | response back to the request’s caller |
| mpsc<TuyaError> | per active connection | 1 | reader task → actor (transport errors) |
| mpsc<(Vec<u8>, SocketAddr)> | scanner singleton | PACKET_CHANNEL_CAPACITY = 1024 | UDP receivers → dispatcher |
| watch<u64> | scanner singleton | n/a (single-value) | discovery notifications |
| std::sync::mpsc::sync_channel<...> | per sync bridge | CHAN_WORKER_COMMAND_CAPACITY / CHAN_UNIFIED_CAPACITY | async → sync forwarder |

Broadcast lag synthetic

If a listener falls behind by more than CHAN_BROADCAST_CAPACITY (128) messages, broadcast::Receiver::recv() returns Err(Lagged(n)) instead of the missed messages. Device::listener() turns that into a synthetic event with payload = {"errorCode": ERR_STATE, "reason": "listener_lagged", "skipped": n} so the consumer can detect and react. Device::receive() continues silently (it has no event channel to inject into).

128 was picked as a reasonable compromise: tens of devices firing simultaneously can briefly burst above the consumer’s drain rate without triggering lag, but a chronically slow consumer is surfaced rather than hidden.


5. Device::listener and unified_listener

Per-device listener

Device::listener() returns an impl Stream<Item = Result<TuyaMessage>> + Send + 'static backed by a fresh broadcast::Receiver and a Device clone (one strong Arc). The stream’s select! listens to both the broadcast and cancel_token.cancelled() — without the cancel branch, the stream would only exit when broadcast_tx is dropped, which requires the very last Arc<DeviceInner> to disappear — and the stream itself holds one of those refs.

Empty-payload messages are filtered out at the source so the listener only sees real events.

unified_listener(Vec<Device>) — async

unified_listener in src/device/listener.rs is a pure function that maps each device’s listener() into a DeviceEvent { device_id, message } and merges them with futures_util::stream::select_all. No spawn, no hidden state — the returned stream is a normal impl Stream. Dropping it releases every per-device subscription. Dropping one device mid-stream loses that sub-stream but doesn’t kill the merged stream (regression test: stopping_one_device_does_not_kill_unified_stream).

unified_listener(Vec<sync::Device>) — sync

unified_listener in src/sync.rs builds the async stream the same way, then spawns a bridge_to_sync task on the library runtime that pumps items into a std::sync::mpsc::sync_channel<Result<DeviceEvent>>. The caller gets a Receiver they can recv() from any thread (no tokio runtime required). Default buffer is CHAN_UNIFIED_CAPACITY; unified_listener_with_capacity exposes it.

The bridge:

Calling it multiple times

Today, unified_listener can be called any number of times: each call spins up an independent select_all (async) or bridge task (sync). The sync variant is not a singleton; N calls over M devices = N bridge tasks = N×M broadcast subscriptions. There’s no resource cycle and no correctness issue, but in workloads where multiple consumers want the same unified stream the canonical pattern is one unified_listener call with fan-out on the consumer side, not N parallel calls.

Reconfiguring (adding or removing a device) is done by dropping the old listener and creating a new one with the new device list. There is no in-place attach/detach API. The Tuya broadcast bus is per-device and outlives the listener, so no events are lost on the device side during the re-create gap (μs scale); whatever lands during that gap is missed by the listener but will be re-pushed on the device’s next periodic status emission.


6. Sync wrapper

src/sync.rs. Each sync::Device owns a long-lived worker task spawned on the library runtime (in sync::Device::from_async) that owns the underlying async Device and reads SyncRequests from a tokio mpsc, then sends responses back via oneshot. Calling sync::Device::status() looks like a blocking call but is really:

  1. Build a SyncRequest with a oneshot tx.
  2. blocking_send the request onto the worker’s tokio mpsc.
  3. Block on oneshot::Receiver::blocking_recv.
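The three steps above can be sketched with std channels and a worker thread standing in for the tokio mpsc, the oneshot, and the library runtime. The `SyncRequest` fields, `spawn_worker`, and the reply format are hypothetical.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical request shape: a command plus a reply channel.
struct SyncRequest {
    command: String,
    reply: mpsc::SyncSender<String>,
}

/// Spawns the worker that owns the "device" and serves requests.
fn spawn_worker() -> mpsc::Sender<SyncRequest> {
    let (tx, rx) = mpsc::channel::<SyncRequest>();
    thread::spawn(move || {
        for req in rx {
            // The real worker would await the async Device here.
            let _ = req.reply.send(format!("ok: {}", req.command));
        }
    });
    tx
}

/// Looks blocking to the caller; internally it is send + wait-for-reply.
fn status(worker: &mpsc::Sender<SyncRequest>) -> Option<String> {
    let (reply, reply_rx) = mpsc::sync_channel(1); // 1. one-shot reply channel
    worker.send(SyncRequest { command: "status".into(), reply }).ok()?; // 2. enqueue
    reply_rx.recv().ok() // 3. block until the worker answers
}

fn main() {
    let worker = spawn_worker();
    assert_eq!(status(&worker).as_deref(), Some("ok: status"));
}
```

Because every request queues behind the previous one on the worker channel, a slow request delays later ones; that is the motivation for letting shutdown calls bypass this path.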

Don’t call sync APIs from inside a tokio runtime

tokio::sync::mpsc::Sender::blocking_send panics if called from within a tokio runtime context. Before M1.7 this surfaced as a confusing panic inside the bindings. check_no_runtime_context (called at the top of every sync entry point) detects it and returns TuyaError::io_other with a clear message.

fire_close / fire_stop on sync

These bypass the worker channel entirely — they go straight to the underlying async DeviceInner and fire close_notify / cancel_token. This means shutdown can’t queue behind a stuck request on the worker mpsc. Calling close() (async) on sync::Device would have the same end effect but requires the worker to drain.

sync::Device::listener

Returns a Receiver<Result<TuyaMessage>> (one per call). Each call spawns its own bridge_to_sync task. Mirrors the design of sync::unified_listener for consistency.


7. Python bindings

python/src/lib.rs. Thin pyo3 wrappers over sync::*. Module init:

The FFI surface is re-exported from rustuya::sync::internal::* (M3.2 — DeviceCommand, SubDeviceCommand, SyncRequest). The top-level paths also still work for backward compatibility but will be removed in a future minor.

Python’s GIL is released for the duration of any blocking sync call (py.detach(...) on the Rust side; Python::attach(...) is used when the binding briefly needs the GIL back, e.g. to call py.check_signals()). The Rust worker doesn’t need the GIL, and other Python threads stay live. (pyo3 renamed the historic allow_threads helper to detach in 0.26; the doc snippet uses the current name.)


8. Common gotchas