diff --git a/meson.build b/meson.build index 998cfb2..55f7cd6 100644 --- a/meson.build +++ b/meson.build @@ -89,6 +89,15 @@ sub_pinelog = subproject('pinelog', required: true, default_options: pinelog_options) dep_pinelog = sub_pinelog.get_variable('libpinelog_dep') +localipc_options = ['lipc_examples=false'] +if get_option('lipc_examples') + localipc_options = ['lipc_examples=true'] +endif + +sub_localipc = subproject('localipc', required: true, + default_options: localipc_options) +dep_localipc = sub_localipc.get_variable('liblocalipc_dep') + # inih # Use system inih dep_inih = dependency('inih') diff --git a/meson_options.txt b/meson_options.txt index e4078ef..77e8559 100644 --- a/meson_options.txt +++ b/meson_options.txt @@ -19,3 +19,8 @@ option('udev-rules-dir', option('input-group', type: 'string', value: 'plugdev', description: 'Group for input devices') + +option('lipc_examples', + type: 'boolean', + value: false, + description: 'Build localipc example programs') diff --git a/subprojects/localipc/examples/lipc_client_example.c b/subprojects/localipc/examples/lipc_client_example.c new file mode 100644 index 0000000..3741aee --- /dev/null +++ b/subprojects/localipc/examples/lipc_client_example.c @@ -0,0 +1,138 @@ +#include +#include +#include +#include +#include + +#include + +enum { + REQ_PING = 1, + REQ_NOTIFY = 2, +}; + +struct completion_state { + int done; +}; + +static lipc_status on_ping_reply(void *user, const lipc_header *hdr, + const uint8_t *payload, size_t payload_len) +{ + struct completion_state *state = (struct completion_state *)user; + + printf("reply tid=%u status=%u request=%u payload_len=%zu\n", + (unsigned)hdr->tid, (unsigned)hdr->status, (unsigned)hdr->request, payload_len); + if (payload_len > 0 && payload != NULL) { + printf("reply payload: %.*s\n", (int)payload_len, (const char *)payload); + } + + state->done = 1; + return LIPC_OK; +} + +static lipc_status on_notify(void *user, const lipc_header *hdr, + const uint8_t *payload, size_t payload_len) +{ + (void)user; + printf("notify request=%u payload_len=%zu\n", (unsigned)hdr->request, payload_len); + if (payload_len > 0 && payload != NULL) { + printf("notify payload: %.*s\n", (int)payload_len, (const char *)payload); + } + return LIPC_OK; +} + +int main(void) +{ + const char *socket_path = lipc_default_socket_path(); + static const uint8_t ping_payload[] = "ping"; + static const lipc_method_desc reply_desc = { + .index = LIPC_FIELD_OPTIONAL, + .value = LIPC_FIELD_OPTIONAL, + .payload = LIPC_FIELD_OPTIONAL, + .payload_min_len = 0, + .payload_max_len = 1024, + }; + static const lipc_method_desc notify_desc = { + .index = LIPC_FIELD_FORBIDDEN, + .value = LIPC_FIELD_FORBIDDEN, + .payload = LIPC_FIELD_REQUIRED, + .payload_min_len = 1, + .payload_max_len = 1024, + }; + static const lipc_client_reply_handler reply_handlers[] = { + { + .request_id = REQ_PING, + .reply_desc = &reply_desc, + .on_reply = on_ping_reply, + }, + }; + static const lipc_client_notify_handler notify_handlers[] = { + { + .request_id = REQ_NOTIFY, + .notify_desc = ¬ify_desc, + .on_notify = on_notify, + }, + }; + static const lipc_client_route_table routes = { + .reply_handlers = reply_handlers, + .n_reply = sizeof(reply_handlers) / sizeof(reply_handlers[0]), + .notify_handlers = notify_handlers, + .n_notify = sizeof(notify_handlers) / sizeof(notify_handlers[0]), + }; + + struct completion_state completion; + lipc_client *client; + lipc_status st; + uint32_t tid = 0; + int fd; + + completion.done = 0; + client = lipc_client_create(LIPC_MAX_PAYLOAD_DEFAULT, &routes, &completion); + if (client == NULL) { + perror("lipc_client_create"); + return 1; + } + + fd = lipc_socket_connect(socket_path, 0u); + if (fd < 0) { + perror("lipc_socket_connect"); + lipc_client_destroy(client); + return 1; + } + + st = lipc_client_request_async(client, fd, + REQ_PING, + 7, + 42, + ping_payload, + sizeof(ping_payload) - 1u, + NULL, + NULL, + &tid); + if (st != LIPC_OK) { + fprintf(stderr, "request_async failed: %s\n", lipc_status_str(st)); + close(fd); + lipc_client_destroy(client); + return 1; + } + printf("request sent with tid=%u\n", tid); + + while (!completion.done) { + st = lipc_client_poll_once(client, fd); + if (st == LIPC_WOULD_BLOCK) { + usleep(10 * 1000); + continue; + } + + if (st != LIPC_OK) { + fprintf(stderr, "poll_once failed: %s\n", lipc_status_str(st)); + close(fd); + lipc_client_destroy(client); + return 1; + } + } + + close(fd); + lipc_client_destroy(client); + return 0; +} diff --git a/subprojects/localipc/examples/lipc_server_example.c b/subprojects/localipc/examples/lipc_server_example.c new file mode 100644 index 0000000..32849ce --- /dev/null +++ b/subprojects/localipc/examples/lipc_server_example.c @@ -0,0 +1,124 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +enum { + REQ_PING = 1, + REQ_NOTIFY = 2, +}; + +static volatile sig_atomic_t g_stop = 0; + +struct server_thread_ctx { + lipc_server *server; + lipc_status run_status; +}; + +static void on_signal(int sig) +{ + (void)sig; + g_stop = 1; +} + +static lipc_status on_ping(void *user, lipc_server_reply_ctx *reply, + const lipc_header *req_hdr, const uint8_t *payload, size_t payload_len) +{ + static const char notify_text[] = "notify: ping handled"; + lipc_header notify_hdr; + lipc_status st; + + (void)user; + + memset(¬ify_hdr, 0, sizeof(notify_hdr)); + notify_hdr.request = REQ_NOTIFY; + notify_hdr.status = LIPC_OK; + notify_hdr.tid = 0; + notify_hdr.length = (uint32_t)(sizeof(notify_text) - 1u); + + st = lipc_frame_write(reply->fd, ¬ify_hdr, notify_text, sizeof(notify_text) - 1u); + if (st != LIPC_OK) { + return st; + } + + return lipc_server_reply(reply, LIPC_OK, req_hdr->index, req_hdr->value, payload, payload_len); +} + +static void *run_server_thread(void *arg) +{ + struct server_thread_ctx *ctx = (struct server_thread_ctx *)arg; + ctx->run_status = lipc_server_run(ctx->server); + return NULL; +} + +int main(void) +{ + const char *socket_path = lipc_default_socket_path(); + static const lipc_method_desc ping_desc = { + .index = LIPC_FIELD_OPTIONAL, + .value = LIPC_FIELD_OPTIONAL, + .payload = LIPC_FIELD_OPTIONAL, + .payload_min_len = 0, + .payload_max_len = 1024, + }; + static const lipc_server_handler handlers[] = { + { + .request_id = REQ_PING, + .request_desc = &ping_desc, + .fn = on_ping, + }, + }; + + struct server_thread_ctx thread_ctx; + pthread_t io_thread; + int listen_fd; + int rc; + + signal(SIGINT, on_signal); + signal(SIGTERM, on_signal); + + listen_fd = lipc_socket_listen(socket_path, 16, LIPC_SOCKET_NONBLOCK); + if (listen_fd < 0) { + perror("lipc_socket_listen"); + return 1; + } + + thread_ctx.server = lipc_server_create( + listen_fd, LIPC_MAX_PAYLOAD_DEFAULT, handlers, sizeof(handlers) / sizeof(handlers[0]), NULL); + if (thread_ctx.server == NULL) { + perror("lipc_server_create"); + close(listen_fd); + unlink(socket_path); + return 1; + } + thread_ctx.run_status = LIPC_OK; + + rc = pthread_create(&io_thread, NULL, run_server_thread, &thread_ctx); + if (rc != 0) { + fprintf(stderr, "pthread_create failed: %s\n", strerror(rc)); + lipc_server_destroy(thread_ctx.server); + unlink(socket_path); + return 1; + } + + while (!g_stop) { + sleep(1); + } + + (void)lipc_server_stop(thread_ctx.server); + (void)pthread_join(io_thread, NULL); + + if (thread_ctx.run_status != LIPC_OK) { + fprintf(stderr, "server loop failed: %s\n", lipc_status_str(thread_ctx.run_status)); + } + + lipc_server_destroy(thread_ctx.server); + unlink(socket_path); + return thread_ctx.run_status == LIPC_OK ? 0 : 1; +} diff --git a/subprojects/localipc/examples/meson.build b/subprojects/localipc/examples/meson.build new file mode 100644 index 0000000..0101598 --- /dev/null +++ b/subprojects/localipc/examples/meson.build @@ -0,0 +1,15 @@ +lipc_server_example = executable('lipc_server_example', + 'lipc_server_example.c', + include_directories: [localipc_inc], + link_with: lib_localipc, + dependencies: [dep_localipc_threads], + install: false, +) + +lipc_client_example = executable('lipc_client_example', + 'lipc_client_example.c', + include_directories: [localipc_inc], + link_with: lib_localipc, + dependencies: [dep_localipc_threads], + install: false, +) diff --git a/subprojects/localipc/include/localipc/lipc.h b/subprojects/localipc/include/localipc/lipc.h new file mode 100644 index 0000000..0c973e1 --- /dev/null +++ b/subprojects/localipc/include/localipc/lipc.h @@ -0,0 +1,739 @@ +#ifndef LOCALIPC_LIPC_H +#define LOCALIPC_LIPC_H + +/** + * @file lipc.h + * @brief Public API for liblocalipc framed UNIX-stream IPC. + * + * The library provides a protocol-generic request/response + notification transport with a + * fixed big-endian wire header, zlib-compatible CRC32 checksums, framing helpers, and + * table-driven dispatch for client/server endpoints. + * + * On-wire fixed header (32 bytes) matches this logical layout; @c payload bytes follow on the + * stream and are not part of the header blob. The flexible array member appears only in the + * documented shape below; @ref lipc_header is the fixed prefix only so it can be stack-allocated. + * + * @code + * struct lipc_wire_layout { + * uint32_t magic; // 0x4C495043 ("LIPC" as big-endian uint32_t) + * uint16_t version; // 1 + * uint16_t request; // Request Type + * uint16_t status; // Daemon Return Code + * uint16_t index; // Optional Index + * uint32_t tid; // Transaction ID (0 for Async) + * uint32_t length; // Payload Length + * uint32_t checksum; // CRC32 + * uint64_t value; // Optional inline 64-bit value + * uint8_t payload[]; // Flexible array member (variable length; follows on the wire) + * }; + * @endcode + * + * @example examples/lipc_server_example.c + * @example examples/lipc_client_example.c + */ + +#include +#include +#include +#include + +/** + * @defgroup lipc_packet Packet Format and Framing + * Wire header fields, encoding/decoding, checksums, and stream-safe frame I/O. + * @{ + */ + +/** liblocalipc API major version. */ +#define LIPC_VERSION_MAJOR 0 +/** liblocalipc API minor version. */ +#define LIPC_VERSION_MINOR 1 +/** liblocalipc API patch version. */ +#define LIPC_VERSION_PATCH 0 + +/** Size of the fixed on-wire header (flexible array member is not included). */ +#define LIPC_HEADER_SIZE 32u + +/** Protocol magic: ASCII @c LIPC as a big-endian @c uint32_t (0x4C495043). */ +#define LIPC_PROTOCOL_MAGIC 0x4C495043u + +/** Alias for @ref LIPC_PROTOCOL_MAGIC (plan / shorthand). */ +#define LIPC_MAGIC LIPC_PROTOCOL_MAGIC + +/** Supported wire protocol version (header @c version field). */ +#define LIPC_PROTOCOL_VERSION 1u + +/** + * Default maximum payload size (bytes) for @ref lipc_header_decode and framing helpers. + * Callers may pass a lower cap per connection. + */ +#define LIPC_MAX_PAYLOAD_DEFAULT 65536u + +/** Byte offsets of header fields (big-endian on the wire). */ +#define LIPC_OFF_MAGIC 0u +#define LIPC_OFF_VERSION 4u +#define LIPC_OFF_REQUEST 6u +#define LIPC_OFF_STATUS 8u +#define LIPC_OFF_INDEX 10u +#define LIPC_OFF_TID 12u +#define LIPC_OFF_LENGTH 16u +#define LIPC_OFF_CHECKSUM 20u +#define LIPC_OFF_VALUE 24u + +/** @} */ + +/** + * @defgroup lipc_socket_helpers UNIX Socket Helpers + * UNIX domain socket utilities used by both applications and liblocalipc internals. + * @{ + */ + +/** Default filesystem socket path used by examples/tests. */ +#define LIPC_DEFAULT_SOCKET_PATH "/tmp/lipc.sock" + +/** Socket helper flag: configure fd/socket as non-blocking. */ +#define LIPC_SOCKET_NONBLOCK (1u << 0) + +/** Socket helper flag: configure fd/socket with close-on-exec. */ +#define LIPC_SOCKET_CLOEXEC (1u << 1) + +/** @} */ + +/** + * @defgroup lipc_status_api Status and Error Handling + * Result codes and helper utilities for protocol and runtime errors. + * @{ + */ + +/** + * First @c uint16_t value reserved for application-defined status codes on the wire. + * Values in the range @c [LIPC_STATUS_APP_BASE, 0xFFFF] are not assigned by this library; + * meaning (success, error, or sub-codes) is defined by the application protocol. + * @ref lipc_status_is_ok is true only for @ref LIPC_OK (0); application codes are never + * treated as success by that helper unless the application maps success to 0. + */ +#define LIPC_STATUS_APP_BASE 0x8000u + +#if defined(_WIN32) +# define LIPC_API __declspec(dllexport) +#elif defined(__GNUC__) && (__GNUC__ * 100 + __GNUC_MINOR__) >= 303 +# define LIPC_API __attribute__((visibility("default"))) +#else +# define LIPC_API +#endif + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * Wire status / result code: a @c uint16_t field in the header (big-endian on the wire). + * + * Library-defined values are @c 0 … @c 12. @ref LIPC_INCOMPLETE is parser-only (need more + * bytes); it must not be sent as a packet status on the wire. + * + * Codes @c >= @ref LIPC_STATUS_APP_BASE (@c 0x8000) are reserved for application-specific use + * (see macro documentation). They are not enumerated here so applications can define their + * own constants without colliding with future library extensions below @c 0x8000. + */ +typedef enum lipc_status { + LIPC_OK = 0, + LIPC_BAD_HEADER = 1, + LIPC_BAD_MAGIC = 2, + LIPC_BAD_VERSION = 3, + LIPC_BAD_CHECKSUM = 4, + LIPC_BAD_LENGTH = 5, + LIPC_UNKNOWN_REQUEST = 6, + LIPC_INVALID_PARAMS = 7, + LIPC_NOT_FOUND = 8, + LIPC_INTERNAL_ERROR = 9, + LIPC_WOULD_BLOCK = 10, + LIPC_IO_ERROR = 11, + /** Parser: need more bytes before a full frame is available. */ + LIPC_INCOMPLETE = 12, +} lipc_status; + +/** @return Non-zero only if @p s is @ref LIPC_OK (application-defined codes are never “ok” here). */ +LIPC_API int lipc_status_is_ok(lipc_status s); + +/** + * Short debug label for @p s (never NULL). + * Library codes return an upper-snake token; @c s @c >= @ref LIPC_STATUS_APP_BASE yields + * @c "APPLICATION_DEFINED"; other values yield @c "UNKNOWN_LIPC_STATUS". + */ +LIPC_API const char *lipc_status_str(lipc_status s); + +/** @} */ + +/** @addtogroup lipc_packet + * @{ + */ + +/** Fixed header fields in host byte order (payload is separate; see file comment). */ +typedef struct lipc_header { + uint32_t magic; /**< Decoded wire magic; always @ref LIPC_PROTOCOL_MAGIC on valid packets. */ + uint16_t version; /**< Decoded wire version; always @ref LIPC_PROTOCOL_VERSION on valid packets. */ + uint16_t request; /**< Request/notification identifier used for table dispatch. */ + uint16_t status; /**< Wire status code (library or application defined). */ + uint16_t index; /**< Optional 16-bit scalar argument (method-specific semantics). */ + uint32_t tid; /**< Transaction id; 0 means notification/push, non-zero means RPC. */ + uint32_t length; /**< Payload length in bytes. */ + uint32_t checksum; /**< CRC32 from the wire image. */ + uint64_t value; /**< Optional 64-bit scalar argument (method-specific semantics). */ +} lipc_header; + +/** + * Encode @p h into exactly @ref LIPC_HEADER_SIZE bytes. + * Writes @ref LIPC_PROTOCOL_MAGIC and @ref LIPC_PROTOCOL_VERSION ( @p h->magic / @p h->version + * are not used for those wire fields). Sets the checksum field in @p wire to zero; use + * @ref lipc_checksum_compute then @ref lipc_header_set_checksum before sending. + */ +LIPC_API void lipc_header_encode(uint8_t wire[LIPC_HEADER_SIZE], const lipc_header *h); + +/** + * Decode wire header into @p h. Validates magic, version (@ref LIPC_PROTOCOL_VERSION), + * and @c length <= @p max_payload. + */ +LIPC_API lipc_status lipc_header_decode(lipc_header *h, const uint8_t wire[LIPC_HEADER_SIZE], + uint32_t max_payload); + +/** Write big-endian checksum into @p wire at @ref LIPC_OFF_CHECKSUM. */ +LIPC_API void lipc_header_set_checksum(uint8_t wire[LIPC_HEADER_SIZE], uint32_t crc); + +/** + * Zlib-compatible CRC-32 (reflected, poly 0xEDB88320), same as Python @c zlib.crc32. + * Covers @p wire (checksum field must already be zero) followed by @p payload. + */ +LIPC_API uint32_t lipc_checksum_compute(const uint8_t wire[LIPC_HEADER_SIZE], + const void *payload, size_t payload_len); + +/** + * Verify checksum for a received header + payload. + * @return @ref LIPC_OK or @ref LIPC_BAD_CHECKSUM. + */ +LIPC_API lipc_status lipc_checksum_verify(const uint8_t wire[LIPC_HEADER_SIZE], + const void *payload, size_t payload_len); + +/** + * Prepare an outgoing frame: encode header with checksum 0, compute CRC, patch checksum. + * @p payload may be NULL if @p payload_len is 0. + */ +LIPC_API void lipc_frame_fill_checksum(uint8_t wire[LIPC_HEADER_SIZE], const lipc_header *h, + const void *payload, size_t payload_len); + +/** + * Read one complete frame from @p fd (blocking partial reads until one full frame). + * Suitable when @p fd is blocking: a non-blocking socket can return @ref LIPC_WOULD_BLOCK after + * a partial header/payload and lose sync — use @ref lipc_frame_reader_read instead. + * On success, @p wire contains the header and @p payload_buf holds @p *payload_len bytes. + * @return @ref LIPC_OK, @ref LIPC_IO_ERROR (check @c errno), @ref LIPC_WOULD_BLOCK for + * @c EAGAIN / @c EWOULDBLOCK, or another validation status. + */ +LIPC_API lipc_status lipc_frame_read(int fd, uint32_t max_payload, + uint8_t wire[LIPC_HEADER_SIZE], lipc_header *h, + void *payload_buf, size_t payload_cap, size_t *payload_len); + +/** + * Write one complete frame to @p fd (header + payload). Fills checksum via @ref lipc_frame_fill_checksum. + * Uses a blocking write loop; for non-blocking @p fd use @ref lipc_frame_writer_write. + * @return @ref LIPC_OK, @ref LIPC_IO_ERROR (check @c errno), @ref LIPC_WOULD_BLOCK for + * @c EAGAIN / @c EWOULDBLOCK, or @ref LIPC_BAD_LENGTH if @c h->length does not match @p payload_len. + */ +LIPC_API lipc_status lipc_frame_write(int fd, lipc_header *h, + const void *payload, size_t payload_len); + +/** Streaming receive state (one frame at a time). */ +typedef struct lipc_stream_rx { + uint8_t header[LIPC_HEADER_SIZE]; /**< Partially collected header bytes. */ + size_t header_got; /**< Number of bytes currently in @ref header. */ + uint32_t payload_len; /**< Payload length of the frame currently being assembled. */ + size_t payload_got; /**< Number of payload bytes collected so far. */ + int in_payload; /**< 0 = filling header, non-zero = filling payload */ +} lipc_stream_rx; + +/** Reset @p rx to its initial state before first use or after error recovery. */ +LIPC_API void lipc_stream_rx_init(lipc_stream_rx *rx); + +/** + * Feed incoming stream bytes. Consumes from @p data starting at @p *data_off (updated on success). + * When @p *frame_done is non-zero, @p out_hdr and @p payload_buf (first @p *payload_len bytes) are valid. + * @return @ref LIPC_OK when a frame is complete, @ref LIPC_INCOMPLETE when more data is needed, + * or another error status. + */ +LIPC_API lipc_status lipc_stream_rx_feed(lipc_stream_rx *rx, uint32_t max_payload, + const uint8_t *data, size_t data_len, size_t *data_off, + lipc_header *out_hdr, uint8_t *payload_buf, size_t payload_cap, size_t *payload_len, + int *frame_done); + +/** Internal buffer size for @ref lipc_frame_reader (must fit a stashed tail after @p scratch). */ +#define LIPC_FRAME_READER_PENDING_CAP 8192u + +/** Stream-safe framed read state (non-blocking or mixed). */ +typedef struct lipc_frame_reader { + lipc_stream_rx stream; /**< Incremental decode state machine. */ + uint8_t pending[LIPC_FRAME_READER_PENDING_CAP]; /**< Buffered bytes left after a frame boundary. */ + size_t pending_len; /**< Number of valid bytes in @ref pending. */ +} lipc_frame_reader; + +/** Initialize @p r for use with @ref lipc_frame_reader_read. */ +LIPC_API void lipc_frame_reader_init(lipc_frame_reader *r); + +/** + * Read one full frame, preserving partial progress across @c EAGAIN. + * @p scratch_len must be <= @ref LIPC_FRAME_READER_PENDING_CAP. + * If @p wire is non-NULL, fills it with the validated on-wire header (including checksum). + */ +LIPC_API lipc_status lipc_frame_reader_read(int fd, lipc_frame_reader *r, uint32_t max_payload, + uint8_t *scratch, size_t scratch_len, + uint8_t *wire, lipc_header *h, + void *payload_buf, size_t payload_cap, size_t *payload_len); + +/** Incremental framed write (non-blocking safe). */ +typedef struct lipc_frame_writer { + uint8_t hdr[LIPC_HEADER_SIZE]; /**< Encoded header with finalized checksum. */ + const uint8_t *payload; /**< Borrowed payload pointer provided to @ref lipc_frame_writer_begin. */ + size_t payload_len; /**< Payload size in bytes. */ + size_t sent; /**< Bytes already written across header+payload. */ + size_t total; /**< Total bytes to write (header + payload). */ +} lipc_frame_writer; + +/** Validates @c h->length == @p payload_len, fills @p w. Checksum is computed once here. */ +LIPC_API lipc_status lipc_frame_writer_begin(lipc_frame_writer *w, const lipc_header *h, + const void *payload, size_t payload_len); + +/** + * Send more bytes of the current frame. Call until it returns @ref LIPC_OK. + * @return @ref LIPC_OK when finished, @ref LIPC_WOULD_BLOCK if the last @c write would block, + * or another status on failure (@c errno set when @ref LIPC_IO_ERROR). + */ +LIPC_API lipc_status lipc_frame_writer_write(int fd, lipc_frame_writer *w); + +/** @} */ + +/** + * @defgroup lipc_dispatch Method Validation and Dispatch Tables + * Declarative method descriptors shared by server and client routing. + * @{ + */ + +/* -------------------------------------------------------------------------- */ +/* Method descriptors, server dispatch, client notify vs reply routing */ +/* -------------------------------------------------------------------------- */ + +/** + * How @ref lipc_method_desc treats @c index, @c value, and payload length on the wire. + * + * - @ref LIPC_FIELD_FORBIDDEN — @c index / @c value must be 0; payload length must be 0. + * - @ref LIPC_FIELD_OPTIONAL — any value allowed; payload must still satisfy + * @c payload_min_len / @c payload_max_len (use 0 and @c UINT32_MAX-style caps as needed). + * - @ref LIPC_FIELD_REQUIRED — @c index / @c value must be non-zero; payload length must be within + * @c payload_min_len … @c payload_max_len inclusive (set bounds for “empty but required” as min=0 + * if that is valid for your method). + */ +typedef enum lipc_field_rule { + LIPC_FIELD_FORBIDDEN = 0, + LIPC_FIELD_OPTIONAL = 1, + LIPC_FIELD_REQUIRED = 2, +} lipc_field_rule; + +/** + * Per-method validation for incoming headers + payload length. + * For @c payload, @c payload_max_len is clamped by the caller’s frame cap (e.g. @ref LIPC_MAX_PAYLOAD_DEFAULT). + */ +typedef struct lipc_method_desc { + lipc_field_rule index; /**< Rule applied to @ref lipc_header.index. */ + lipc_field_rule value; /**< Rule applied to @ref lipc_header.value. */ + lipc_field_rule payload; /**< Rule applied to payload length/presence. */ + uint32_t payload_min_len; /**< Inclusive minimum payload length when payload is optional/required. */ + uint32_t payload_max_len; /**< Inclusive maximum payload length when payload is optional/required. */ +} lipc_method_desc; + +/** + * Validate @p h and @p payload_len against @p desc. + * @return @ref LIPC_OK or @ref LIPC_INVALID_PARAMS. + */ +LIPC_API lipc_status lipc_method_validate(const lipc_method_desc *desc, const lipc_header *h, + size_t payload_len); + +/** Server reply context: set by the library before your handler runs; pass to @ref lipc_server_reply. */ +typedef struct lipc_server_reply_ctx { + int fd; /**< Connected client fd to which replies should be sent. */ + lipc_header request_hdr; /**< Original decoded request header associated with this callback. */ +} lipc_server_reply_ctx; + +/** Opaque server context owned by liblocalipc. */ +typedef struct lipc_server lipc_server; + +/** + * Server callback invoked by @ref lipc_server_dispatch and the run loop. + * + * The callback executes on the server I/O thread (the thread running @ref lipc_server_run), + * so it should avoid long blocking work. @p payload points to a library-owned receive buffer and + * is only valid for the duration of the callback. + */ +typedef lipc_status (*lipc_server_fn)(void *user, lipc_server_reply_ctx *reply, + const lipc_header *req_hdr, const uint8_t *payload, size_t payload_len); + +/** One server registration: @c request_id, validation for the incoming request, handler. */ +typedef struct lipc_server_handler { + uint16_t request_id; /**< Request id matched against @ref lipc_header.request. */ + const lipc_method_desc *request_desc; /**< Validation contract for request fields/payload. */ + lipc_server_fn fn; /**< Handler callback for matched requests. */ +} lipc_server_handler; + +/** + * @defgroup lipc_server Server API + * Server lifecycle, run loop control, and request dispatch helpers. + * @{ + */ + +/** + * Create a server context around an already-bound/listening UNIX stream socket @p listen_fd. + * + * Ownership/lifetime: + * - Ownership of @p listen_fd transfers to the returned server on success. + * - On success, @p handlers is copied internally; caller may free/reuse its original array. + * - Returns NULL on allocation/setup failure; in that case @p listen_fd remains caller-owned. + * + * Threading: + * - Safe before @ref lipc_server_run starts. + * - Do not call concurrently with @ref lipc_server_destroy. + * + * Error reporting: + * - Returns NULL on failure and may leave @c errno set by allocation or OS calls. + */ +LIPC_API lipc_server *lipc_server_create(int listen_fd, uint32_t max_payload, + const lipc_server_handler *handlers, size_t nhandlers, void *handler_user); + +/** + * Destroy a server and release all owned resources. + * + * This closes all connected client fds, wake-pipe fds, and the owned listening fd. + * Passing NULL is a no-op. + * + * Threading: + * - Not safe concurrently with @ref lipc_server_run. + * - Stop the run loop and join your run thread before destroy. + */ +LIPC_API void lipc_server_destroy(lipc_server *server); + +/** + * Wake a running @ref lipc_server_run poll loop from any thread. + * + * @return @ref LIPC_OK on success (including already-awake pipe), @ref LIPC_IO_ERROR on wake-fd + * write failure, or @ref LIPC_BAD_HEADER if @p server is invalid/uninitialized. + * @note When @ref LIPC_IO_ERROR is returned, @c errno is set by @c write. + */ +LIPC_API lipc_status lipc_server_wake(lipc_server *server); + +/** + * Request shutdown and wake @ref lipc_server_run from any thread. + * + * The run loop exits with @ref LIPC_OK once it observes the stop request. + * This function is idempotent. + */ +LIPC_API lipc_status lipc_server_stop(lipc_server *server); + +/** + * Blocking server loop: poll listener + connected clients + wake fd, perform framing/dispatch. + * Returns @ref LIPC_OK after @ref lipc_server_stop, or an error status on fatal setup/runtime failure. + * + * Threading: + * - Exactly one thread may execute @ref lipc_server_run per server instance. + * - Handler callbacks execute on the @ref lipc_server_run thread. + * - @ref lipc_server_wake and @ref lipc_server_stop are safe from other threads. + * + * Error/IO: + * - Returns @ref LIPC_IO_ERROR for fatal @c poll/@c accept/read/write failures. + * - For per-client protocol failures (bad checksum/header/etc.), the client connection is dropped + * and the loop continues serving other clients. + */ +LIPC_API lipc_status lipc_server_run(lipc_server *server); + +/** + * Send one response frame echoing request @c tid and @c request from @p reply->request_hdr. + * + * Ownership: + * - @p payload is borrowed for the duration of the call only. + * + * Error mapping: + * - Returns @ref LIPC_BAD_LENGTH when payload cannot be represented in @c uint32_t length. + * - Returns @ref LIPC_IO_ERROR/@ref LIPC_WOULD_BLOCK from underlying framed write operations. + * - On @ref LIPC_IO_ERROR, @c errno is set by underlying write calls. + */ +LIPC_API lipc_status lipc_server_reply(lipc_server_reply_ctx *reply, uint16_t wire_status, + uint16_t index, uint64_t value, const void *payload, size_t payload_len); + +/** + * Find a handler for @p request_id in @p handlers (linear search; first match wins). + * @return Pointer into @p handlers, or NULL when no match exists. + */ +LIPC_API const lipc_server_handler *lipc_server_handler_lookup(const lipc_server_handler *handlers, + size_t nhandlers, uint16_t request_id); + +/** + * Handle one validated request: rejects client @c tid == 0 (@ref LIPC_INVALID_PARAMS), looks up handler, + * validates fields with @c request_desc, then calls the handler. The handler should call @ref lipc_server_reply. + * @return Status from validation / lookup (@ref LIPC_UNKNOWN_REQUEST, etc.) or the handler’s return value. + */ +LIPC_API lipc_status lipc_server_dispatch(const lipc_server_handler *handlers, size_t nhandlers, + void *user, int fd, const lipc_header *req_hdr, const uint8_t *payload, size_t payload_len); + +/** @} */ + +/** + * @defgroup lipc_client Client API + * Client lifecycle, routing tables, request submission, and synchronous/asynchronous receive paths. + * @{ + */ + +/** + * Client callback for reply-table dispatch when @c hdr->tid != 0. + * + * @param user User pointer from @ref lipc_client_create route table registration. + * @param hdr Decoded header for the incoming reply. + * @param payload Borrowed pointer to payload bytes (valid only for callback duration). + * @param payload_len Number of bytes at @p payload. + * @return @ref LIPC_OK to indicate successful handling, or a domain-specific status. + */ +typedef lipc_status (*lipc_client_reply_cb)(void *user, const lipc_header *hdr, const uint8_t *payload, + size_t payload_len); + +/** + * Client callback for notification-table dispatch when @c hdr->tid == 0. + * + * The same lifetime and return-value rules as @ref lipc_client_reply_cb apply. + */ +typedef lipc_status (*lipc_client_notify_cb)(void *user, const lipc_header *hdr, const uint8_t *payload, + size_t payload_len); + +/** Opaque client context owned by liblocalipc. */ +typedef struct lipc_client lipc_client; + +/** + * Per-request completion callback for @ref lipc_client_request_async. + * + * It is invoked exactly once for a matched reply or never if the request fails before send. + * The callback executes on the thread calling @ref lipc_client_dispatch_frame, + * @ref lipc_client_poll_once, or @ref lipc_client_call. + */ +typedef lipc_status (*lipc_client_completion_cb)(void *user, uint32_t tid, + const lipc_header *hdr, const uint8_t *payload, size_t payload_len); + +/** Client-side handler for RPC replies (@c tid != 0). */ +typedef struct lipc_client_reply_handler { + uint16_t request_id; /**< Reply request id to match against @ref lipc_header.request. */ + const lipc_method_desc *reply_desc; /**< Validation contract for matched replies. */ + lipc_client_reply_cb on_reply; /**< Reply callback invoked after validation. */ +} lipc_client_reply_handler; + +/** Client-side handler for server pushes (@c tid == 0). */ +typedef struct lipc_client_notify_handler { + uint16_t request_id; /**< Notification request id to match. */ + const lipc_method_desc *notify_desc; /**< Validation contract for matched notifications. */ + lipc_client_notify_cb on_notify; /**< Notification callback invoked after validation. */ +} lipc_client_notify_handler; + +/** Route table copied at @ref lipc_client_create time. */ +typedef struct lipc_client_route_table { + const lipc_client_reply_handler *reply_handlers; /**< Reply handlers array (may be NULL). */ + size_t n_reply; /**< Entries in @ref reply_handlers. */ + const lipc_client_notify_handler *notify_handlers; /**< Notification handlers array (may be NULL). */ + size_t n_notify; /**< Entries in @ref notify_handlers. */ +} lipc_client_route_table; + +/** Find a reply route by request id; returns NULL when not present. */ +LIPC_API const lipc_client_reply_handler *lipc_client_reply_lookup( + const lipc_client_reply_handler *handlers, size_t nhandlers, uint16_t request_id); + +/** Find a notify route by request id; returns NULL when not present. */ +LIPC_API const lipc_client_notify_handler *lipc_client_notify_lookup( + const lipc_client_notify_handler *handlers, size_t nhandlers, uint16_t request_id); + +/** + * Route one incoming client-side frame after decode + CRC. + * - @c hdr->tid == 0: dispatch to @c notify_handlers by @c hdr->request; @ref LIPC_UNKNOWN_REQUEST if unlisted. + * - @c hdr->tid != 0: dispatch to @c reply_handlers by @c hdr->request; @ref LIPC_UNKNOWN_REQUEST if unlisted. + * Validates against the matching descriptor before invoking the callback. + */ +LIPC_API lipc_status lipc_client_dispatch_incoming(const lipc_client_route_table *routes, void *user, + const lipc_header *hdr, const uint8_t *payload, size_t payload_len); + +/** + * Create a client context with an internal non-zero TID allocator and in-flight table. + * @p max_payload of 0 uses @ref LIPC_MAX_PAYLOAD_DEFAULT. + * + * Ownership/lifetime: + * - The route arrays referenced by @p routes are copied; caller retains ownership of originals. + * - Returned client must be released with @ref lipc_client_destroy. + * + * Threading: + * - Safe from any thread before the client is published to other threads. + * + * Error reporting: + * - Returns NULL on allocation/setup failure. + */ +LIPC_API lipc_client *lipc_client_create(uint32_t max_payload, + const lipc_client_route_table *routes, void *route_user); + +/** + * Free client resources and any in-flight entries. + * + * Threading: + * - Not safe concurrently with other client APIs. + * - Ensure no other thread is inside client APIs before destroy. + * + * Passing NULL is a no-op. + */ +LIPC_API void lipc_client_destroy(lipc_client *client); + +/** + * Allocate a non-zero transaction ID that is not currently in-flight. + * + * Threading: + * - Safe concurrently with @ref lipc_client_request_async / dispatch APIs. + * + * @return @ref LIPC_OK on success, @ref LIPC_BAD_HEADER for invalid args, or + * @ref LIPC_INTERNAL_ERROR if no free TID can be allocated. + */ +LIPC_API lipc_status lipc_client_next_tid(lipc_client *client, uint32_t *out_tid); + +/** + * Send a client request with an internally allocated non-zero TID. + * The completion callback is keyed by the allocated TID and invoked when the matching reply arrives. + * + * Threading: + * - Safe to call concurrently from multiple threads. + * - Completion callbacks run on the thread that calls @ref lipc_client_dispatch_frame / + * @ref lipc_client_poll_once / @ref lipc_client_call. + * + * Ownership/lifetime: + * - @p payload is borrowed for the duration of this call only. + * - On success, completion ownership transfers to the internal in-flight table until reply/cleanup. + * + * Error/IO: + * - Returns @ref LIPC_BAD_LENGTH when @p payload_len cannot fit in wire length. + * - Returns @ref LIPC_IO_ERROR/@ref LIPC_WOULD_BLOCK from socket write path. + * - On @ref LIPC_IO_ERROR, @c errno is set by underlying write. + */ +LIPC_API lipc_status lipc_client_request_async(lipc_client *client, int fd, + uint16_t request_id, uint16_t index, uint64_t value, + const void *payload, size_t payload_len, + lipc_client_completion_cb on_complete, void *complete_user, + uint32_t *out_tid); + +/** + * Route one decoded frame through in-flight tracking: + * - @c tid == 0: notification path via @ref lipc_client_dispatch_incoming. + * - @c tid != 0: resolve by in-flight TID and invoke that request's completion callback. + * + * Threading: + * - Safe concurrently with @ref lipc_client_request_async and @ref lipc_client_next_tid. + * + * @return @ref LIPC_NOT_FOUND when reply TID has no matching in-flight request. + */ +LIPC_API lipc_status lipc_client_dispatch_frame(lipc_client *client, + const lipc_header *hdr, const uint8_t *payload, size_t payload_len); + +/** + * Read and dispatch exactly one frame from @p fd. + * Returns @ref LIPC_WOULD_BLOCK for non-blocking fds with no full frame ready. + * + * Threading: + * - Serialized per-client internally; concurrent callers are allowed but one frame is + * read/decoded at a time per client context. + * + * Error/IO: + * - Returns protocol validation statuses for malformed frames. + * - Returns @ref LIPC_IO_ERROR for read failures (with @c errno set by OS read/poll path). + */ +LIPC_API lipc_status lipc_client_poll_once(lipc_client *client, int fd); + +/** + * Synchronous helper: sends a request with an internal TID and blocks until its matching reply. + * While waiting, unrelated notifications are still dispatched. + * + * @p reply_payload may be NULL only if @p reply_payload_cap is 0. + * @p reply_hdr may be NULL when the caller only needs payload bytes. + * + * Blocking/threading: + * - Blocks the calling thread until matching reply or error. + * - Safe concurrently with other request-producing APIs on the same client. + * + * Error/IO: + * - Returns @ref LIPC_BAD_LENGTH if a matched reply payload exceeds @p reply_payload_cap. + * - Returns @ref LIPC_IO_ERROR for fatal socket polling/read failures (with @c errno set). + */ +LIPC_API lipc_status lipc_client_call(lipc_client *client, int fd, + uint16_t request_id, uint16_t index, uint64_t value, + const void *payload, size_t payload_len, + lipc_header *reply_hdr, void *reply_payload, size_t reply_payload_cap, size_t *reply_len); + +/** @} */ +/** @} */ + +/** @addtogroup lipc_socket_helpers + * @{ + */ + +/* -------------------------------------------------------------------------- */ +/* UNIX socket helper utilities */ +/* -------------------------------------------------------------------------- */ + +/** Canonical default UNIX socket path used by localipc helpers/examples. */ +LIPC_API const char *lipc_default_socket_path(void); + +/** + * Initialize a UNIX sockaddr from @p path and optionally return its length in @p len. + * Does not create any file descriptor. + * + * @param path NUL-terminated filesystem path. + * @param addr Output structure to fill. + * @param len Optional output length for @c bind/@c connect. + * @return @ref LIPC_OK, @ref LIPC_INVALID_PARAMS for bad input, or @ref LIPC_BAD_LENGTH. + * @note On failure, @c errno is set to @c EINVAL or @c ENAMETOOLONG. + */ +LIPC_API lipc_status lipc_sockaddr_init(const char *path, struct sockaddr_un *addr, socklen_t *len); + +/** + * Set or clear non-blocking mode on @p fd. + * @return @ref LIPC_OK, @ref LIPC_INVALID_PARAMS, or @ref LIPC_IO_ERROR (@c errno preserved). + */ +LIPC_API lipc_status lipc_socket_set_nonblocking(int fd, int enabled); + +/** + * Set or clear close-on-exec on @p fd. + * @return @ref LIPC_OK, @ref LIPC_INVALID_PARAMS, or @ref LIPC_IO_ERROR (@c errno preserved). + */ +LIPC_API lipc_status lipc_socket_set_cloexec(int fd, int enabled); + +/** + * Create+bind+listen a UNIX stream socket at @p path. + * @param flags Bitwise OR of @ref LIPC_SOCKET_NONBLOCK and @ref LIPC_SOCKET_CLOEXEC. + * @param backlog Positive backlog passed to @c listen. + * @return listening fd on success, -1 on error (@c errno set). + * + * Ownership: + * - Caller owns the returned fd and must close it. + * - Existing socket path is unlinked before bind. + */ +LIPC_API int lipc_socket_listen(const char *path, int backlog, unsigned int flags); + +/** + * Create+connect a UNIX stream socket to @p path. + * @param flags Bitwise OR of @ref LIPC_SOCKET_NONBLOCK and @ref LIPC_SOCKET_CLOEXEC. + * @return connected fd on success, -1 on error (@c errno set). + * + * Ownership: + * - Caller owns the returned fd and must close it. + * - In non-blocking mode, @c EINPROGRESS/@c EALREADY are treated as success with a + * still-connecting fd return value. + */ +LIPC_API int lipc_socket_connect(const char *path, unsigned int flags); + +/** @} */ + +#ifdef __cplusplus +} +#endif + +#endif /* LOCALIPC_LIPC_H */ diff --git a/subprojects/localipc/lipc_crc32.c b/subprojects/localipc/lipc_crc32.c new file mode 100644 index 0000000..de4f1cc --- /dev/null +++ b/subprojects/localipc/lipc_crc32.c @@ -0,0 +1,75 @@ +/* + * Zlib-compatible CRC-32 for localipc (reflect, poly 0xEDB88320). + * Table matches daemon/crc32.c and Python zlib.crc32. + * + * SPDX-License-Identifier: GPL-2.0-only WITH Classpath-exception-2.0 + */ + +#include +#include + +#define LIPC_CRC32_INIT 0u + +/* Table matches zlib's crc_table (reflect, poly 0xEDB88320). */ +static const uint32_t lipc_crc32_table[256] = { + 0x00000000u, 0x77073096u, 0xee0e612cu, 0x990951bau, 0x076dc419u, 0x706af48fu, 0xe963a535u, + 0x9e6495a3u, 0x0edb8832u, 0x79dcb8a4u, 0xe0d5e91eu, 0x97d2d988u, 0x09b64c2bu, 0x7eb17cbdu, + 0xe7b82d07u, 0x90bf1d91u, 0x1db71064u, 0x6ab020f2u, 0xf3b97148u, 0x84be41deu, 0x1adad47du, + 0x6ddde4ebu, 0xf4d4b551u, 0x83d385c7u, 0x136c9856u, 0x646ba8c0u, 0xfd62f97au, 0x8a65c9ecu, + 0x14015c4fu, 0x63066cd9u, 0xfa0f3d63u, 0x8d080df5u, 0x3b6e20c8u, 0x4c69105eu, 0xd56041e4u, + 0xa2677172u, 0x3c03e4d1u, 0x4b04d447u, 0xd20d85fdu, 0xa50ab56bu, 0x35b5a8fau, 0x42b2986cu, + 0xdbbbc9d6u, 0xacbcf940u, 0x32d86ce3u, 0x45df5c75u, 0xdcd60dcfu, 0xabd13d59u, 0x26d930acu, + 0x51de003au, 0xc8d75180u, 0xbfd06116u, 0x21b4f4b5u, 0x56b3c423u, 0xcfba9599u, 0xb8bda50fu, + 0x2802b89eu, 0x5f058808u, 0xc60cd9b2u, 0xb10be924u, 0x2f6f7c87u, 0x58684c11u, 0xc1611dabu, + 0xb6662d3du, 0x76dc4190u, 0x01db7106u, 0x98d220bcu, 0xefd5102au, 0x71b18589u, 0x06b6b51fu, + 0x9fbfe4a5u, 0xe8b8d433u, 0x7807c9a2u, 0x0f00f934u, 0x9609a88eu, 0xe10e9818u, 0x7f6a0dbbu, + 0x086d3d2du, 0x91646c97u, 0xe6635c01u, 0x6b6b51f4u, 0x1c6c6162u, 0x856530d8u, 0xf262004eu, + 0x6c0695edu, 0x1b01a57bu, 0x8208f4c1u, 0xf50fc457u, 0x65b0d9c6u, 0x12b7e950u, 0x8bbeb8eau, + 0xfcb9887cu, 0x62dd1ddfu, 0x15da2d49u, 0x8cd37cf3u, 0xfbd44c65u, 0x4db26158u, 0x3ab551ceu, + 0xa3bc0074u, 0xd4bb30e2u, 0x4adfa541u, 0x3dd895d7u, 0xa4d1c46du, 0xd3d6f4fbu, 0x4369e96au, + 0x346ed9fcu, 0xad678846u, 0xda60b8d0u, 0x44042d73u, 0x33031de5u, 0xaa0a4c5fu, 0xdd0d7cc9u, + 0x5005713cu, 0x270241aau, 0xbe0b1010u, 0xc90c2086u, 0x5768b525u, 0x206f85b3u, 0xb966d409u, + 0xce61e49fu, 0x5edef90eu, 0x29d9c998u, 0xb0d09822u, 0xc7d7a8b4u, 0x59b33d17u, 0x2eb40d81u, + 0xb7bd5c3bu, 0xc0ba6cadu, 0xedb88320u, 0x9abfb3b6u, 0x03b6e20cu, 0x74b1d29au, 0xead54739u, + 0x9dd277afu, 0x04db2615u, 0x73dc1683u, 0xe3630b12u, 0x94643b84u, 0x0d6d6a3eu, 0x7a6a5aa8u, + 0xe40ecf0bu, 0x9309ff9du, 0x0a00ae27u, 0x7d079eb1u, 0xf00f9344u, 0x8708a3d2u, 0x1e01f268u, + 0x6906c2feu, 0xf762575du, 0x806567cbu, 0x196c3671u, 0x6e6b06e7u, 0xfed41b76u, 0x89d32be0u, + 0x10da7a5au, 0x67dd4accu, 0xf9b9df6fu, 0x8ebeeff9u, 0x17b7be43u, 0x60b08ed5u, 0xd6d6a3e8u, + 0xa1d1937eu, 0x38d8c2c4u, 0x4fdff252u, 0xd1bb67f1u, 0xa6bc5767u, 0x3fb506ddu, 0x48b2364bu, + 0xd80d2bdau, 0xaf0a1b4cu, 0x36034af6u, 0x41047a60u, 0xdf60efc3u, 0xa867df55u, 0x316e8eefu, + 0x4669be79u, 0xcb61b38cu, 0xbc66831au, 0x256fd2a0u, 0x5268e236u, 0xcc0c7795u, 0xbb0b4703u, + 0x220216b9u, 0x5505262fu, 0xc5ba3bbeu, 0xb2bd0b28u, 0x2bb45a92u, 0x5cb36a04u, 0xc2d7ffa7u, + 0xb5d0cf31u, 0x2cd99e8bu, 0x5bdeae1du, 0x9b64c2b0u, 0xec63f226u, 0x756aa39cu, 0x026d930au, + 0x9c0906a9u, 0xeb0e363fu, 0x72076785u, 0x05005713u, 0x95bf4a82u, 0xe2b87a14u, 0x7bb12baeu, + 0x0cb61b38u, 0x92d28e9bu, 0xe5d5be0du, 0x7cdcefb7u, 0x0bdbdf21u, 0x86d3d2d4u, 0xf1d4e242u, + 0x68ddb3f8u, 0x1fda836eu, 0x81be16cdu, 0xf6b9265bu, 0x6fb077e1u, 0x18b74777u, 0x88085ae6u, + 0xff0f6a70u, 0x66063bcau, 0x11010b5cu, 0x8f659effu, 0xf862ae69u, 0x616bffd3u, 0x166ccf45u, + 0xa00ae278u, 0xd70dd2eeu, 0x4e048354u, 0x3903b3c2u, 0xa7672661u, 0xd06016f7u, 0x4969474du, + 0x3e6e77dbu, 0xaed16a4au, 0xd9d65adcu, 0x40df0b66u, 0x37d83bf0u, 0xa9bcae53u, 0xdebb9ec5u, + 0x47b2cf7fu, 0x30b5ffe9u, 0xbdbdf21cu, 0xcabac28au, 0x53b39330u, 0x24b4a3a6u, 0xbad03605u, + 0xcdd70693u, 0x54de5729u, 0x23d967bfu, 0xb3667a2eu, 0xc4614ab8u, 0x5d681b02u, 0x2a6f2b94u, + 0xb40bbe37u, 0xc30c8ea1u, 0x5a05df1bu, 0x2d02ef8du, +}; + +uint32_t lipc_crc32_update(uint32_t crc, const void *data, size_t len) +{ + const unsigned char *p = data; + + if (len == 0) { + return crc; + } + + crc = ~crc; + + while (len != 0) { + crc = lipc_crc32_table[(crc ^ *p++) & 0xffu] ^ (crc >> 8); + len--; + } + + return ~crc; +} + +uint32_t lipc_crc32_digest(const void *data, size_t len) +{ + return lipc_crc32_update(LIPC_CRC32_INIT, data, len); +} diff --git a/subprojects/localipc/lipc_dispatch.c b/subprojects/localipc/lipc_dispatch.c new file mode 100644 index 0000000..6fef88a --- /dev/null +++ b/subprojects/localipc/lipc_dispatch.c @@ -0,0 +1,1095 @@ +/* + * Method descriptors, server dispatch, client routing. + * + * SPDX-License-Identifier: GPL-2.0-only WITH Classpath-exception-2.0 + */ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +typedef struct lipc_client_inflight { + uint32_t tid; + uint16_t request_id; + lipc_client_completion_cb on_complete; + void *complete_user; + struct lipc_client_inflight *next; +} lipc_client_inflight; + +struct lipc_client { + pthread_mutex_t lock; + pthread_mutex_t io_lock; + pthread_rwlock_t routes_lock; + uint32_t next_tid; + uint32_t max_payload; + lipc_frame_reader reader; + uint8_t *rx_payload; + lipc_client_route_table routes; + lipc_client_reply_handler *reply_handlers; + lipc_client_notify_handler *notify_handlers; + void *route_user; + lipc_client_inflight *inflight; +}; + +typedef struct lipc_server_client { + int fd; + lipc_frame_reader reader; + uint8_t *rx_payload; +} lipc_server_client; + +struct lipc_server { + pthread_mutex_t lock; + pthread_rwlock_t handlers_lock; + int listen_fd; + int wake_rd; + int wake_wr; + int stop_requested; + uint32_t max_payload; + lipc_server_handler *handlers; + size_t nhandlers; + void *handler_user; + lipc_server_client *clients; + size_t nclients; + size_t clients_cap; +}; + +static void lipc_server_client_close(lipc_server_client *client) +{ + if (!client) { + return; + } + if (client->fd >= 0) { + close(client->fd); + client->fd = -1; + } + free(client->rx_payload); + client->rx_payload = NULL; +} + +static void lipc_server_remove_client(lipc_server *server, size_t idx) +{ + if (!server || idx >= server->nclients) { + return; + } + lipc_server_client_close(&server->clients[idx]); + if (idx + 1 < server->nclients) { + server->clients[idx] = server->clients[server->nclients - 1]; + } + server->nclients--; +} + +static int lipc_server_find_client_index(const lipc_server *server, int fd, size_t *out_idx) +{ + if (!server || !out_idx) { + return 0; + } + for (size_t i = 0; i < server->nclients; i++) { + if (server->clients[i].fd == fd) { + *out_idx = i; + return 1; + } + } + return 0; +} + +static lipc_status lipc_server_add_client(lipc_server *server, int fd) +{ + lipc_server_client client; + lipc_server_client *new_clients; + + if (!server || fd < 0) { + return LIPC_BAD_HEADER; + } + if (lipc_socket_set_nonblocking(fd, 1) != LIPC_OK) { + return LIPC_IO_ERROR; + } + + memset(&client, 0, sizeof client); + client.fd = fd; + lipc_frame_reader_init(&client.reader); + if (server->max_payload > 0) { + client.rx_payload = malloc(server->max_payload); + if (!client.rx_payload) { + return LIPC_INTERNAL_ERROR; + } + } + + if (server->nclients == server->clients_cap) { + size_t new_cap = server->clients_cap == 0 ? 8 : server->clients_cap * 2; + new_clients = realloc(server->clients, new_cap * sizeof *new_clients); + if (!new_clients) { + free(client.rx_payload); + return LIPC_INTERNAL_ERROR; + } + server->clients = new_clients; + server->clients_cap = new_cap; + } + + server->clients[server->nclients++] = client; + return LIPC_OK; +} + +static void lipc_server_drain_wake_fd(int wake_fd) +{ + uint8_t buf[64]; + ssize_t n; + + do { + n = read(wake_fd, buf, sizeof buf); + } while (n > 0 || (n < 0 && errno == EINTR)); +} + +static lipc_status lipc_server_reply_error(int fd, const lipc_header *hdr, lipc_status st) +{ + lipc_server_reply_ctx reply; + + if (!hdr || hdr->tid == 0) { + return st; + } + reply.fd = fd; + reply.request_hdr = *hdr; + return lipc_server_reply(&reply, (uint16_t)st, 0, 0, NULL, 0); +} + +lipc_server *lipc_server_create(int listen_fd, uint32_t max_payload, + const lipc_server_handler *handlers, size_t nhandlers, void *handler_user) +{ + lipc_server *server; + int wake_pipe[2]; + + if (listen_fd < 0) { + return NULL; + } + + server = calloc(1, sizeof *server); + if (!server) { + return NULL; + } + + if (pthread_mutex_init(&server->lock, NULL) != 0) { + free(server); + return NULL; + } + if (pthread_rwlock_init(&server->handlers_lock, NULL) != 0) { + pthread_mutex_destroy(&server->lock); + free(server); + return NULL; + } + + if (pipe(wake_pipe) != 0) { + pthread_rwlock_destroy(&server->handlers_lock); + pthread_mutex_destroy(&server->lock); + free(server); + return NULL; + } + if (lipc_socket_set_nonblocking(wake_pipe[0], 1) != LIPC_OK + || lipc_socket_set_nonblocking(wake_pipe[1], 1) != LIPC_OK) { + close(wake_pipe[0]); + close(wake_pipe[1]); + pthread_rwlock_destroy(&server->handlers_lock); + pthread_mutex_destroy(&server->lock); + free(server); + return NULL; + } + + server->listen_fd = listen_fd; + server->wake_rd = wake_pipe[0]; + server->wake_wr = wake_pipe[1]; + server->max_payload = max_payload ? max_payload : LIPC_MAX_PAYLOAD_DEFAULT; + if (handlers && nhandlers > 0) { + server->handlers = calloc(nhandlers, sizeof *server->handlers); + if (!server->handlers) { + close(server->wake_rd); + close(server->wake_wr); + pthread_rwlock_destroy(&server->handlers_lock); + pthread_mutex_destroy(&server->lock); + free(server); + return NULL; + } + memcpy(server->handlers, handlers, nhandlers * sizeof *handlers); + server->nhandlers = nhandlers; + } + server->handler_user = handler_user; + return server; +} + +void lipc_server_destroy(lipc_server *server) +{ + if (!server) { + return; + } + + pthread_mutex_lock(&server->lock); + + for (size_t i = 0; i < server->nclients; i++) { + lipc_server_client_close(&server->clients[i]); + } + free(server->clients); + server->clients = NULL; + server->nclients = 0; + server->clients_cap = 0; + + if (server->listen_fd >= 0) { + close(server->listen_fd); + server->listen_fd = -1; + } + if (server->wake_rd >= 0) { + close(server->wake_rd); + server->wake_rd = -1; + } + if (server->wake_wr >= 0) { + close(server->wake_wr); + server->wake_wr = -1; + } + pthread_mutex_unlock(&server->lock); + + pthread_rwlock_wrlock(&server->handlers_lock); + free(server->handlers); + server->handlers = NULL; + server->nhandlers = 0; + server->handler_user = NULL; + pthread_rwlock_unlock(&server->handlers_lock); + + pthread_rwlock_destroy(&server->handlers_lock); + pthread_mutex_destroy(&server->lock); + free(server); +} + +lipc_status lipc_server_wake(lipc_server *server) +{ + uint8_t sig = 1; + int wake_wr = -1; + + if (!server) { + return LIPC_BAD_HEADER; + } + pthread_mutex_lock(&server->lock); + wake_wr = server->wake_wr; + pthread_mutex_unlock(&server->lock); + if (wake_wr < 0) { + return LIPC_BAD_HEADER; + } + + for (;;) { + ssize_t n = write(wake_wr, &sig, sizeof sig); + if (n == (ssize_t)sizeof sig) { + return LIPC_OK; + } + if (n < 0 && errno == EINTR) { + continue; + } + if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) { + return LIPC_OK; + } + return LIPC_IO_ERROR; + } +} + +lipc_status lipc_server_stop(lipc_server *server) +{ + if (!server) { + return LIPC_BAD_HEADER; + } + + pthread_mutex_lock(&server->lock); + server->stop_requested = 1; + pthread_mutex_unlock(&server->lock); + return lipc_server_wake(server); +} + +static int lipc_server_is_stopped(lipc_server *server) +{ + int stopped; + + pthread_mutex_lock(&server->lock); + stopped = server->stop_requested; + pthread_mutex_unlock(&server->lock); + return stopped; +} + +static lipc_status lipc_server_accept_pending(lipc_server *server) +{ + for (;;) { + int cfd = accept(server->listen_fd, NULL, NULL); + if (cfd < 0) { + if (errno == EINTR) { + continue; + } + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return LIPC_OK; + } + return LIPC_IO_ERROR; + } + lipc_status st = lipc_server_add_client(server, cfd); + if (st != LIPC_OK) { + close(cfd); + return st; + } + } +} + +static lipc_status lipc_server_drain_client_frames(lipc_server *server, size_t idx) +{ + uint8_t scratch[4096]; + lipc_header hdr; + size_t payload_len = 0; + + if (!server || idx >= server->nclients) { + return LIPC_BAD_HEADER; + } + + for (;;) { + lipc_server_client *client = &server->clients[idx]; + const lipc_server_handler *handlers = NULL; + size_t nhandlers = 0; + void *handler_user = NULL; + lipc_status st = lipc_frame_reader_read(client->fd, &client->reader, server->max_payload, + scratch, sizeof scratch, NULL, &hdr, client->rx_payload, server->max_payload, + &payload_len); + if (st == LIPC_WOULD_BLOCK) { + return LIPC_OK; + } + if (st != LIPC_OK) { + return st; + } + + pthread_rwlock_rdlock(&server->handlers_lock); + handlers = server->handlers; + nhandlers = server->nhandlers; + handler_user = server->handler_user; + pthread_rwlock_unlock(&server->handlers_lock); + + st = lipc_server_dispatch(handlers, nhandlers, handler_user, + client->fd, &hdr, client->rx_payload, payload_len); + if (st != LIPC_OK) { + lipc_status reply_st = lipc_server_reply_error(client->fd, &hdr, st); + if (reply_st != LIPC_OK && reply_st != st) { + return reply_st; + } + } + } +} + +lipc_status lipc_server_run(lipc_server *server) +{ + int listen_fd; + int wake_rd; + + if (!server) { + return LIPC_BAD_HEADER; + } + pthread_mutex_lock(&server->lock); + listen_fd = server->listen_fd; + wake_rd = server->wake_rd; + server->stop_requested = 0; + pthread_mutex_unlock(&server->lock); + if (listen_fd < 0 || wake_rd < 0) { + return LIPC_BAD_HEADER; + } + if (lipc_socket_set_nonblocking(listen_fd, 1) != LIPC_OK) { + return LIPC_IO_ERROR; + } + + for (;;) { + struct pollfd *pfds = NULL; + size_t nfds = 0; + int prc; + lipc_status run_st = LIPC_OK; + + pthread_mutex_lock(&server->lock); + nfds = server->nclients + 2; + pfds = calloc(nfds, sizeof *pfds); + if (!pfds) { + pthread_mutex_unlock(&server->lock); + return LIPC_INTERNAL_ERROR; + } + + pfds[0].fd = server->wake_rd; + pfds[0].events = POLLIN; + pfds[1].fd = server->listen_fd; + pfds[1].events = POLLIN; + for (size_t i = 0; i < server->nclients; i++) { + pfds[i + 2].fd = server->clients[i].fd; + pfds[i + 2].events = POLLIN; + } + pthread_mutex_unlock(&server->lock); + + do { + prc = poll(pfds, nfds, -1); + } while (prc < 0 && errno == EINTR); + + if (prc < 0) { + free(pfds); + return LIPC_IO_ERROR; + } + + if ((pfds[0].revents & POLLIN) != 0) { + lipc_server_drain_wake_fd(wake_rd); + } + if (lipc_server_is_stopped(server)) { + free(pfds); + return LIPC_OK; + } + + if ((pfds[1].revents & (POLLIN | POLLERR | POLLHUP)) != 0) { + pthread_mutex_lock(&server->lock); + run_st = lipc_server_accept_pending(server); + pthread_mutex_unlock(&server->lock); + if (run_st != LIPC_OK) { + free(pfds); + return run_st; + } + } + + for (size_t i = 2; i < nfds; i++) { + int fd = pfds[i].fd; + short revents = pfds[i].revents; + size_t idx = 0; + if (fd < 0 || revents == 0) { + continue; + } + pthread_mutex_lock(&server->lock); + if (!lipc_server_find_client_index(server, fd, &idx)) { + pthread_mutex_unlock(&server->lock); + continue; + } + if ((revents & (POLLHUP | POLLERR | POLLNVAL)) != 0) { + lipc_server_remove_client(server, idx); + pthread_mutex_unlock(&server->lock); + continue; + } + if ((revents & POLLIN) == 0) { + pthread_mutex_unlock(&server->lock); + continue; + } + pthread_mutex_unlock(&server->lock); + + run_st = lipc_server_drain_client_frames(server, idx); + if (run_st == LIPC_OK) { + continue; + } + if (run_st == LIPC_IO_ERROR || run_st == LIPC_BAD_HEADER || run_st == LIPC_BAD_MAGIC + || run_st == LIPC_BAD_VERSION || run_st == LIPC_BAD_LENGTH + || run_st == LIPC_BAD_CHECKSUM) { + pthread_mutex_lock(&server->lock); + lipc_server_remove_client(server, idx); + pthread_mutex_unlock(&server->lock); + continue; + } + free(pfds); + return run_st; + } + + free(pfds); + } +} + +lipc_status lipc_method_validate(const lipc_method_desc *desc, const lipc_header *h, + size_t payload_len) +{ + if (!desc || !h) { + return LIPC_BAD_HEADER; + } + if (h->length != payload_len) { + return LIPC_INVALID_PARAMS; + } + + switch (desc->index) { + case LIPC_FIELD_FORBIDDEN: + if (h->index != 0) { + return LIPC_INVALID_PARAMS; + } + break; + case LIPC_FIELD_REQUIRED: + if (h->index == 0) { + return LIPC_INVALID_PARAMS; + } + break; + case LIPC_FIELD_OPTIONAL: + default: + break; + } + + switch (desc->value) { + case LIPC_FIELD_FORBIDDEN: + if (h->value != 0) { + return LIPC_INVALID_PARAMS; + } + break; + case LIPC_FIELD_REQUIRED: + if (h->value == 0) { + return LIPC_INVALID_PARAMS; + } + break; + case LIPC_FIELD_OPTIONAL: + default: + break; + } + + switch (desc->payload) { + case LIPC_FIELD_FORBIDDEN: + if (payload_len != 0) { + return LIPC_INVALID_PARAMS; + } + break; + case LIPC_FIELD_OPTIONAL: + case LIPC_FIELD_REQUIRED: + if (payload_len < desc->payload_min_len || payload_len > desc->payload_max_len) { + return LIPC_INVALID_PARAMS; + } + break; + default: + return LIPC_INVALID_PARAMS; + } + + return LIPC_OK; +} + +lipc_status lipc_server_reply(lipc_server_reply_ctx *reply, uint16_t wire_status, uint16_t index, + uint64_t value, const void *payload, size_t payload_len) +{ + if (!reply || reply->fd < 0) { + return LIPC_BAD_HEADER; + } + if (payload_len > 0 && !payload) { + return LIPC_BAD_HEADER; + } + + lipc_header out = { + .request = reply->request_hdr.request, + .status = wire_status, + .index = index, + .tid = reply->request_hdr.tid, + .length = (uint32_t)payload_len, + .value = value, + }; + if (out.length != payload_len) { + return LIPC_BAD_LENGTH; + } + return lipc_frame_write(reply->fd, &out, payload, payload_len); +} + +const lipc_server_handler *lipc_server_handler_lookup(const lipc_server_handler *handlers, + size_t nhandlers, uint16_t request_id) +{ + if (!handlers || nhandlers == 0) { + return NULL; + } + for (size_t i = 0; i < nhandlers; i++) { + if (handlers[i].request_id == request_id) { + return &handlers[i]; + } + } + return NULL; +} + +lipc_status lipc_server_dispatch(const lipc_server_handler *handlers, size_t nhandlers, void *user, + int fd, const lipc_header *req_hdr, const uint8_t *payload, size_t payload_len) +{ + if (!req_hdr || (payload_len > 0 && !payload)) { + return LIPC_BAD_HEADER; + } + if (req_hdr->tid == 0) { + return LIPC_INVALID_PARAMS; + } + + const lipc_server_handler *h = lipc_server_handler_lookup(handlers, nhandlers, req_hdr->request); + if (!h || !h->request_desc || !h->fn) { + return LIPC_UNKNOWN_REQUEST; + } + + lipc_status v = lipc_method_validate(h->request_desc, req_hdr, payload_len); + if (v != LIPC_OK) { + return v; + } + + lipc_server_reply_ctx ctx = { + .fd = fd, + }; + ctx.request_hdr = *req_hdr; + + return h->fn(user, &ctx, req_hdr, payload, payload_len); +} + +const lipc_client_reply_handler *lipc_client_reply_lookup(const lipc_client_reply_handler *handlers, + size_t nhandlers, uint16_t request_id) +{ + if (!handlers || nhandlers == 0) { + return NULL; + } + for (size_t i = 0; i < nhandlers; i++) { + if (handlers[i].request_id == request_id) { + return &handlers[i]; + } + } + return NULL; +} + +const lipc_client_notify_handler *lipc_client_notify_lookup(const lipc_client_notify_handler *handlers, + size_t nhandlers, uint16_t request_id) +{ + if (!handlers || nhandlers == 0) { + return NULL; + } + for (size_t i = 0; i < nhandlers; i++) { + if (handlers[i].request_id == request_id) { + return &handlers[i]; + } + } + return NULL; +} + +lipc_status lipc_client_dispatch_incoming(const lipc_client_route_table *routes, void *user, + const lipc_header *hdr, const uint8_t *payload, size_t payload_len) +{ + if (!routes || !hdr) { + return LIPC_BAD_HEADER; + } + if (payload_len > 0 && !payload) { + return LIPC_BAD_HEADER; + } + + if (hdr->tid == 0) { + const lipc_client_notify_handler *nh + = lipc_client_notify_lookup(routes->notify_handlers, routes->n_notify, hdr->request); + if (!nh || !nh->notify_desc || !nh->on_notify) { + return LIPC_UNKNOWN_REQUEST; + } + lipc_status v = lipc_method_validate(nh->notify_desc, hdr, payload_len); + if (v != LIPC_OK) { + return v; + } + return nh->on_notify(user, hdr, payload, payload_len); + } + + const lipc_client_reply_handler *rh + = lipc_client_reply_lookup(routes->reply_handlers, routes->n_reply, hdr->request); + if (!rh || !rh->reply_desc || !rh->on_reply) { + return LIPC_UNKNOWN_REQUEST; + } + lipc_status v = lipc_method_validate(rh->reply_desc, hdr, payload_len); + if (v != LIPC_OK) { + return v; + } + return rh->on_reply(user, hdr, payload, payload_len); +} + +static int lipc_client_inflight_has_tid(const lipc_client *client, uint32_t tid) +{ + for (const lipc_client_inflight *it = client->inflight; it; it = it->next) { + if (it->tid == tid) { + return 1; + } + } + return 0; +} + +static lipc_status lipc_client_next_tid_locked(lipc_client *client, uint32_t *out_tid) +{ + uint64_t attempts = 0; + uint32_t candidate = client->next_tid; + + while (attempts < 0xffffffffull) { + candidate++; + if (candidate == 0) { + candidate = 1; + } + if (!lipc_client_inflight_has_tid(client, candidate)) { + client->next_tid = candidate; + *out_tid = candidate; + return LIPC_OK; + } + attempts++; + } + return LIPC_INTERNAL_ERROR; +} + +static void lipc_client_inflight_free_list(lipc_client_inflight *head) +{ + while (head) { + lipc_client_inflight *next = head->next; + free(head); + head = next; + } +} + +static lipc_client_inflight *lipc_client_inflight_pop_tid(lipc_client *client, uint32_t tid) +{ + lipc_client_inflight *prev = NULL; + lipc_client_inflight *cur = client->inflight; + + while (cur) { + if (cur->tid == tid) { + if (prev) { + prev->next = cur->next; + } else { + client->inflight = cur->next; + } + cur->next = NULL; + return cur; + } + prev = cur; + cur = cur->next; + } + return NULL; +} + +lipc_client *lipc_client_create(uint32_t max_payload, const lipc_client_route_table *routes, + void *route_user) +{ + lipc_client *client = calloc(1, sizeof *client); + if (!client) { + return NULL; + } + + if (pthread_mutex_init(&client->lock, NULL) != 0) { + free(client); + return NULL; + } + if (pthread_mutex_init(&client->io_lock, NULL) != 0) { + pthread_mutex_destroy(&client->lock); + free(client); + return NULL; + } + if (pthread_rwlock_init(&client->routes_lock, NULL) != 0) { + pthread_mutex_destroy(&client->io_lock); + pthread_mutex_destroy(&client->lock); + free(client); + return NULL; + } + + client->max_payload = max_payload ? max_payload : LIPC_MAX_PAYLOAD_DEFAULT; + client->route_user = route_user; + client->next_tid = 0; + lipc_frame_reader_init(&client->reader); + + if (client->max_payload > 0) { + client->rx_payload = malloc(client->max_payload); + if (!client->rx_payload) { + pthread_rwlock_destroy(&client->routes_lock); + pthread_mutex_destroy(&client->io_lock); + pthread_mutex_destroy(&client->lock); + free(client); + return NULL; + } + } + + if (routes) { + if (routes->reply_handlers && routes->n_reply > 0) { + client->reply_handlers = calloc(routes->n_reply, sizeof *client->reply_handlers); + if (!client->reply_handlers) { + free(client->rx_payload); + pthread_rwlock_destroy(&client->routes_lock); + pthread_mutex_destroy(&client->io_lock); + pthread_mutex_destroy(&client->lock); + free(client); + return NULL; + } + memcpy(client->reply_handlers, routes->reply_handlers, + routes->n_reply * sizeof *routes->reply_handlers); + client->routes.reply_handlers = client->reply_handlers; + client->routes.n_reply = routes->n_reply; + } + if (routes->notify_handlers && routes->n_notify > 0) { + client->notify_handlers = calloc(routes->n_notify, sizeof *client->notify_handlers); + if (!client->notify_handlers) { + free(client->reply_handlers); + free(client->rx_payload); + pthread_rwlock_destroy(&client->routes_lock); + pthread_mutex_destroy(&client->io_lock); + pthread_mutex_destroy(&client->lock); + free(client); + return NULL; + } + memcpy(client->notify_handlers, routes->notify_handlers, + routes->n_notify * sizeof *routes->notify_handlers); + client->routes.notify_handlers = client->notify_handlers; + client->routes.n_notify = routes->n_notify; + } + } + + return client; +} + +void lipc_client_destroy(lipc_client *client) +{ + lipc_client_inflight *head; + + if (!client) { + return; + } + + pthread_mutex_lock(&client->lock); + head = client->inflight; + client->inflight = NULL; + pthread_mutex_unlock(&client->lock); + + lipc_client_inflight_free_list(head); + + pthread_rwlock_wrlock(&client->routes_lock); + free(client->reply_handlers); + free(client->notify_handlers); + client->reply_handlers = NULL; + client->notify_handlers = NULL; + memset(&client->routes, 0, sizeof client->routes); + client->route_user = NULL; + pthread_rwlock_unlock(&client->routes_lock); + + free(client->rx_payload); + pthread_rwlock_destroy(&client->routes_lock); + pthread_mutex_destroy(&client->io_lock); + pthread_mutex_destroy(&client->lock); + free(client); +} + +lipc_status lipc_client_next_tid(lipc_client *client, uint32_t *out_tid) +{ + lipc_status st; + + if (!client || !out_tid) { + return LIPC_BAD_HEADER; + } + + pthread_mutex_lock(&client->lock); + st = lipc_client_next_tid_locked(client, out_tid); + pthread_mutex_unlock(&client->lock); + return st; +} + +lipc_status lipc_client_request_async(lipc_client *client, int fd, uint16_t request_id, + uint16_t index, uint64_t value, const void *payload, size_t payload_len, + lipc_client_completion_cb on_complete, void *complete_user, uint32_t *out_tid) +{ + lipc_status st; + uint32_t tid = 0; + lipc_client_inflight *entry = NULL; + lipc_header req = { 0 }; + + if (!client || fd < 0 || (payload_len > 0 && !payload)) { + return LIPC_BAD_HEADER; + } + if ((uint32_t)payload_len != payload_len) { + return LIPC_BAD_LENGTH; + } + + entry = calloc(1, sizeof *entry); + if (!entry) { + return LIPC_INTERNAL_ERROR; + } + entry->request_id = request_id; + entry->on_complete = on_complete; + entry->complete_user = complete_user; + + pthread_mutex_lock(&client->lock); + st = lipc_client_next_tid_locked(client, &tid); + if (st == LIPC_OK) { + entry->tid = tid; + entry->next = client->inflight; + client->inflight = entry; + entry = NULL; + } + pthread_mutex_unlock(&client->lock); + + if (st != LIPC_OK) { + free(entry); + return st; + } + + req.request = request_id; + req.status = LIPC_OK; + req.index = index; + req.tid = tid; + req.length = (uint32_t)payload_len; + req.value = value; + + st = lipc_frame_write(fd, &req, payload, payload_len); + if (st != LIPC_OK) { + pthread_mutex_lock(&client->lock); + entry = lipc_client_inflight_pop_tid(client, tid); + pthread_mutex_unlock(&client->lock); + free(entry); + return st; + } + + if (out_tid) { + *out_tid = tid; + } + return LIPC_OK; +} + +lipc_status lipc_client_dispatch_frame(lipc_client *client, const lipc_header *hdr, + const uint8_t *payload, size_t payload_len) +{ + lipc_client_inflight *entry = NULL; + lipc_client_route_table routes = { 0 }; + void *route_user = NULL; + + if (!client || !hdr || (payload_len > 0 && !payload)) { + return LIPC_BAD_HEADER; + } + + if (hdr->tid == 0) { + pthread_rwlock_rdlock(&client->routes_lock); + routes = client->routes; + route_user = client->route_user; + pthread_rwlock_unlock(&client->routes_lock); + if (!routes.notify_handlers && !routes.reply_handlers) { + return LIPC_UNKNOWN_REQUEST; + } + return lipc_client_dispatch_incoming(&routes, route_user, hdr, payload, + payload_len); + } + + pthread_mutex_lock(&client->lock); + entry = lipc_client_inflight_pop_tid(client, hdr->tid); + pthread_mutex_unlock(&client->lock); + if (!entry) { + return LIPC_NOT_FOUND; + } + + if (entry->request_id != hdr->request) { + free(entry); + return LIPC_INVALID_PARAMS; + } + + if (entry->on_complete) { + lipc_status st = entry->on_complete(entry->complete_user, entry->tid, hdr, payload, + payload_len); + free(entry); + return st; + } + + pthread_rwlock_rdlock(&client->routes_lock); + routes = client->routes; + route_user = client->route_user; + pthread_rwlock_unlock(&client->routes_lock); + if (routes.reply_handlers || routes.notify_handlers) { + lipc_status st = lipc_client_dispatch_incoming(&routes, route_user, hdr, + payload, payload_len); + free(entry); + return st; + } + + free(entry); + return LIPC_OK; +} + +lipc_status lipc_client_poll_once(lipc_client *client, int fd) +{ + uint8_t scratch[4096]; + uint8_t wire[LIPC_HEADER_SIZE]; + lipc_header hdr; + size_t payload_len = 0; + lipc_status st; + + if (!client || fd < 0 || (client->max_payload > 0 && !client->rx_payload)) { + return LIPC_BAD_HEADER; + } + + pthread_mutex_lock(&client->io_lock); + st = lipc_frame_reader_read(fd, &client->reader, client->max_payload, scratch, sizeof scratch, + wire, &hdr, client->rx_payload, client->max_payload, &payload_len); + pthread_mutex_unlock(&client->io_lock); + if (st != LIPC_OK) { + return st; + } + + return lipc_client_dispatch_frame(client, &hdr, client->rx_payload, payload_len); +} + +typedef struct lipc_sync_wait { + lipc_header *reply_hdr; + uint8_t *reply_payload; + size_t reply_cap; + size_t *reply_len; + int done; +} lipc_sync_wait; + +static lipc_status lipc_client_sync_complete(void *user, uint32_t tid, const lipc_header *hdr, + const uint8_t *payload, size_t payload_len) +{ + lipc_sync_wait *wait = user; + (void)tid; + + if (!wait || !hdr || (payload_len > 0 && !payload)) { + return LIPC_BAD_HEADER; + } + + if (payload_len > wait->reply_cap) { + return LIPC_BAD_LENGTH; + } + + if (wait->reply_hdr) { + *wait->reply_hdr = *hdr; + } + if (payload_len > 0 && wait->reply_payload) { + memcpy(wait->reply_payload, payload, payload_len); + } + if (wait->reply_len) { + *wait->reply_len = payload_len; + } + + wait->done = 1; + return LIPC_OK; +} + +lipc_status lipc_client_call(lipc_client *client, int fd, uint16_t request_id, uint16_t index, + uint64_t value, const void *payload, size_t payload_len, lipc_header *reply_hdr, + void *reply_payload, size_t reply_payload_cap, size_t *reply_len) +{ + lipc_sync_wait wait; + uint32_t tid = 0; + lipc_status st; + + if (!client || fd < 0 || (payload_len > 0 && !payload) + || (reply_payload_cap > 0 && !reply_payload)) { + return LIPC_BAD_HEADER; + } + + memset(&wait, 0, sizeof wait); + wait.reply_hdr = reply_hdr; + wait.reply_payload = reply_payload; + wait.reply_cap = reply_payload_cap; + wait.reply_len = reply_len; + + st = lipc_client_request_async(client, fd, request_id, index, value, payload, payload_len, + lipc_client_sync_complete, &wait, &tid); + if (st != LIPC_OK) { + return st; + } + + while (!wait.done) { + st = lipc_client_poll_once(client, fd); + if (st == LIPC_WOULD_BLOCK) { + struct pollfd pfd = { + .fd = fd, + .events = POLLIN, + }; + int prc; + do { + prc = poll(&pfd, 1, -1); + } while (prc < 0 && errno == EINTR); + if (prc < 0) { + return LIPC_IO_ERROR; + } + continue; + } + if (st != LIPC_OK) { + pthread_mutex_lock(&client->lock); + lipc_client_inflight *entry = lipc_client_inflight_pop_tid(client, tid); + pthread_mutex_unlock(&client->lock); + free(entry); + return st; + } + } + + return LIPC_OK; +} diff --git a/subprojects/localipc/lipc_dispatch_test.c b/subprojects/localipc/lipc_dispatch_test.c new file mode 100644 index 0000000..76508d8 --- /dev/null +++ b/subprojects/localipc/lipc_dispatch_test.c @@ -0,0 +1,839 @@ +/* + * SPDX-License-Identifier: GPL-2.0-only WITH Classpath-exception-2.0 + */ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +static void test_method_validate_index_forbidden(void **state) +{ + (void)state; + const lipc_method_desc d = { + .index = LIPC_FIELD_FORBIDDEN, + .value = LIPC_FIELD_OPTIONAL, + .payload = LIPC_FIELD_FORBIDDEN, + .payload_min_len = 0, + .payload_max_len = 0, + }; + lipc_header h = { .index = 1, .length = 0 }; + assert_int_equal(lipc_method_validate(&d, &h, 0), LIPC_INVALID_PARAMS); + h.index = 0; + assert_int_equal(lipc_method_validate(&d, &h, 0), LIPC_OK); +} + +static void test_method_validate_payload_bounds(void **state) +{ + (void)state; + const lipc_method_desc d = { + .index = LIPC_FIELD_OPTIONAL, + .value = LIPC_FIELD_OPTIONAL, + .payload = LIPC_FIELD_OPTIONAL, + .payload_min_len = 2, + .payload_max_len = 4, + }; + lipc_header h = { .length = 1 }; + assert_int_equal(lipc_method_validate(&d, &h, 1), LIPC_INVALID_PARAMS); + h.length = 2; + assert_int_equal(lipc_method_validate(&d, &h, 2), LIPC_OK); + h.length = 5; + assert_int_equal(lipc_method_validate(&d, &h, 5), LIPC_INVALID_PARAMS); +} + +static int g_server_cb_calls; + +static lipc_status server_echo_fn(void *user, lipc_server_reply_ctx *reply, const lipc_header *req_hdr, + const uint8_t *payload, size_t payload_len) +{ + (void)user; + (void)req_hdr; + g_server_cb_calls++; + return lipc_server_reply(reply, LIPC_OK, 0, 0, payload, payload_len); +} + +static void test_server_dispatch_reply_echoes_tid(void **state) +{ + (void)state; + int sp[2]; + + assert_int_equal(socketpair(AF_UNIX, SOCK_STREAM, 0, sp), 0); + + const lipc_method_desc desc = { + .index = LIPC_FIELD_OPTIONAL, + .value = LIPC_FIELD_OPTIONAL, + .payload = LIPC_FIELD_OPTIONAL, + .payload_min_len = 0, + .payload_max_len = 64, + }; + const lipc_server_handler handlers[] = { + { .request_id = 100, .request_desc = &desc, .fn = server_echo_fn }, + }; + + lipc_header req = { + .request = 100, + .status = 0, + .index = 0, + .tid = 0xdeadbeefu, + .length = 3, + .value = 0, + }; + const uint8_t pl_in[] = { 'a', 'b', 'c' }; + + g_server_cb_calls = 0; + assert_int_equal(lipc_frame_write(sp[1], &req, pl_in, sizeof pl_in), LIPC_OK); + + uint8_t wire[LIPC_HEADER_SIZE]; + lipc_header rh; + uint8_t pl_out[16]; + size_t plen = 0; + + assert_int_equal( + lipc_frame_read(sp[0], LIPC_MAX_PAYLOAD_DEFAULT, wire, &rh, pl_out, sizeof pl_out, &plen), + LIPC_OK); + assert_int_equal((int)plen, 3); + + assert_int_equal(lipc_server_dispatch(handlers, 1, NULL, sp[0], &rh, pl_out, plen), LIPC_OK); + assert_int_equal(g_server_cb_calls, 1); + + assert_int_equal( + lipc_frame_read(sp[1], LIPC_MAX_PAYLOAD_DEFAULT, wire, &rh, pl_out, sizeof pl_out, &plen), + LIPC_OK); + assert_int_equal(rh.tid, 0xdeadbeefu); + assert_int_equal(rh.request, 100); + assert_int_equal(rh.status, LIPC_OK); + assert_int_equal((int)plen, 3); + assert_memory_equal(pl_out, pl_in, 3); + + close(sp[0]); + close(sp[1]); +} + +static void test_server_dispatch_rejects_tid_zero(void **state) +{ + (void)state; + const lipc_method_desc desc = { + .index = LIPC_FIELD_OPTIONAL, + .value = LIPC_FIELD_OPTIONAL, + .payload = LIPC_FIELD_FORBIDDEN, + .payload_min_len = 0, + .payload_max_len = 0, + }; + const lipc_server_handler handlers[] = { + { .request_id = 1, .request_desc = &desc, .fn = server_echo_fn }, + }; + lipc_header h = { .request = 1, .tid = 0, .length = 0 }; + + g_server_cb_calls = 0; + assert_int_equal(lipc_server_dispatch(handlers, 1, NULL, -1, &h, NULL, 0), LIPC_INVALID_PARAMS); + assert_int_equal(g_server_cb_calls, 0); +} + +static int g_notify_calls; +static int g_reply_calls; + +static lipc_status notify_fn(void *user, const lipc_header *hdr, const uint8_t *payload, + size_t payload_len) +{ + (void)user; + (void)payload; + (void)payload_len; + assert_int_equal(hdr->tid, 0u); + g_notify_calls++; + return LIPC_OK; +} + +static lipc_status reply_fn(void *user, const lipc_header *hdr, const uint8_t *payload, + size_t payload_len) +{ + (void)user; + (void)payload; + (void)payload_len; + assert_int_equal(hdr->tid, 42u); + g_reply_calls++; + return LIPC_OK; +} + +static void test_client_dispatch_notify_vs_reply(void **state) +{ + (void)state; + const lipc_method_desc nd = { + .index = LIPC_FIELD_OPTIONAL, + .value = LIPC_FIELD_OPTIONAL, + .payload = LIPC_FIELD_FORBIDDEN, + .payload_min_len = 0, + .payload_max_len = 0, + }; + const lipc_method_desc rd = { + .index = LIPC_FIELD_OPTIONAL, + .value = LIPC_FIELD_OPTIONAL, + .payload = LIPC_FIELD_FORBIDDEN, + .payload_min_len = 0, + .payload_max_len = 0, + }; + const lipc_client_notify_handler nh[] = { + { .request_id = 7, .notify_desc = &nd, .on_notify = notify_fn }, + }; + const lipc_client_reply_handler rh[] = { + { .request_id = 8, .reply_desc = &rd, .on_reply = reply_fn }, + }; + const lipc_client_route_table tab = { + .reply_handlers = rh, + .n_reply = 1, + .notify_handlers = nh, + .n_notify = 1, + }; + + g_notify_calls = 0; + g_reply_calls = 0; + + lipc_header nhdr = { .request = 7, .tid = 0, .length = 0 }; + assert_int_equal(lipc_client_dispatch_incoming(&tab, NULL, &nhdr, NULL, 0), LIPC_OK); + assert_int_equal(g_notify_calls, 1); + assert_int_equal(g_reply_calls, 0); + + lipc_header rhdr = { .request = 8, .tid = 42, .length = 0 }; + assert_int_equal(lipc_client_dispatch_incoming(&tab, NULL, &rhdr, NULL, 0), LIPC_OK); + assert_int_equal(g_notify_calls, 1); + assert_int_equal(g_reply_calls, 1); + + lipc_header unk = { .request = 999, .tid = 0, .length = 0 }; + assert_int_equal(lipc_client_dispatch_incoming(&tab, NULL, &unk, NULL, 0), LIPC_UNKNOWN_REQUEST); +} + +static void test_handler_lookup(void **state) +{ + (void)state; + const lipc_method_desc d = { 0 }; + const lipc_server_handler handlers[] = { + { .request_id = 10, .request_desc = &d, .fn = NULL }, + { .request_id = 20, .request_desc = &d, .fn = NULL }, + }; + assert_non_null(lipc_server_handler_lookup(handlers, 2, 10)); + assert_non_null(lipc_server_handler_lookup(handlers, 2, 20)); + assert_null(lipc_server_handler_lookup(handlers, 2, 99)); + assert_int_equal(lipc_server_handler_lookup(handlers, 2, 10)->request_id, 10); +} + +static void test_client_next_tid_nonzero_unique(void **state) +{ + (void)state; + lipc_client *client = lipc_client_create(0, NULL, NULL); + uint32_t tid1 = 0; + uint32_t tid2 = 0; + + assert_non_null(client); + assert_int_equal(lipc_client_next_tid(client, &tid1), LIPC_OK); + assert_int_equal(lipc_client_next_tid(client, &tid2), LIPC_OK); + assert_true(tid1 != 0); + assert_true(tid2 != 0); + assert_true(tid1 != tid2); + lipc_client_destroy(client); +} + +typedef struct tid_thread_args { + lipc_client *client; + uint32_t *tids; + size_t count; + int status; +} tid_thread_args; + +static void *tid_alloc_thread(void *arg) +{ + tid_thread_args *a = arg; + + for (size_t i = 0; i < a->count; i++) { + if (lipc_client_next_tid(a->client, &a->tids[i]) != LIPC_OK || a->tids[i] == 0) { + a->status = -1; + return NULL; + } + } + a->status = 0; + return NULL; +} + +static int uint32_cmp(const void *lhs, const void *rhs) +{ + const uint32_t a = *(const uint32_t *)lhs; + const uint32_t b = *(const uint32_t *)rhs; + if (a < b) { + return -1; + } + if (a > b) { + return 1; + } + return 0; +} + +static void test_client_next_tid_thread_safe(void **state) +{ + (void)state; + enum { + THREADS = 4, + PER_THREAD = 128, + TOTAL = THREADS * PER_THREAD, + }; + lipc_client *client = lipc_client_create(0, NULL, NULL); + pthread_t threads[THREADS]; + tid_thread_args args[THREADS]; + uint32_t tids[TOTAL]; + + assert_non_null(client); + for (size_t i = 0; i < THREADS; i++) { + args[i].client = client; + args[i].tids = &tids[i * PER_THREAD]; + args[i].count = PER_THREAD; + args[i].status = -1; + assert_int_equal(pthread_create(&threads[i], NULL, tid_alloc_thread, &args[i]), 0); + } + for (size_t i = 0; i < THREADS; i++) { + assert_int_equal(pthread_join(threads[i], NULL), 0); + assert_int_equal(args[i].status, 0); + } + + qsort(tids, TOTAL, sizeof tids[0], uint32_cmp); + for (size_t i = 1; i < TOTAL; i++) { + assert_true(tids[i - 1] != tids[i]); + } + + lipc_client_destroy(client); +} + +static int g_complete_calls; +static uint32_t g_complete_tid; + +static lipc_status completion_fn(void *user, uint32_t tid, const lipc_header *hdr, + const uint8_t *payload, size_t payload_len) +{ + (void)user; + assert_non_null(hdr); + assert_non_null(payload); + assert_int_equal(payload_len, 2); + assert_int_equal(hdr->status, LIPC_OK); + assert_int_equal(hdr->tid, tid); + assert_memory_equal(payload, "ok", 2); + g_complete_tid = tid; + g_complete_calls++; + return LIPC_OK; +} + +static void test_client_request_async_and_poll(void **state) +{ + (void)state; + int sp[2]; + lipc_client *client; + uint32_t tid = 0; + uint8_t wire[LIPC_HEADER_SIZE]; + lipc_header req; + uint8_t req_payload[8]; + size_t req_len = 0; + + assert_int_equal(socketpair(AF_UNIX, SOCK_STREAM, 0, sp), 0); + client = lipc_client_create(0, NULL, NULL); + assert_non_null(client); + + g_complete_calls = 0; + g_complete_tid = 0; + assert_int_equal(lipc_client_request_async(client, sp[0], 77, 3, 9, "hi", 2, completion_fn, + NULL, &tid), + LIPC_OK); + assert_true(tid != 0); + + assert_int_equal( + lipc_frame_read(sp[1], LIPC_MAX_PAYLOAD_DEFAULT, wire, &req, req_payload, sizeof req_payload, + &req_len), + LIPC_OK); + assert_int_equal(req.request, 77); + assert_int_equal(req.tid, tid); + assert_int_equal(req_len, 2); + + lipc_header reply = { + .request = req.request, + .status = LIPC_OK, + .index = req.index, + .tid = req.tid, + .length = 2, + .value = 0, + }; + assert_int_equal(lipc_frame_write(sp[1], &reply, "ok", 2), LIPC_OK); + assert_int_equal(lipc_client_poll_once(client, sp[0]), LIPC_OK); + assert_int_equal(g_complete_calls, 1); + assert_int_equal(g_complete_tid, tid); + + lipc_client_destroy(client); + close(sp[0]); + close(sp[1]); +} + +typedef struct sync_server_args { + int fd; +} sync_server_args; + +static void *sync_server_thread(void *arg) +{ + sync_server_args *a = arg; + uint8_t wire[LIPC_HEADER_SIZE]; + lipc_header req; + uint8_t req_payload[16]; + size_t req_len = 0; + + if (lipc_frame_read(a->fd, LIPC_MAX_PAYLOAD_DEFAULT, wire, &req, req_payload, sizeof req_payload, + &req_len) + != LIPC_OK) { + return NULL; + } + + lipc_header notify = { + .request = 500, + .status = LIPC_OK, + .index = 0, + .tid = 0, + .length = 0, + .value = 0, + }; + (void)lipc_frame_write(a->fd, ¬ify, NULL, 0); + + lipc_header reply = { + .request = req.request, + .status = LIPC_OK, + .index = req.index, + .tid = req.tid, + .length = 4, + .value = 0x55ull, + }; + (void)lipc_frame_write(a->fd, &reply, "pong", 4); + return NULL; +} + +static int g_notify_seen; + +static lipc_status sync_notify_fn(void *user, const lipc_header *hdr, const uint8_t *payload, + size_t payload_len) +{ + (void)user; + (void)payload; + (void)payload_len; + assert_non_null(hdr); + assert_int_equal(hdr->tid, 0u); + assert_int_equal(hdr->request, 500u); + g_notify_seen++; + return LIPC_OK; +} + +static void test_client_call_sync_with_notify_dispatch(void **state) +{ + (void)state; + int sp[2]; + pthread_t thr; + sync_server_args args; + const lipc_method_desc notify_desc = { + .index = LIPC_FIELD_OPTIONAL, + .value = LIPC_FIELD_OPTIONAL, + .payload = LIPC_FIELD_FORBIDDEN, + .payload_min_len = 0, + .payload_max_len = 0, + }; + const lipc_client_notify_handler nh[] = { + { .request_id = 500, .notify_desc = ¬ify_desc, .on_notify = sync_notify_fn }, + }; + const lipc_client_route_table routes = { + .reply_handlers = NULL, + .n_reply = 0, + .notify_handlers = nh, + .n_notify = 1, + }; + lipc_client *client; + lipc_header reply_hdr; + uint8_t reply_payload[16]; + size_t reply_len = 0; + + assert_int_equal(socketpair(AF_UNIX, SOCK_STREAM, 0, sp), 0); + client = lipc_client_create(0, &routes, NULL); + assert_non_null(client); + + g_notify_seen = 0; + args.fd = sp[1]; + assert_int_equal(pthread_create(&thr, NULL, sync_server_thread, &args), 0); + + assert_int_equal(lipc_client_call(client, sp[0], 501, 1, 0, "ping", 4, &reply_hdr, reply_payload, + sizeof reply_payload, &reply_len), + LIPC_OK); + assert_int_equal((int)reply_len, 4); + assert_memory_equal(reply_payload, "pong", 4); + assert_int_equal(reply_hdr.request, 501); + assert_int_equal(reply_hdr.tid == 0, 0); + assert_int_equal(g_notify_seen, 1); + + assert_int_equal(pthread_join(thr, NULL), 0); + lipc_client_destroy(client); + close(sp[0]); + close(sp[1]); +} + +typedef struct server_thread_args { + lipc_server *server; + lipc_status run_status; +} server_thread_args; + +static void *server_run_thread(void *arg) +{ + server_thread_args *a = arg; + + a->run_status = lipc_server_run(a->server); + return NULL; +} + +static int make_listener_socket(char *path, size_t path_cap) +{ + if (!path || path_cap == 0) { + return -1; + } + if (snprintf(path, path_cap, "/tmp/lipc_dispatch_%ld_%ld.sock", (long)getpid(), random()) + >= (int)path_cap) { + return -1; + } + return lipc_socket_listen(path, 8, LIPC_SOCKET_NONBLOCK); +} + +static int connect_client_socket(const char *path) +{ + return lipc_socket_connect(path, 0u); +} + +static int write_full_buf(int fd, const uint8_t *buf, size_t len) +{ + size_t off = 0; + + while (off < len) { + ssize_t n = write(fd, buf + off, len - off); + if (n < 0) { + if (errno == EINTR) { + continue; + } + return -1; + } + if (n == 0) { + return -1; + } + off += (size_t)n; + } + return 0; +} + +static void test_server_run_accept_dispatch_stop(void **state) +{ + (void)state; + char sock_path[108]; + int listen_fd; + int client_fd = -1; + pthread_t thr; + server_thread_args args; + lipc_server *server; + const lipc_method_desc desc = { + .index = LIPC_FIELD_OPTIONAL, + .value = LIPC_FIELD_OPTIONAL, + .payload = LIPC_FIELD_OPTIONAL, + .payload_min_len = 0, + .payload_max_len = 64, + }; + const lipc_server_handler handlers[] = { + { .request_id = 100, .request_desc = &desc, .fn = server_echo_fn }, + }; + lipc_header req = { + .request = 100, + .status = LIPC_OK, + .index = 1, + .tid = 55, + .length = 5, + .value = 7, + }; + uint8_t wire[LIPC_HEADER_SIZE]; + uint8_t out_payload[16]; + lipc_header out_hdr; + size_t out_len = 0; + + listen_fd = make_listener_socket(sock_path, sizeof sock_path); + assert_true(listen_fd >= 0); + + server = lipc_server_create(listen_fd, 0, handlers, 1, NULL); + assert_non_null(server); + + args.server = server; + args.run_status = LIPC_INTERNAL_ERROR; + assert_int_equal(pthread_create(&thr, NULL, server_run_thread, &args), 0); + + client_fd = connect_client_socket(sock_path); + assert_true(client_fd >= 0); + + g_server_cb_calls = 0; + assert_int_equal(lipc_frame_write(client_fd, &req, "hello", 5), LIPC_OK); + assert_int_equal( + lipc_frame_read(client_fd, LIPC_MAX_PAYLOAD_DEFAULT, wire, &out_hdr, out_payload, + sizeof out_payload, &out_len), + LIPC_OK); + assert_int_equal(g_server_cb_calls, 1); + assert_int_equal(out_hdr.request, 100); + assert_int_equal(out_hdr.tid, 55); + assert_int_equal((int)out_len, 5); + assert_memory_equal(out_payload, "hello", 5); + + assert_int_equal(lipc_server_stop(server), LIPC_OK); + assert_int_equal(pthread_join(thr, NULL), 0); + assert_int_equal(args.run_status, LIPC_OK); + + close(client_fd); + lipc_server_destroy(server); + unlink(sock_path); +} + +static void test_server_run_stream_frames_and_close(void **state) +{ + (void)state; + char sock_path[108]; + int listen_fd; + int client_fd = -1; + pthread_t thr; + server_thread_args args; + lipc_server *server; + const lipc_method_desc desc = { + .index = LIPC_FIELD_OPTIONAL, + .value = LIPC_FIELD_OPTIONAL, + .payload = LIPC_FIELD_OPTIONAL, + .payload_min_len = 0, + .payload_max_len = 64, + }; + const lipc_server_handler handlers[] = { + { .request_id = 100, .request_desc = &desc, .fn = server_echo_fn }, + }; + lipc_header req1 = { + .request = 100, + .status = LIPC_OK, + .index = 0, + .tid = 101, + .length = 3, + .value = 0, + }; + lipc_header req2 = { + .request = 100, + .status = LIPC_OK, + .index = 0, + .tid = 102, + .length = 0, + .value = 0, + }; + uint8_t frame1[LIPC_HEADER_SIZE]; + uint8_t frame2[LIPC_HEADER_SIZE]; + uint8_t combo[LIPC_HEADER_SIZE + 3 + LIPC_HEADER_SIZE]; + uint8_t wire[LIPC_HEADER_SIZE]; + uint8_t out_payload[16]; + lipc_header out_hdr; + size_t out_len = 0; + + listen_fd = make_listener_socket(sock_path, sizeof sock_path); + assert_true(listen_fd >= 0); + server = lipc_server_create(listen_fd, 0, handlers, 1, NULL); + assert_non_null(server); + + args.server = server; + args.run_status = LIPC_INTERNAL_ERROR; + assert_int_equal(pthread_create(&thr, NULL, server_run_thread, &args), 0); + + client_fd = connect_client_socket(sock_path); + assert_true(client_fd >= 0); + + lipc_frame_fill_checksum(frame1, &req1, "one", 3); + lipc_frame_fill_checksum(frame2, &req2, NULL, 0); + memcpy(combo, frame1, LIPC_HEADER_SIZE); + memcpy(combo + LIPC_HEADER_SIZE, "one", 3); + memcpy(combo + LIPC_HEADER_SIZE + 3, frame2, LIPC_HEADER_SIZE); + + g_server_cb_calls = 0; + assert_int_equal(write_full_buf(client_fd, combo, sizeof combo), 0); + + assert_int_equal( + lipc_frame_read(client_fd, LIPC_MAX_PAYLOAD_DEFAULT, wire, &out_hdr, out_payload, + sizeof out_payload, &out_len), + LIPC_OK); + assert_int_equal(out_hdr.tid, 101u); + assert_int_equal((int)out_len, 3); + assert_memory_equal(out_payload, "one", 3); + + assert_int_equal( + lipc_frame_read(client_fd, LIPC_MAX_PAYLOAD_DEFAULT, wire, &out_hdr, out_payload, + sizeof out_payload, &out_len), + LIPC_OK); + assert_int_equal(out_hdr.tid, 102u); + assert_int_equal((int)out_len, 0); + assert_int_equal(g_server_cb_calls, 2); + + close(client_fd); + assert_int_equal(lipc_server_stop(server), LIPC_OK); + assert_int_equal(pthread_join(thr, NULL), 0); + assert_int_equal(args.run_status, LIPC_OK); + + lipc_server_destroy(server); + unlink(sock_path); +} + +typedef struct concurrent_rpc_args { + int iterations; + int status; +} concurrent_rpc_args; + +typedef struct concurrent_rpc_server_args { + int fd; + int iterations; + int status; +} concurrent_rpc_server_args; + +static void *concurrent_rpc_server_thread(void *arg) +{ + concurrent_rpc_server_args *a = arg; + uint8_t wire[LIPC_HEADER_SIZE]; + lipc_header req; + uint8_t req_payload[16]; + size_t req_len = 0; + + a->status = 0; + for (int i = 0; i < a->iterations; i++) { + lipc_header reply = { 0 }; + if (lipc_frame_read(a->fd, LIPC_MAX_PAYLOAD_DEFAULT, wire, &req, req_payload, sizeof req_payload, + &req_len) + != LIPC_OK) { + a->status = -1; + break; + } + + reply.request = req.request; + reply.status = LIPC_OK; + reply.index = req.index; + reply.tid = req.tid; + reply.length = (uint32_t)req_len; + reply.value = req.value; + if (lipc_frame_write(a->fd, &reply, req_payload, req_len) != LIPC_OK) { + a->status = -1; + break; + } + } + + close(a->fd); + return NULL; +} + +static void *concurrent_rpc_client_thread(void *arg) +{ + concurrent_rpc_args *a = arg; + int sp[2]; + pthread_t server_thr; + concurrent_rpc_server_args server_args; + lipc_client *client = NULL; + uint8_t reply_payload[16]; + size_t reply_len = 0; + lipc_header reply_hdr; + + if (socketpair(AF_UNIX, SOCK_STREAM, 0, sp) != 0) { + a->status = -1; + return NULL; + } + + server_args.fd = sp[1]; + server_args.iterations = a->iterations; + server_args.status = -1; + if (pthread_create(&server_thr, NULL, concurrent_rpc_server_thread, &server_args) != 0) { + close(sp[0]); + close(sp[1]); + a->status = -1; + return NULL; + } + + client = lipc_client_create(0, NULL, NULL); + if (!client) { + close(sp[0]); + pthread_join(server_thr, NULL); + a->status = -1; + return NULL; + } + + a->status = 0; + for (int i = 0; i < a->iterations; i++) { + const uint16_t request_id = (uint16_t)(700 + (i % 3)); + const uint16_t index = (uint16_t)(i + 1); + const uint64_t value = (uint64_t)i * 3u; + + reply_len = 0; + if (lipc_client_call(client, sp[0], request_id, index, value, "xy", 2, &reply_hdr, reply_payload, + sizeof reply_payload, &reply_len) + != LIPC_OK) { + a->status = -1; + break; + } + if (reply_hdr.request != request_id || reply_hdr.tid == 0 || reply_hdr.index != index + || reply_hdr.value != value || reply_len != 2 || memcmp(reply_payload, "xy", 2) != 0) { + a->status = -1; + break; + } + } + + close(sp[0]); + lipc_client_destroy(client); + pthread_join(server_thr, NULL); + if (server_args.status != 0) { + a->status = -1; + } + return NULL; +} + +static void test_client_call_concurrent_stress(void **state) +{ + (void)state; + enum { + THREADS = 4, + ITERS_PER_THREAD = 64, + }; + pthread_t threads[THREADS]; + concurrent_rpc_args args[THREADS]; + + for (size_t i = 0; i < THREADS; i++) { + args[i].iterations = ITERS_PER_THREAD; + args[i].status = -1; + assert_int_equal(pthread_create(&threads[i], NULL, concurrent_rpc_client_thread, &args[i]), 0); + } + + for (size_t i = 0; i < THREADS; i++) { + assert_int_equal(pthread_join(threads[i], NULL), 0); + assert_int_equal(args[i].status, 0); + } +} + +int main(void) +{ + const struct CMUnitTest tests[] = { + cmocka_unit_test(test_method_validate_index_forbidden), + cmocka_unit_test(test_method_validate_payload_bounds), + cmocka_unit_test(test_server_dispatch_reply_echoes_tid), + cmocka_unit_test(test_server_dispatch_rejects_tid_zero), + cmocka_unit_test(test_client_dispatch_notify_vs_reply), + cmocka_unit_test(test_handler_lookup), + cmocka_unit_test(test_client_next_tid_nonzero_unique), + cmocka_unit_test(test_client_next_tid_thread_safe), + cmocka_unit_test(test_client_request_async_and_poll), + cmocka_unit_test(test_client_call_sync_with_notify_dispatch), + cmocka_unit_test(test_client_call_concurrent_stress), + cmocka_unit_test(test_server_run_accept_dispatch_stop), + cmocka_unit_test(test_server_run_stream_frames_and_close), + }; + cmocka_set_message_output(CM_OUTPUT_TAP); + return cmocka_run_group_tests(tests, NULL, NULL); +} diff --git a/subprojects/localipc/lipc_packet.c b/subprojects/localipc/lipc_packet.c new file mode 100644 index 0000000..73c4e62 --- /dev/null +++ b/subprojects/localipc/lipc_packet.c @@ -0,0 +1,545 @@ +/* + * LIPC wire header, zlib CRC32 framing, stream-safe reads. + * + * SPDX-License-Identifier: GPL-2.0-only WITH Classpath-exception-2.0 + */ + +#include + +#include +#include +#include + +uint32_t lipc_crc32_update(uint32_t crc, const void *data, size_t len); + +static void lipc_put16(uint8_t *p, uint16_t v) +{ + p[0] = (uint8_t)(v >> 8); + p[1] = (uint8_t)v; +} + +static void lipc_put32(uint8_t *p, uint32_t v) +{ + p[0] = (uint8_t)(v >> 24); + p[1] = (uint8_t)(v >> 16); + p[2] = (uint8_t)(v >> 8); + p[3] = (uint8_t)v; +} + +static void lipc_put64(uint8_t *p, uint64_t v) +{ + p[0] = (uint8_t)(v >> 56); + p[1] = (uint8_t)(v >> 48); + p[2] = (uint8_t)(v >> 40); + p[3] = (uint8_t)(v >> 32); + p[4] = (uint8_t)(v >> 24); + p[5] = (uint8_t)(v >> 16); + p[6] = (uint8_t)(v >> 8); + p[7] = (uint8_t)v; +} + +static uint16_t lipc_get16(const uint8_t *p) +{ + return (uint16_t)(((uint16_t)p[0] << 8) | p[1]); +} + +static uint32_t lipc_get32(const uint8_t *p) +{ + return ((uint32_t)p[0] << 24) | ((uint32_t)p[1] << 16) | ((uint32_t)p[2] << 8) | (uint32_t)p[3]; +} + +static uint64_t lipc_get64(const uint8_t *p) +{ + return ((uint64_t)p[0] << 56) | ((uint64_t)p[1] << 48) | ((uint64_t)p[2] << 40) + | ((uint64_t)p[3] << 32) | ((uint64_t)p[4] << 24) | ((uint64_t)p[5] << 16) + | ((uint64_t)p[6] << 8) | (uint64_t)p[7]; +} + +int lipc_status_is_ok(lipc_status s) +{ + return s == LIPC_OK; +} + +const char *lipc_status_str(lipc_status s) +{ + switch (s) { + case LIPC_OK: + return "OK"; + case LIPC_BAD_HEADER: + return "BAD_HEADER"; + case LIPC_BAD_MAGIC: + return "BAD_MAGIC"; + case LIPC_BAD_VERSION: + return "BAD_VERSION"; + case LIPC_BAD_CHECKSUM: + return "BAD_CHECKSUM"; + case LIPC_BAD_LENGTH: + return "BAD_LENGTH"; + case LIPC_UNKNOWN_REQUEST: + return "UNKNOWN_REQUEST"; + case LIPC_INVALID_PARAMS: + return "INVALID_PARAMS"; + case LIPC_NOT_FOUND: + return "NOT_FOUND"; + case LIPC_INTERNAL_ERROR: + return "INTERNAL_ERROR"; + case LIPC_WOULD_BLOCK: + return "WOULD_BLOCK"; + case LIPC_IO_ERROR: + return "IO_ERROR"; + case LIPC_INCOMPLETE: + return "INCOMPLETE"; + default: + if ((uint16_t)s >= LIPC_STATUS_APP_BASE) { + return "APPLICATION_DEFINED"; + } + return "UNKNOWN_LIPC_STATUS"; + } +} + +void lipc_header_encode(uint8_t wire[LIPC_HEADER_SIZE], const lipc_header *h) +{ + if (!wire || !h) { + return; + } + lipc_put32(wire + LIPC_OFF_MAGIC, LIPC_PROTOCOL_MAGIC); + lipc_put16(wire + LIPC_OFF_VERSION, LIPC_PROTOCOL_VERSION); + lipc_put16(wire + LIPC_OFF_REQUEST, h->request); + lipc_put16(wire + LIPC_OFF_STATUS, h->status); + lipc_put16(wire + LIPC_OFF_INDEX, h->index); + lipc_put32(wire + LIPC_OFF_TID, h->tid); + lipc_put32(wire + LIPC_OFF_LENGTH, h->length); + lipc_put32(wire + LIPC_OFF_CHECKSUM, 0); + lipc_put64(wire + LIPC_OFF_VALUE, h->value); +} + +lipc_status lipc_header_decode(lipc_header *h, const uint8_t wire[LIPC_HEADER_SIZE], + uint32_t max_payload) +{ + if (!h || !wire) { + return LIPC_BAD_HEADER; + } + + h->magic = lipc_get32(wire + LIPC_OFF_MAGIC); + h->version = lipc_get16(wire + LIPC_OFF_VERSION); + h->request = lipc_get16(wire + LIPC_OFF_REQUEST); + h->status = lipc_get16(wire + LIPC_OFF_STATUS); + h->index = lipc_get16(wire + LIPC_OFF_INDEX); + h->tid = lipc_get32(wire + LIPC_OFF_TID); + h->length = lipc_get32(wire + LIPC_OFF_LENGTH); + h->checksum = lipc_get32(wire + LIPC_OFF_CHECKSUM); + h->value = lipc_get64(wire + LIPC_OFF_VALUE); + + if (h->magic != LIPC_PROTOCOL_MAGIC) { + return LIPC_BAD_MAGIC; + } + if (h->version != LIPC_PROTOCOL_VERSION) { + return LIPC_BAD_VERSION; + } + if (h->length > max_payload) { + return LIPC_BAD_LENGTH; + } + return LIPC_OK; +} + +void lipc_header_set_checksum(uint8_t wire[LIPC_HEADER_SIZE], uint32_t crc) +{ + lipc_put32(wire + LIPC_OFF_CHECKSUM, crc); +} + +uint32_t lipc_checksum_compute(const uint8_t wire[LIPC_HEADER_SIZE], const void *payload, + size_t payload_len) +{ + uint32_t c = lipc_crc32_update(0, wire, LIPC_HEADER_SIZE); + if (payload_len > 0 && payload) { + c = lipc_crc32_update(c, payload, payload_len); + } + return c; +} + +lipc_status lipc_checksum_verify(const uint8_t wire[LIPC_HEADER_SIZE], const void *payload, + size_t payload_len) +{ + uint32_t got = lipc_get32(wire + LIPC_OFF_CHECKSUM); + uint8_t tmp[LIPC_HEADER_SIZE]; + + memcpy(tmp, wire, sizeof tmp); + lipc_put32(tmp + LIPC_OFF_CHECKSUM, 0); + uint32_t expect = lipc_checksum_compute(tmp, payload, payload_len); + if (expect != got) { + return LIPC_BAD_CHECKSUM; + } + return LIPC_OK; +} + +void lipc_frame_fill_checksum(uint8_t wire[LIPC_HEADER_SIZE], const lipc_header *h, + const void *payload, size_t payload_len) +{ + if (!wire || !h) { + return; + } + lipc_header_encode(wire, h); + uint32_t crc = lipc_checksum_compute(wire, payload, payload_len); + lipc_header_set_checksum(wire, crc); +} + +static lipc_status lipc_rx_feed_exhaust(lipc_stream_rx *rx, uint32_t max_payload, + const uint8_t *data, size_t data_len, size_t *data_off, + lipc_header *out_hdr, uint8_t *payload_buf, size_t payload_cap, size_t *payload_len, + int *frame_done) +{ + while (*data_off < data_len) { + lipc_status st = lipc_stream_rx_feed(rx, max_payload, data, data_len, data_off, out_hdr, + payload_buf, payload_cap, payload_len, frame_done); + if (st != LIPC_OK && st != LIPC_INCOMPLETE) { + return st; + } + if (*frame_done) { + return LIPC_OK; + } + if (st == LIPC_INCOMPLETE && *data_off == data_len) { + return LIPC_INCOMPLETE; + } + } + return LIPC_INCOMPLETE; +} + +void lipc_frame_reader_init(lipc_frame_reader *r) +{ + if (!r) { + return; + } + lipc_stream_rx_init(&r->stream); + r->pending_len = 0; +} + +lipc_status lipc_frame_reader_read(int fd, lipc_frame_reader *r, uint32_t max_payload, + uint8_t *scratch, size_t scratch_len, + uint8_t *wire, lipc_header *h, + void *payload_buf, size_t payload_cap, size_t *payload_len) +{ + if (!r || !scratch || scratch_len == 0 || scratch_len > LIPC_FRAME_READER_PENDING_CAP || !h + || !payload_len || (payload_cap > 0 && !payload_buf)) { + return LIPC_BAD_HEADER; + } + + for (;;) { + while (r->pending_len > 0) { + size_t off = 0; + int done = 0; + lipc_status st = lipc_rx_feed_exhaust(&r->stream, max_payload, r->pending, + r->pending_len, &off, h, (uint8_t *)payload_buf, payload_cap, payload_len, &done); + if (st != LIPC_OK && st != LIPC_INCOMPLETE) { + return st; + } + if (st == LIPC_OK && done) { + size_t left = r->pending_len - off; + if (left > 0) { + memmove(r->pending, r->pending + off, left); + } + r->pending_len = left; + if (wire) { + lipc_header_encode(wire, h); + lipc_header_set_checksum(wire, h->checksum); + } + return LIPC_OK; + } + r->pending_len = 0; + break; + } + + ssize_t n = read(fd, scratch, scratch_len); + if (n < 0) { + if (errno == EINTR) { + continue; + } + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return LIPC_WOULD_BLOCK; + } + return LIPC_IO_ERROR; + } + if (n == 0) { + errno = ECONNRESET; + return LIPC_IO_ERROR; + } + + size_t off = 0; + int done = 0; + lipc_status st = lipc_rx_feed_exhaust(&r->stream, max_payload, scratch, (size_t)n, &off, + h, (uint8_t *)payload_buf, payload_cap, payload_len, &done); + if (st != LIPC_OK && st != LIPC_INCOMPLETE) { + return st; + } + if (st == LIPC_OK && done) { + size_t left = (size_t)n - off; + if (left > LIPC_FRAME_READER_PENDING_CAP) { + return LIPC_BAD_LENGTH; + } + if (left > 0) { + memcpy(r->pending, scratch + off, left); + r->pending_len = left; + } + if (wire) { + lipc_header_encode(wire, h); + lipc_header_set_checksum(wire, h->checksum); + } + return LIPC_OK; + } + } +} + +lipc_status lipc_frame_writer_begin(lipc_frame_writer *w, const lipc_header *h, + const void *payload, size_t payload_len) +{ + if (!w || !h) { + return LIPC_BAD_HEADER; + } + if (h->length != payload_len) { + return LIPC_BAD_LENGTH; + } + if (payload_len > 0 && !payload) { + return LIPC_BAD_HEADER; + } + lipc_frame_fill_checksum(w->hdr, h, payload, payload_len); + w->payload = (const uint8_t *)payload; + w->payload_len = payload_len; + w->sent = 0; + w->total = LIPC_HEADER_SIZE + payload_len; + return LIPC_OK; +} + +lipc_status lipc_frame_writer_write(int fd, lipc_frame_writer *w) +{ + if (!w) { + return LIPC_BAD_HEADER; + } + while (w->sent < w->total) { + const uint8_t *base; + size_t chunk; + + if (w->sent < LIPC_HEADER_SIZE) { + base = w->hdr + w->sent; + chunk = LIPC_HEADER_SIZE - w->sent; + } else { + size_t poff = w->sent - LIPC_HEADER_SIZE; + + base = w->payload + poff; + chunk = w->payload_len - poff; + } + ssize_t n = write(fd, base, chunk); + if (n < 0) { + if (errno == EINTR) { + continue; + } + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return LIPC_WOULD_BLOCK; + } + return LIPC_IO_ERROR; + } + if (n == 0) { + errno = EPIPE; + return LIPC_IO_ERROR; + } + w->sent += (size_t)n; + } + return LIPC_OK; +} + +enum { LIPC_READ_FULL_OK = 0, LIPC_READ_FULL_ERR = -1, LIPC_READ_FULL_AGAIN = -2 }; + +static int lipc_read_full(int fd, void *buf, size_t len) +{ + uint8_t *p = buf; + size_t got = 0; + + while (got < len) { + ssize_t n = read(fd, p + got, len - got); + if (n < 0) { + if (errno == EINTR) { + continue; + } + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return LIPC_READ_FULL_AGAIN; + } + return LIPC_READ_FULL_ERR; + } + if (n == 0) { + errno = ECONNRESET; + return LIPC_READ_FULL_ERR; + } + got += (size_t)n; + } + return LIPC_READ_FULL_OK; +} + +static int lipc_write_full(int fd, const void *buf, size_t len) +{ + const uint8_t *p = buf; + size_t put = 0; + + while (put < len) { + ssize_t n = write(fd, p + put, len - put); + if (n < 0) { + if (errno == EINTR) { + continue; + } + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return LIPC_READ_FULL_AGAIN; + } + return LIPC_READ_FULL_ERR; + } + put += (size_t)n; + } + return LIPC_READ_FULL_OK; +} + +lipc_status lipc_frame_read(int fd, uint32_t max_payload, uint8_t wire[LIPC_HEADER_SIZE], + lipc_header *h, void *payload_buf, size_t payload_cap, size_t *payload_len) +{ + if (!wire || !h || !payload_len || (payload_cap > 0 && !payload_buf)) { + return LIPC_BAD_HEADER; + } + + int rr = lipc_read_full(fd, wire, LIPC_HEADER_SIZE); + if (rr == LIPC_READ_FULL_AGAIN) { + return LIPC_WOULD_BLOCK; + } + if (rr != LIPC_READ_FULL_OK) { + return LIPC_IO_ERROR; + } + + lipc_status st = lipc_header_decode(h, wire, max_payload); + if (st != LIPC_OK) { + return st; + } + if (h->length > payload_cap) { + return LIPC_BAD_LENGTH; + } + + if (h->length > 0) { + rr = lipc_read_full(fd, payload_buf, h->length); + if (rr == LIPC_READ_FULL_AGAIN) { + return LIPC_WOULD_BLOCK; + } + if (rr != LIPC_READ_FULL_OK) { + return LIPC_IO_ERROR; + } + } + st = lipc_checksum_verify(wire, payload_buf, h->length); + if (st != LIPC_OK) { + return st; + } + if (payload_len) { + *payload_len = h->length; + } + return LIPC_OK; +} + +lipc_status lipc_frame_write(int fd, lipc_header *h, const void *payload, size_t payload_len) +{ + if (!h || h->length != payload_len) { + return LIPC_BAD_LENGTH; + } + if (payload_len > 0 && !payload) { + return LIPC_BAD_HEADER; + } + + uint8_t wire[LIPC_HEADER_SIZE]; + lipc_frame_fill_checksum(wire, h, payload, payload_len); + + int wr = lipc_write_full(fd, wire, sizeof wire); + if (wr == LIPC_READ_FULL_AGAIN) { + return LIPC_WOULD_BLOCK; + } + if (wr != LIPC_READ_FULL_OK) { + return LIPC_IO_ERROR; + } + if (payload_len > 0 && payload) { + wr = lipc_write_full(fd, payload, payload_len); + if (wr == LIPC_READ_FULL_AGAIN) { + return LIPC_WOULD_BLOCK; + } + if (wr != LIPC_READ_FULL_OK) { + return LIPC_IO_ERROR; + } + } + return LIPC_OK; +} + +void lipc_stream_rx_init(lipc_stream_rx *rx) +{ + memset(rx, 0, sizeof *rx); +} + +lipc_status lipc_stream_rx_feed(lipc_stream_rx *rx, uint32_t max_payload, const uint8_t *data, + size_t data_len, size_t *data_off, lipc_header *out_hdr, uint8_t *payload_buf, + size_t payload_cap, size_t *payload_len, int *frame_done) +{ + if (!rx || !data_off || !out_hdr || !payload_len || !frame_done) { + return LIPC_BAD_HEADER; + } + *frame_done = 0; + + if (*data_off > data_len) { + return LIPC_BAD_HEADER; + } + + while (*data_off < data_len) { + if (!rx->in_payload) { + size_t need = LIPC_HEADER_SIZE - rx->header_got; + size_t avail = data_len - *data_off; + size_t take = need < avail ? need : avail; + memcpy(rx->header + rx->header_got, data + *data_off, take); + rx->header_got += take; + *data_off += take; + if (rx->header_got < LIPC_HEADER_SIZE) { + return LIPC_INCOMPLETE; + } + + lipc_status st = lipc_header_decode(out_hdr, rx->header, max_payload); + if (st != LIPC_OK) { + return st; + } + rx->payload_len = out_hdr->length; + rx->payload_got = 0; + rx->in_payload = 1; + if (rx->payload_len > payload_cap) { + return LIPC_BAD_LENGTH; + } + if (rx->payload_len == 0) { + st = lipc_checksum_verify(rx->header, NULL, 0); + if (st != LIPC_OK) { + return st; + } + *payload_len = 0; + *frame_done = 1; + lipc_stream_rx_init(rx); + return LIPC_OK; + } + } + + size_t need = rx->payload_len - rx->payload_got; + size_t avail = data_len - *data_off; + size_t take = need < avail ? need : avail; + if (take == 0) { + return LIPC_INCOMPLETE; + } + memcpy(payload_buf + rx->payload_got, data + *data_off, take); + rx->payload_got += take; + *data_off += take; + if (rx->payload_got < rx->payload_len) { + return LIPC_INCOMPLETE; + } + + lipc_status st = lipc_checksum_verify(rx->header, payload_buf, rx->payload_len); + if (st != LIPC_OK) { + return st; + } + *payload_len = rx->payload_len; + *frame_done = 1; + lipc_stream_rx_init(rx); + return LIPC_OK; + } + + return LIPC_INCOMPLETE; +} diff --git a/subprojects/localipc/lipc_packet_test.c b/subprojects/localipc/lipc_packet_test.c new file mode 100644 index 0000000..c5801e9 --- /dev/null +++ b/subprojects/localipc/lipc_packet_test.c @@ -0,0 +1,369 @@ +/* + * SPDX-License-Identifier: GPL-2.0-only WITH Classpath-exception-2.0 + */ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L +_Static_assert(LIPC_OFF_VALUE + 8u == LIPC_HEADER_SIZE, "wire header size"); +#endif + +static void test_status_helpers(void **state) +{ + (void)state; + + assert_true(lipc_status_is_ok(LIPC_OK)); + assert_int_equal(lipc_status_is_ok(LIPC_BAD_MAGIC), 0); + assert_int_equal(lipc_status_is_ok((lipc_status)LIPC_STATUS_APP_BASE), 0); + assert_int_equal(lipc_status_is_ok((lipc_status)0xffff), 0); + + assert_string_equal(lipc_status_str(LIPC_OK), "OK"); + assert_string_equal(lipc_status_str(LIPC_IO_ERROR), "IO_ERROR"); + assert_string_equal(lipc_status_str((lipc_status)LIPC_STATUS_APP_BASE), "APPLICATION_DEFINED"); + assert_string_equal(lipc_status_str((lipc_status)0xff00), "APPLICATION_DEFINED"); + assert_string_equal(lipc_status_str((lipc_status)99), "UNKNOWN_LIPC_STATUS"); +} + +static void test_header_round_trip(void **state) +{ + (void)state; + lipc_header in = { + .magic = LIPC_PROTOCOL_MAGIC, + .version = LIPC_PROTOCOL_VERSION, + .request = 42, + .status = 0, + .index = 7, + .tid = 0x11223344u, + .length = 0, + .checksum = 0, + .value = 0x010203040a0b0c0du, + }; + uint8_t wire[LIPC_HEADER_SIZE]; + + lipc_header_encode(wire, &in); + lipc_header out; + assert_int_equal(lipc_header_decode(&out, wire, LIPC_MAX_PAYLOAD_DEFAULT), LIPC_OK); + assert_int_equal(out.magic, in.magic); + assert_int_equal(out.version, in.version); + assert_int_equal(out.request, in.request); + assert_int_equal(out.status, in.status); + assert_int_equal(out.index, in.index); + assert_int_equal(out.tid, in.tid); + assert_int_equal(out.length, in.length); + assert_int_equal(out.checksum, 0); + assert_true(out.value == in.value); +} + +static void test_crc32_empty_payload_zlib_compat(void **state) +{ + (void)state; + lipc_header z = { 0 }; + uint8_t wire[LIPC_HEADER_SIZE]; + + lipc_header_encode(wire, &z); + uint32_t c = lipc_checksum_compute(wire, NULL, 0); + /* Matches CPython zlib.crc32 on this 32-byte blob (checksum field zero). */ + assert_int_equal((int)c, (int)0x5d29d25du); +} + +static void test_crc32_payload_zlib_compat(void **state) +{ + (void)state; + lipc_header h = { + .magic = LIPC_PROTOCOL_MAGIC, + .version = LIPC_PROTOCOL_VERSION, + .request = 0x1234u, + .status = 0, + .index = 2, + .tid = 0x10203040u, + .length = 5, + .checksum = 0, + .value = 0x0102030405060708ull, + }; + const char payload[] = "hello"; + uint8_t wire[LIPC_HEADER_SIZE]; + + lipc_header_encode(wire, &h); + /* Matches CPython zlib.crc32 for header bytes + payload bytes. */ + assert_int_equal((int)lipc_checksum_compute(wire, payload, 5), (int)0x4b413dc9u); +} + +static void test_header_bad_version(void **state) +{ + (void)state; + lipc_header z = { 0 }; + uint8_t wire[LIPC_HEADER_SIZE]; + + lipc_header_encode(wire, &z); + wire[LIPC_OFF_VERSION + 1] = 2; + lipc_header out; + assert_int_equal(lipc_header_decode(&out, wire, LIPC_MAX_PAYLOAD_DEFAULT), LIPC_BAD_VERSION); +} + +static void test_header_length_cap(void **state) +{ + (void)state; + lipc_header h = { + .length = 64, + }; + uint8_t wire[LIPC_HEADER_SIZE]; + + lipc_header_encode(wire, &h); + lipc_header out; + assert_int_equal(lipc_header_decode(&out, wire, 63), LIPC_BAD_LENGTH); + assert_int_equal(lipc_header_decode(&out, wire, 64), LIPC_OK); +} + +static void test_frame_reader_nonblocking(void **state) +{ + (void)state; + int sp[2]; + + assert_int_equal(socketpair(AF_UNIX, SOCK_STREAM, 0, sp), 0); + assert_int_equal(fcntl(sp[0], F_SETFL, fcntl(sp[0], F_GETFL) | O_NONBLOCK), 0); + + lipc_header hdr = { + .magic = LIPC_MAGIC, + .version = LIPC_PROTOCOL_VERSION, + .request = 9, + .status = 0, + .index = 0, + .tid = 2, + .length = 4, + .checksum = 0, + .value = 0, + }; + const char *pl = "data"; + uint8_t frame[LIPC_HEADER_SIZE]; + + lipc_frame_fill_checksum(frame, &hdr, pl, 4); + + assert_int_equal(write(sp[1], frame, 10), 10); + assert_int_equal(write(sp[1], frame + 10, 22), 22); + assert_int_equal(write(sp[1], pl, 4), 4); + + lipc_frame_reader rr; + lipc_frame_reader_init(&rr); + uint8_t scratch[512]; + lipc_header out; + uint8_t payload[16]; + size_t plen = 0; + uint8_t out_wire[LIPC_HEADER_SIZE]; + + lipc_status st = LIPC_WOULD_BLOCK; + int spins = 0; + + while (st == LIPC_WOULD_BLOCK && spins < 32) { + st = lipc_frame_reader_read(sp[0], &rr, LIPC_MAX_PAYLOAD_DEFAULT, scratch, sizeof scratch, + out_wire, &out, payload, sizeof payload, &plen); + spins++; + } + assert_int_equal(st, LIPC_OK); + assert_int_equal((int)plen, 4); + assert_memory_equal(payload, pl, 4); + assert_int_equal(lipc_checksum_verify(out_wire, payload, plen), LIPC_OK); + + close(sp[0]); + close(sp[1]); +} + +static void test_frame_writer_socketpair(void **state) +{ + (void)state; + int sp[2]; + + assert_int_equal(socketpair(AF_UNIX, SOCK_STREAM, 0, sp), 0); + + lipc_header hdr = { + .magic = LIPC_MAGIC, + .version = LIPC_PROTOCOL_VERSION, + .request = 3, + .status = 0, + .index = 1, + .tid = 99, + .length = 2, + .checksum = 0, + .value = 0xabcdull, + }; + const char *pl = "xy"; + + lipc_frame_writer wr; + assert_int_equal(lipc_frame_writer_begin(&wr, &hdr, pl, 2), LIPC_OK); + + lipc_status st = LIPC_WOULD_BLOCK; + int spins = 0; + + while (st == LIPC_WOULD_BLOCK && spins < 32) { + st = lipc_frame_writer_write(sp[1], &wr); + spins++; + } + assert_int_equal(st, LIPC_OK); + + uint8_t wire[LIPC_HEADER_SIZE]; + lipc_header rh; + uint8_t payload[8]; + size_t plen = 0; + + assert_int_equal(lipc_frame_read(sp[0], LIPC_MAX_PAYLOAD_DEFAULT, wire, &rh, payload, + sizeof payload, &plen), + LIPC_OK); + assert_int_equal((int)plen, 2); + assert_memory_equal(payload, pl, 2); + assert_int_equal(rh.request, 3); + assert_int_equal(rh.index, 1); + assert_int_equal(rh.tid, 99u); + assert_true(rh.value == 0xabcdull); + + close(sp[0]); + close(sp[1]); +} + +static void test_stream_split_header(void **state) +{ + (void)state; + lipc_header in = { + .magic = LIPC_PROTOCOL_MAGIC, + .version = LIPC_PROTOCOL_VERSION, + .request = 1, + .status = 0, + .index = 0, + .tid = 1, + .length = 3, + .checksum = 0, + .value = 0, + }; + const char *pld = "abc"; + in.length = 3; + uint8_t wire[LIPC_HEADER_SIZE]; + lipc_frame_fill_checksum(wire, &in, pld, 3); + + lipc_stream_rx rx; + lipc_stream_rx_init(&rx); + lipc_header out; + uint8_t payload[16]; + size_t payload_len = 0; + size_t off = 0; + int done = 0; + lipc_status st = lipc_stream_rx_feed(&rx, LIPC_MAX_PAYLOAD_DEFAULT, wire, 10, &off, &out, + payload, sizeof payload, &payload_len, &done); + assert_int_equal(st, LIPC_INCOMPLETE); + assert_int_equal(done, 0); + assert_int_equal((int)off, 10); + + st = lipc_stream_rx_feed(&rx, LIPC_MAX_PAYLOAD_DEFAULT, wire, 32, &off, &out, payload, + sizeof payload, &payload_len, &done); + assert_int_equal(st, LIPC_INCOMPLETE); + assert_int_equal(done, 0); + assert_int_equal((int)off, 32); + + off = 0; + st = lipc_stream_rx_feed(&rx, LIPC_MAX_PAYLOAD_DEFAULT, (const uint8_t *)pld, 3, &off, &out, + payload, sizeof payload, &payload_len, &done); + assert_int_equal(st, LIPC_OK); + assert_int_equal(done, 1); + assert_int_equal(payload_len, 3); + assert_memory_equal(payload, pld, 3); +} + +static void test_socket_helpers_sockaddr_validation(void **state) +{ + (void)state; + struct sockaddr_un addr; + socklen_t len = 0; + char long_path[sizeof(addr.sun_path) + 8]; + + memset(long_path, 'a', sizeof(long_path)); + long_path[sizeof(long_path) - 1] = '\0'; + + assert_non_null(lipc_default_socket_path()); + assert_true(strlen(lipc_default_socket_path()) > 0); + assert_int_equal(lipc_sockaddr_init("/tmp/lipc_packet_test.sock", &addr, &len), LIPC_OK); + assert_int_equal(addr.sun_family, AF_UNIX); + assert_true(len > 0); + assert_int_equal(strcmp(addr.sun_path, "/tmp/lipc_packet_test.sock"), 0); + + assert_int_equal(lipc_sockaddr_init(NULL, &addr, &len), LIPC_INVALID_PARAMS); + assert_int_equal(lipc_sockaddr_init("", &addr, &len), LIPC_INVALID_PARAMS); + assert_int_equal(lipc_sockaddr_init(long_path, &addr, &len), LIPC_BAD_LENGTH); +} + +static void test_socket_helpers_fd_flags(void **state) +{ + (void)state; + int sp[2]; + + assert_int_equal(socketpair(AF_UNIX, SOCK_STREAM, 0, sp), 0); + assert_int_equal(lipc_socket_set_nonblocking(sp[0], 1), LIPC_OK); + assert_true((fcntl(sp[0], F_GETFL, 0) & O_NONBLOCK) != 0); + assert_int_equal(lipc_socket_set_nonblocking(sp[0], 0), LIPC_OK); + assert_true((fcntl(sp[0], F_GETFL, 0) & O_NONBLOCK) == 0); + + assert_int_equal(lipc_socket_set_cloexec(sp[1], 1), LIPC_OK); + assert_true((fcntl(sp[1], F_GETFD, 0) & FD_CLOEXEC) != 0); + assert_int_equal(lipc_socket_set_cloexec(sp[1], 0), LIPC_OK); + assert_true((fcntl(sp[1], F_GETFD, 0) & FD_CLOEXEC) == 0); + + close(sp[0]); + close(sp[1]); +} + +static void test_socket_helpers_listen_connect(void **state) +{ + (void)state; + char path[108]; + int listen_fd = -1; + int client_fd = -1; + int server_fd = -1; + + assert_true(snprintf(path, sizeof(path), "/tmp/lipc_socket_helper_%ld_%ld.sock", + (long)getpid(), random()) + < (int)sizeof(path)); + + listen_fd = lipc_socket_listen(path, 4, LIPC_SOCKET_CLOEXEC); + assert_true(listen_fd >= 0); + client_fd = lipc_socket_connect(path, LIPC_SOCKET_CLOEXEC); + assert_true(client_fd >= 0); + + server_fd = accept(listen_fd, NULL, NULL); + assert_true(server_fd >= 0); + + close(server_fd); + close(client_fd); + close(listen_fd); + unlink(path); +} + +int main(void) +{ + const struct CMUnitTest tests[] = { + cmocka_unit_test(test_status_helpers), + cmocka_unit_test(test_header_round_trip), + cmocka_unit_test(test_crc32_empty_payload_zlib_compat), + cmocka_unit_test(test_crc32_payload_zlib_compat), + cmocka_unit_test(test_header_bad_version), + cmocka_unit_test(test_header_length_cap), + cmocka_unit_test(test_frame_reader_nonblocking), + cmocka_unit_test(test_frame_writer_socketpair), + cmocka_unit_test(test_stream_split_header), + cmocka_unit_test(test_socket_helpers_sockaddr_validation), + cmocka_unit_test(test_socket_helpers_fd_flags), + cmocka_unit_test(test_socket_helpers_listen_connect), + }; + cmocka_set_message_output(CM_OUTPUT_TAP); + return cmocka_run_group_tests(tests, NULL, NULL); +} diff --git a/subprojects/localipc/lipc_socket.c b/subprojects/localipc/lipc_socket.c new file mode 100644 index 0000000..6aeb409 --- /dev/null +++ b/subprojects/localipc/lipc_socket.c @@ -0,0 +1,237 @@ +/* + * UNIX socket helpers for localipc. + * + * SPDX-License-Identifier: GPL-2.0-only WITH Classpath-exception-2.0 + */ + +#include + +#include +#include +#include +#include +#include +#include + +const char *lipc_default_socket_path(void) +{ + return LIPC_DEFAULT_SOCKET_PATH; +} + +lipc_status lipc_sockaddr_init(const char *path, struct sockaddr_un *addr, socklen_t *len) +{ + size_t path_len; + + if (!path || !addr) { + errno = EINVAL; + return LIPC_INVALID_PARAMS; + } + if (path[0] == '\0') { + errno = EINVAL; + return LIPC_INVALID_PARAMS; + } + + path_len = strlen(path); + if (path_len >= sizeof(addr->sun_path)) { + errno = ENAMETOOLONG; + return LIPC_BAD_LENGTH; + } + + memset(addr, 0, sizeof(*addr)); + addr->sun_family = AF_UNIX; + memcpy(addr->sun_path, path, path_len + 1); + if (len) { + *len = (socklen_t)(offsetof(struct sockaddr_un, sun_path) + path_len + 1u); + } + return LIPC_OK; +} + +lipc_status lipc_socket_set_nonblocking(int fd, int enabled) +{ + int flags; + + if (fd < 0) { + errno = EINVAL; + return LIPC_INVALID_PARAMS; + } + flags = fcntl(fd, F_GETFL, 0); + if (flags < 0) { + return LIPC_IO_ERROR; + } + + if (enabled) { + if ((flags & O_NONBLOCK) != 0) { + return LIPC_OK; + } + if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) { + return LIPC_IO_ERROR; + } + } else { + if ((flags & O_NONBLOCK) == 0) { + return LIPC_OK; + } + if (fcntl(fd, F_SETFL, flags & ~O_NONBLOCK) < 0) { + return LIPC_IO_ERROR; + } + } + + return LIPC_OK; +} + +lipc_status lipc_socket_set_cloexec(int fd, int enabled) +{ + int flags; + + if (fd < 0) { + errno = EINVAL; + return LIPC_INVALID_PARAMS; + } + flags = fcntl(fd, F_GETFD, 0); + if (flags < 0) { + return LIPC_IO_ERROR; + } + + if (enabled) { + if ((flags & FD_CLOEXEC) != 0) { + return LIPC_OK; + } + if (fcntl(fd, F_SETFD, flags | FD_CLOEXEC) < 0) { + return LIPC_IO_ERROR; + } + } else { + if ((flags & FD_CLOEXEC) == 0) { + return LIPC_OK; + } + if (fcntl(fd, F_SETFD, flags & ~FD_CLOEXEC) < 0) { + return LIPC_IO_ERROR; + } + } + + return LIPC_OK; +} + +static int lipc_socket_create_stream(unsigned int flags) +{ + int type = SOCK_STREAM; + int fd; + +#ifdef SOCK_NONBLOCK + if ((flags & LIPC_SOCKET_NONBLOCK) != 0u) { + type |= SOCK_NONBLOCK; + } +#endif +#ifdef SOCK_CLOEXEC + if ((flags & LIPC_SOCKET_CLOEXEC) != 0u) { + type |= SOCK_CLOEXEC; + } +#endif + + fd = socket(AF_UNIX, type, 0); + if (fd < 0) { + return -1; + } + +#ifndef SOCK_NONBLOCK + if ((flags & LIPC_SOCKET_NONBLOCK) != 0u + && lipc_socket_set_nonblocking(fd, 1) != LIPC_OK) { + int saved = errno; + close(fd); + errno = saved; + return -1; + } +#else + if ((flags & LIPC_SOCKET_NONBLOCK) != 0u + && lipc_socket_set_nonblocking(fd, 1) != LIPC_OK) { + int saved = errno; + close(fd); + errno = saved; + return -1; + } +#endif + +#ifndef SOCK_CLOEXEC + if ((flags & LIPC_SOCKET_CLOEXEC) != 0u + && lipc_socket_set_cloexec(fd, 1) != LIPC_OK) { + int saved = errno; + close(fd); + errno = saved; + return -1; + } +#else + if ((flags & LIPC_SOCKET_CLOEXEC) != 0u + && lipc_socket_set_cloexec(fd, 1) != LIPC_OK) { + int saved = errno; + close(fd); + errno = saved; + return -1; + } +#endif + + return fd; +} + +int lipc_socket_listen(const char *path, int backlog, unsigned int flags) +{ + struct sockaddr_un addr; + socklen_t addr_len = 0; + int fd; + + if (backlog <= 0) { + errno = EINVAL; + return -1; + } + if (lipc_sockaddr_init(path, &addr, &addr_len) != LIPC_OK) { + return -1; + } + + fd = lipc_socket_create_stream(flags); + if (fd < 0) { + return -1; + } + + unlink(path); + if (bind(fd, (const struct sockaddr *)&addr, addr_len) < 0) { + int saved = errno; + close(fd); + errno = saved; + return -1; + } + if (listen(fd, backlog) < 0) { + int saved = errno; + close(fd); + unlink(path); + errno = saved; + return -1; + } + + return fd; +} + +int lipc_socket_connect(const char *path, unsigned int flags) +{ + struct sockaddr_un addr; + socklen_t addr_len = 0; + int fd; + + if (lipc_sockaddr_init(path, &addr, &addr_len) != LIPC_OK) { + return -1; + } + + fd = lipc_socket_create_stream(flags); + if (fd < 0) { + return -1; + } + + if (connect(fd, (const struct sockaddr *)&addr, addr_len) < 0) { + if ((flags & LIPC_SOCKET_NONBLOCK) != 0u + && (errno == EINPROGRESS || errno == EALREADY)) { + return fd; + } + int saved = errno; + close(fd); + errno = saved; + return -1; + } + + return fd; +} diff --git a/subprojects/localipc/meson.build b/subprojects/localipc/meson.build new file mode 100644 index 0000000..7782ef6 --- /dev/null +++ b/subprojects/localipc/meson.build @@ -0,0 +1,64 @@ +project('localipc', 'C', + version: '0.1.0', + license: 'GPL-2.0-only WITH Classpath-exception-2.0', + meson_version: '>=0.61') + +pkgconfig = import('pkgconfig') +compiler = meson.get_compiler('c') + +sym_hidden_cargs = [] +if compiler.has_argument('-fvisibility=hidden') + sym_hidden_cargs = ['-fvisibility=hidden'] +endif + +dep_localipc_threads = dependency('threads') + +localipc_inc = include_directories('include') + +lib_localipc_sources = files('lipc_crc32.c', 'lipc_packet.c', 'lipc_dispatch.c', 'lipc_socket.c') + +dep_cmocka = dependency('cmocka', required: false) + +lib_localipc = library('localipc', lib_localipc_sources, + version: meson.project_version(), + install: true, + c_args: sym_hidden_cargs, + dependencies: [dep_localipc_threads], + include_directories: [localipc_inc], +) + +install_headers( + 'include/localipc/lipc.h', + install_dir: get_option('includedir') / 'localipc', +) + +pkgconfig.generate(lib_localipc, + name: 'localipc', + description: 'Framed local IPC protocol library (stream-safe packets, CRC, dispatch).', + version: meson.project_version(), +) + +liblocalipc_dep = declare_dependency( + link_with: lib_localipc, + include_directories: localipc_inc, +) + +if dep_cmocka.found() + lipc_packet_test = executable('lipc_packet_test', 'lipc_packet_test.c', + include_directories: localipc_inc, + link_with: lib_localipc, + dependencies: [dep_localipc_threads, dep_cmocka], + ) + test('lipc_packet', lipc_packet_test, protocol:'tap') + + lipc_dispatch_test = executable('lipc_dispatch_test', 'lipc_dispatch_test.c', + include_directories: localipc_inc, + link_with: lib_localipc, + dependencies: [dep_localipc_threads, dep_cmocka], + ) + test('lipc_dispatch', lipc_dispatch_test, protocol:'tap') +endif + +if get_option('lipc_examples') + subdir('examples') +endif diff --git a/subprojects/localipc/meson_options.txt b/subprojects/localipc/meson_options.txt new file mode 100644 index 0000000..b328ec7 --- /dev/null +++ b/subprojects/localipc/meson_options.txt @@ -0,0 +1,4 @@ +option('lipc_examples', + type: 'boolean', + value: false, + description: 'Build localipc example programs')