coap protocol tunneling

published: August 12, 2025

coap (constrained application protocol) tunneling exploits header field manipulation and message id patterns in rfc 7252 implementations to create covert channels in iot environments.

technical description

coap is a specialized web transfer protocol designed for constrained devices and networks. operating over udp, coap provides a lightweight alternative to http for iot applications. the protocol’s header structure and option fields offer multiple vectors for covert data transmission.

covert channel opportunities:

  1. message id manipulation: 16-bit message identifier patterns
  2. token field exploitation: 0-8 byte client-generated tokens
  3. option field abuse: type-length-value encoded options
  4. uri-path encoding: covert data in resource paths
  5. block-wise transfer manipulation: sequence numbers and sizes

coap message format

header structure

 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|ver| t |  tkl  |      code     |          message id           |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|   token (if any, tkl bytes) ...
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|   options (if any) ...
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|1 1 1 1 1 1 1 1|    payload (if any) ...
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

field descriptions

fieldsizedescriptioncovert potential
version2 bitscoap version (1)none - fixed
type2 bitsconfirmable/non-confirmablelow
token length4 bits0-8 bytesmedium
code8 bitsmethod/response codelow
message id16 bitstransaction identifierhigh
token0-8 bytesclient-generated identifierhigh
optionsvariablerequest/response metadatahigh
payloadvariablemessage contenthigh

implementation: cceap educational framework

overview

cceap (https://github.com/cdpxe/cceap) provides educational coap covert channel implementations:

  • academic research framework
  • multiple encoding techniques demonstrated
  • focus on educational understanding
  • implementation examples in python

installation

# clone cceap repository
git clone https://github.com/cdpxe/cceap.git
cd cceap

# install dependencies
pip install -r requirements.txt

# install coap libraries
pip install aiocoap coapthon3

message id covert channel

# coap message id covert channel implementation
import asyncio
import struct
import time
from aiocoap import *

class coapmessageidchannel:
    def __init__(self, server_host='localhost', server_port=5683):
        self.server_host = server_host
        self.server_port = server_port
        self.context = none

    async def initialize_client(self):
        """initialize coap client context"""
        self.context = await context.create_client_context()

    async def encode_in_message_id(self, data):
        """encode binary data using message id field"""

        # convert data to 16-bit chunks
        binary_data = ''.join(format(ord(c), '08b') for c in data)

        # pad to multiple of 16 bits
        while len(binary_data) % 16 != 0:
            binary_data += '0'

        message_ids = []
        for i in range(0, len(binary_data), 16):
            chunk = binary_data[i:i+16]
            message_id = int(chunk, 2)
            message_ids.append(message_id)

        return message_ids

    async def send_covert_message(self, covert_data, resource_path='/test'):
        """send covert data via message id manipulation"""

        if not self.context:
            await self.initialize_client()

        message_ids = await self.encode_in_message_id(covert_data)
        responses = []

        print(f"encoding '{covert_data}' in {len(message_ids)} message ids")

        for i, msg_id in enumerate(message_ids):
            # create coap request with specific message id
            request = message(code=get, uri=f'coap://{self.server_host}:{self.server_port}{resource_path}')

            # manually set message id (requires low-level access)
            # in practice, this would need modification of the coap library
            print(f"sending message {i+1}/{len(message_ids)} with id {msg_id:04x}")

            try:
                response = await self.context.request(request).response
                responses.append(response)
                print(f"response: {response.code}")

            except exception as e:
                print(f"request failed: {e}")

            # small delay to avoid flooding
            await asyncio.sleep(0.1)

        return responses

    def decode_message_ids(self, message_ids):
        """decode data from sequence of message ids"""

        binary_data = ''
        for msg_id in message_ids:
            # convert message id to 16-bit binary
            binary_chunk = format(msg_id, '016b')
            binary_data += binary_chunk

        # convert binary to text
        decoded_text = ''
        for i in range(0, len(binary_data), 8):
            byte_bits = binary_data[i:i+8]
            if len(byte_bits) == 8 and byte_bits != '00000000':
                decoded_text += chr(int(byte_bits, 2))

        return decoded_text

# coap server for message id monitoring
class coapmessageidserver:
    def __init__(self, port=5683):
        self.port = port
        self.received_message_ids = []

    async def start_server(self):
        """start coap server to monitor message ids"""

        root = resource.site()
        root.add_resource(['test'], testresource(self))

        await context.create_server_context(root, bind=('localhost', self.port))
        print(f"coap message id monitoring server started on port {self.port}")

        # keep server running
        await asyncio.get_event_loop().create_future()

    def log_message_id(self, message_id):
        """log received message id"""
        self.received_message_ids.append(message_id)
        print(f"logged message id: {message_id:04x}")

        # attempt decoding after every few messages
        if len(self.received_message_ids) % 4 == 0:
            self.attempt_decode()

    def attempt_decode(self):
        """attempt to decode collected message ids"""

        if len(self.received_message_ids) < 2:
            return

        channel = coapmessageidchannel()
        decoded = channel.decode_message_ids(self.received_message_ids)

        if decoded.strip():
            print(f"decoded message: '{decoded}'")

class testresource(resource.resource):
    def __init__(self, server):
        super().__init__()
        self.server = server

    async def render_get(self, request):
        """handle get requests and log message ids"""

        # extract message id from request
        # note: this requires access to underlying coap message
        # in practice, would need coap library modification
        message_id = getattr(request, 'mid', 0)

        self.server.log_message_id(message_id)

        return message(payload=b'coap response', code=content)

# usage
async def run_covert_channel():
    # start server
    server = coapmessageidserver()
    server_task = asyncio.create_task(server.start_server())

    # small delay for server startup
    await asyncio.sleep(1)

    # send covert message
    client = coapmessageidchannel()
    await client.send_covert_message('secret')

    # let server process
    await asyncio.sleep(2)

# asyncio.run(run_covert_channel())

token field covert channel

# coap token field covert channel
import os
import base64
from aiocoap import *

class coaptokenchannel:
    def __init__(self):
        self.context = none

    async def initialize_context(self):
        """initialize coap context"""
        self.context = await context.create_client_context()

    def encode_in_token(self, data, max_token_length=8):
        """encode data in coap token fields"""

        # convert data to bytes
        data_bytes = data.encode() if isinstance(data, str) else data

        # split into token-sized chunks
        token_chunks = []
        for i in range(0, len(data_bytes), max_token_length):
            chunk = data_bytes[i:i+max_token_length]
            token_chunks.append(chunk)

        return token_chunks

    async def send_token_covert_data(self, data, server_uri):
        """send covert data using token field"""

        if not self.context:
            await self.initialize_context()

        token_chunks = self.encode_in_token(data)

        print(f"sending {len(token_chunks)} coap requests with token data")

        responses = []

        for i, token_data in enumerate(token_chunks):
            # create request with custom token
            request = message(
                code=get,
                uri=server_uri,
                token=token_data  # embed covert data in token
            )

            print(f"token {i+1}: {token_data.hex()}")

            try:
                response = await self.context.request(request).response
                responses.append(response)
            except exception as e:
                print(f"request {i+1} failed: {e}")

            await asyncio.sleep(0.2)

        return responses

    def decode_from_tokens(self, token_list):
        """decode data from token sequence"""

        decoded_data = b''
        for token in token_list:
            decoded_data += token

        try:
            return decoded_data.decode('utf-8')
        except unicodedecodeerror:
            return decoded_data.hex()

# coap option field covert channel
class coapoptionchannel:
    def __init__(self):
        self.context = none

        # coap option numbers for covert use
        self.covert_options = {
            'custom_1': 65000,  # experimental range
            'custom_2': 65001,
            'custom_3': 65002
        }

    async def initialize_context(self):
        """initialize coap context"""
        self.context = await context.create_client_context()

    def create_covert_options(self, data):
        """create coap options with covert data"""

        data_bytes = data.encode() if isinstance(data, str) else data
        options = {}

        # split data across multiple options
        chunk_size = 20  # reasonable option value size
        chunks = [data_bytes[i:i+chunk_size] for i in range(0, len(data_bytes), chunk_size)]

        option_keys = list(self.covert_options.keys())

        for i, chunk in enumerate(chunks):
            if i < len(option_keys):
                option_num = self.covert_options[option_keys[i]]
                options[option_num] = chunk

        return options

    async def send_option_covert_data(self, data, server_uri):
        """send covert data using coap options"""

        if not self.context:
            await self.initialize_context()

        covert_options = self.create_covert_options(data)

        # create request with covert options
        request = message(code=get, uri=server_uri)

        # add covert options (requires low-level coap manipulation)
        for option_num, option_value in covert_options.items():
            # note: this requires coap library that supports custom options
            print(f"adding option {option_num}: {option_value.hex()}")

        try:
            response = await self.context.request(request).response
            return response
        except exception as e:
            print(f"option covert request failed: {e}")
            return none

# uri-path covert channel
class coapuripathchannel:
    def __init__(self):
        self.context = none

    async def initialize_context(self):
        """initialize coap context"""
        self.context = await context.create_client_context()

    def encode_in_uri_path(self, data):
        """encode data in uri path segments"""

        # base64 encode data for uri safety
        encoded_data = base64.urlsafe_b64encode(data.encode()).decode()

        # split into path segments
        segment_size = 10
        segments = [encoded_data[i:i+segment_size]
                   for i in range(0, len(encoded_data), segment_size)]

        # create path with legitimate-looking prefixes
        path_segments = []
        prefixes = ['api', 'v1', 'device', 'sensor', 'data', 'status']

        for i, segment in enumerate(segments):
            prefix = prefixes[i % len(prefixes)]
            path_segments.append(f"{prefix}_{segment}")

        return '/' + '/'.join(path_segments)

    async def send_uri_covert_data(self, data, base_server_uri):
        """send covert data encoded in uri path"""

        if not self.context:
            await self.initialize_context()

        covert_path = self.encode_in_uri_path(data)
        full_uri = base_server_uri.rstrip('/') + covert_path

        print(f"covert uri: {full_uri}")

        request = message(code=get, uri=full_uri)

        try:
            response = await self.context.request(request).response
            return response
        except exception as e:
            print(f"uri covert request failed: {e}")
            return none

    def decode_from_uri_path(self, uri_path):
        """decode data from uri path"""

        # extract encoded segments
        segments = uri_path.strip('/').split('/')

        encoded_parts = []
        for segment in segments:
            if '_' in segment:
                # extract data part after prefix
                data_part = segment.split('_', 1)[1]
                encoded_parts.append(data_part)

        # reconstruct encoded data
        full_encoded = ''.join(encoded_parts)

        try:
            # decode base64
            decoded_bytes = base64.urlsafe_b64decode(full_encoded + '==')  # padding
            return decoded_bytes.decode('utf-8')
        except exception as e:
            print(f"uri decoding failed: {e}")
            return none

# usage example
async def demonstrate_coap_covert_channels():
    """demonstrate various coap covert channel techniques"""

    server_uri = 'coap://localhost:5683'

    print("=== coap token channel ===")
    token_channel = coaptokenchannel()
    await token_channel.send_token_covert_data('hidden', server_uri + '/test')

    print("\n=== coap uri path channel ===")
    uri_channel = coapuripathchannel()
    await uri_channel.send_uri_covert_data('secret', server_uri)

    print("\n=== coap option channel ===")
    option_channel = coapoptionchannel()
    await option_channel.send_option_covert_data('covert', server_uri + '/test')

# asyncio.run(demonstrate_coap_covert_channels())

block-wise transfer covert channel

implementation

# coap block-wise transfer covert channel
import struct
import math
from aiocoap import *

class coapblockwisechannel:
    def __init__(self):
        self.context = none
        self.block_size_exponents = [4, 5, 6, 7, 8, 9, 10]  # 16 to 1024 bytes

    async def initialize_context(self):
        """initialize coap context"""
        self.context = await context.create_client_context()

    def encode_in_block_parameters(self, data):
        """encode data using block size and sequence manipulation"""

        # convert data to binary
        binary_data = ''.join(format(ord(c), '08b') for c in data)

        # encode in block size exponent (3 bits per block)
        block_parameters = []

        for i in range(0, len(binary_data), 3):
            chunk = binary_data[i:i+3].ljust(3, '0')  # pad if needed
            block_size_idx = int(chunk, 2)

            if block_size_idx < len(self.block_size_exponents):
                block_size_exp = self.block_size_exponents[block_size_idx]
            else:
                block_size_exp = self.block_size_exponents[0]  # fallback

            block_parameters.append({
                'size_exponent': block_size_exp,
                'block_number': i // 3,  # sequence number
                'more_blocks': i + 3 < len(binary_data)
            })

        return block_parameters

    async def send_blockwise_covert_data(self, data, server_uri):
        """send covert data using block-wise transfers"""

        if not self.context:
            await self.initialize_context()

        block_params = self.encode_in_block_parameters(data)

        print(f"sending {len(block_params)} block requests with covert data")

        responses = []

        for i, params in enumerate(block_params):
            # create block option value
            # format: num=block_number, m=more_flag, szx=size_exponent
            block_value = (params['block_number'] << 4) | \
                         (1 if params['more_blocks'] else 0) << 3 | \
                         (params['size_exponent'] - 4)  # szx encoding

            print(f"block {i}: num={params['block_number']}, " +
                  f"szx={params['size_exponent']}, more={params['more_blocks']}")

            # create coap request with block1 option
            request = message(
                code=post,
                uri=server_uri,
                payload=b'x' * (2 ** params['size_exponent'])  # dummy payload
            )

            # add block1 option (would need coap library support)
            # request.opt.block1 = (params['block_number'], params['more_blocks'], params['size_exponent'])

            try:
                response = await self.context.request(request).response
                responses.append(response)
            except exception as e:
                print(f"block request {i} failed: {e}")

            await asyncio.sleep(0.1)

        return responses

    def decode_block_parameters(self, block_params):
        """decode data from block parameters"""

        binary_data = ''

        # sort by block number to ensure correct order
        sorted_params = sorted(block_params, key=lambda x: x['block_number'])

        for params in sorted_params:
            # find size exponent index
            if params['size_exponent'] in self.block_size_exponents:
                size_idx = self.block_size_exponents.index(params['size_exponent'])
            else:
                size_idx = 0

            # convert to 3-bit binary
            binary_chunk = format(size_idx, '03b')
            binary_data += binary_chunk

        # convert binary to text
        decoded_text = ''
        for i in range(0, len(binary_data), 8):
            byte_bits = binary_data[i:i+8]
            if len(byte_bits) == 8:
                char_code = int(byte_bits, 2)
                if 32 <= char_code <= 126:  # printable ascii
                    decoded_text += chr(char_code)

        return decoded_text

# coap observe covert channel
class coapobservechannel:
    def __init__(self):
        self.context = none
        self.observe_values = []

    async def initialize_context(self):
        """initialize coap context"""
        self.context = await context.create_client_context()

    async def encode_in_observe_sequence(self, data, server_uri):
        """encode data in observe notification sequence numbers"""

        if not self.context:
            await self.initialize_context()

        # start observe relationship
        request = message(code=get, uri=server_uri, observe=0)

        try:
            # this would require custom coap server that manipulates observe values
            response = await self.context.request(request).response
            print(f"started observe relationship: {response.code}")

            # in practice, the server would send notifications with
            # sequence numbers that encode the covert data

        except exception as e:
            print(f"observe setup failed: {e}")

    def decode_observe_sequence(self, observe_values):
        """decode data from observe notification sequence"""

        # extract data from observe option values
        binary_data = ''

        for observe_val in observe_values:
            # use observe value as 8-bit data (0-255)
            if 0 <= observe_val <= 255:
                binary_chunk = format(observe_val, '08b')
                binary_data += binary_chunk

        # convert to text
        decoded_text = ''
        for i in range(0, len(binary_data), 8):
            byte_bits = binary_data[i:i+8]
            if len(byte_bits) == 8:
                decoded_text += chr(int(byte_bits, 2))

        return decoded_text

# complete coap covert channel suite
class coapcovertchannelsuite:
    def __init__(self, server_uri='coap://localhost:5683'):
        self.server_uri = server_uri
        self.channels = {
            'token': coaptokenchannel(),
            'uri_path': coapuripathchannel(),
            'option': coapoptionchannel(),
            'blockwise': coapblockwisechannel(),
            'observe': coapobservechannel()
        }

    async def send_multi_channel_data(self, data, channel_types=['token', 'uri_path']):
        """send data across multiple covert channels simultaneously"""

        results = {}

        for channel_type in channel_types:
            if channel_type in self.channels:
                channel = self.channels[channel_type]

                try:
                    if channel_type == 'token':
                        result = await channel.send_token_covert_data(data, self.server_uri + '/test')
                    elif channel_type == 'uri_path':
                        result = await channel.send_uri_covert_data(data, self.server_uri)
                    elif channel_type == 'option':
                        result = await channel.send_option_covert_data(data, self.server_uri + '/test')
                    elif channel_type == 'blockwise':
                        result = await channel.send_blockwise_covert_data(data, self.server_uri + '/upload')
                    elif channel_type == 'observe':
                        result = await channel.encode_in_observe_sequence(data, self.server_uri + '/events')

                    results[channel_type] = result

                except exception as e:
                    print(f"{channel_type} channel failed: {e}")
                    results[channel_type] = none

        return results

    async def analyze_channel_performance(self, data, iterations=5):
        """analyze performance characteristics of coap covert channels"""

        performance = {}

        for channel_name, channel in self.channels.items():
            start_time = time.time()
            success_count = 0

            for i in range(iterations):
                try:
                    if channel_name == 'token':
                        await channel.send_token_covert_data(f"{data}_{i}", self.server_uri + '/test')
                    # ... other channel types

                    success_count += 1

                except exception as e:
                    print(f"{channel_name} iteration {i} failed: {e}")

            end_time = time.time()

            performance[channel_name] = {
                'success_rate': success_count / iterations,
                'avg_time_per_message': (end_time - start_time) / iterations,
                'total_time': end_time - start_time
            }

        return performance

# usage
async def run_comprehensive_coap_test():
    """run comprehensive coap covert channel test"""

    suite = coapcovertchannelsuite()

    # test individual channels
    test_data = "covert message"
    results = await suite.send_multi_channel_data(test_data, ['token', 'uri_path'])

    print("coap covert channel results:")
    for channel, result in results.items():
        status = "success" if result else "failed"
        print(f"  {channel}: {status}")

    # performance analysis
    performance = await suite.analyze_channel_performance("test", iterations=3)

    print("\nperformance analysis:")
    for channel, stats in performance.items():
        print(f"  {channel}:")
        print(f"    success rate: {stats['success_rate']:.1%}")
        print(f"    avg time: {stats['avg_time_per_message']:.2f}s")

# asyncio.run(run_comprehensive_coap_test())

traffic analysis and detection

coap packet analysis

# coap covert channel detection
import struct
import statistics
from collections import defaultdict, counter

class coapcovertdetector:
    def __init__(self):
        self.client_stats = defaultdict(lambda: {
            'message_ids': [],
            'tokens': [],
            'uri_paths': [],
            'options': [],
            'request_count': 0,
            'first_seen': none,
            'last_seen': none
        })

        # detection thresholds
        self.thresholds = {
            'high_entropy': 6.0,
            'unusual_message_id_pattern': 0.8,  # correlation threshold
            'max_token_entropy': 5.0,
            'suspicious_uri_length': 50,
            'unusual_option_codes': [65000, 65001, 65002]  # experimental range
        }

    def analyze_coap_packet(self, timestamp, src_ip, dst_ip, coap_data):
        """analyze coap packet for covert channel indicators"""

        if len(coap_data) < 4:
            return  # invalid coap packet

        # parse coap header
        header = struct.unpack('!bbh', coap_data[:4])
        version_type_tkl = header[0]
        code = header[1]
        message_id = header[2]

        version = (version_type_tkl >> 6) & 0x03
        msg_type = (version_type_tkl >> 4) & 0x03
        token_length = version_type_tkl & 0x0f

        if version != 1:  # only coap version 1
            return

        # extract token
        token = b''
        if token_length > 0 and len(coap_data) > 4:
            token = coap_data[4:4+token_length]

        # update client statistics
        client_key = (src_ip, dst_ip)
        stats = self.client_stats[client_key]

        stats['message_ids'].append(message_id)
        stats['tokens'].append(token)
        stats['request_count'] += 1

        if stats['first_seen'] is none:
            stats['first_seen'] = timestamp
        stats['last_seen'] = timestamp

        # parse options and uri path (simplified)
        options_start = 4 + token_length
        if len(coap_data) > options_start:
            self.parse_coap_options(coap_data[options_start:], stats)

        # check for anomalies
        self.check_coap_anomalies(client_key)

    def parse_coap_options(self, options_data, stats):
        """parse coap options for analysis"""

        i = 0
        option_number = 0

        while i < len(options_data):
            if options_data[i] == 0xff:  # payload marker
                break

            if i >= len(options_data):
                break

            # parse option delta and length
            option_byte = options_data[i]
            option_delta = (option_byte >> 4) & 0x0f
            option_length = option_byte & 0x0f

            i += 1

            # handle extended delta/length (simplified)
            if option_delta == 13:
                if i < len(options_data):
                    option_delta = 13 + options_data[i]
                    i += 1
            elif option_delta == 14:
                if i + 1 < len(options_data):
                    option_delta = 269 + struct.unpack('!h', options_data[i:i+2])[0]
                    i += 2

            if option_length == 13:
                if i < len(options_data):
                    option_length = 13 + options_data[i]
                    i += 1
            elif option_length == 14:
                if i + 1 < len(options_data):
                    option_length = 269 + struct.unpack('!h', options_data[i:i+2])[0]
                    i += 2

            option_number += option_delta

            # extract option value
            if option_length > 0 and i + option_length <= len(options_data):
                option_value = options_data[i:i + option_length]
                i += option_length

                stats['options'].append((option_number, option_value))

                # check for uri-path (option 11)
                if option_number == 11:
                    try:
                        uri_segment = option_value.decode('utf-8')
                        stats['uri_paths'].append(uri_segment)
                    except:
                        pass
            else:
                break

    def check_coap_anomalies(self, client_key):
        """check for coap covert channel anomalies"""

        stats = self.client_stats[client_key]

        if stats['request_count'] < 5:  # need minimum samples
            return

        anomalies = []

        # message id analysis
        if len(stats['message_ids']) >= 10:
            msg_id_anomalies = self.analyze_message_id_patterns(stats['message_ids'])
            anomalies.extend(msg_id_anomalies)

        # token analysis
        if len(stats['tokens']) >= 5:
            token_anomalies = self.analyze_token_patterns(stats['tokens'])
            anomalies.extend(token_anomalies)

        # uri path analysis
        if stats['uri_paths']:
            uri_anomalies = self.analyze_uri_patterns(stats['uri_paths'])
            anomalies.extend(uri_anomalies)

        # option analysis
        if stats['options']:
            option_anomalies = self.analyze_option_patterns(stats['options'])
            anomalies.extend(option_anomalies)

        if anomalies:
            src_ip, dst_ip = client_key
            print(f"coap anomalies detected: {src_ip} -> {dst_ip}")
            for anomaly in anomalies:
                print(f"  {anomaly}")

    def analyze_message_id_patterns(self, message_ids):
        """analyze message id patterns for covert channels"""

        anomalies = []

        # check for sequential patterns
        if len(message_ids) >= 5:
            differences = [message_ids[i] - message_ids[i-1] for i in range(1, len(message_ids))]

            # consistent differences suggest encoding
            if len(set(differences)) <= 2 and 0 not in set(differences):
                anomalies.append(f"sequential message id pattern detected")

        # entropy analysis
        if len(message_ids) >= 10:
            entropy = self.calculate_entropy(message_ids)
            if entropy > 14:  # high entropy for 16-bit values
                anomalies.append(f"high message id entropy: {entropy:.2f}")

        # check for repeated patterns
        id_counts = counter(message_ids)
        most_common = id_counts.most_common(1)[0]
        if most_common[1] > len(message_ids) * 0.3:  # >30% repetition
            anomalies.append(f"repeated message id: {most_common[0]:04x}")

        return anomalies

    def analyze_token_patterns(self, tokens):
        """analyze token patterns for covert channels"""

        anomalies = []

        # check token lengths
        token_lengths = [len(token) for token in tokens]

        if statistics.stdev(token_lengths) if len(token_lengths) > 1 else 0 > 2:
            anomalies.append("highly variable token lengths")

        # check token entropy
        for i, token in enumerate(tokens[-5:]):  # check recent tokens
            if len(token) > 0:
                entropy = self.calculate_entropy(token)
                if entropy > self.thresholds['max_token_entropy']:
                    anomalies.append(f"high entropy token: {token.hex()}")

        # check for base64-like patterns
        for token in tokens[-5:]:
            if len(token) > 4:
                try:
                    decoded = token.decode('ascii')
                    if self.looks_like_base64(decoded):
                        anomalies.append(f"base64-like token: {decoded}")
                except:
                    pass

        return anomalies

    def analyze_uri_patterns(self, uri_paths):
        """analyze uri path patterns for covert channels"""

        anomalies = []

        # check for unusually long uri segments
        for uri_segment in uri_paths:
            if len(uri_segment) > self.thresholds['suspicious_uri_length']:
                anomalies.append(f"long uri segment: {uri_segment[:20]}...")

            # check for base64 encoding in uri
            if self.looks_like_base64(uri_segment):
                anomalies.append(f"base64-like uri segment: {uri_segment}")

            # check for high entropy
            entropy = self.calculate_entropy(uri_segment.encode())
            if entropy > self.thresholds['high_entropy']:
                anomalies.append(f"high entropy uri segment: {uri_segment}")

        return anomalies

    def analyze_option_patterns(self, options):
        """analyze coap options for covert channels"""

        anomalies = []

        # check for unusual option numbers
        option_numbers = [opt[0] for opt in options]

        for option_num in option_numbers:
            if option_num in self.thresholds['unusual_option_codes']:
                anomalies.append(f"experimental option code: {option_num}")

            # very high option numbers are suspicious
            if option_num > 60000:
                anomalies.append(f"high option number: {option_num}")

        # check option value entropy
        for option_num, option_value in options[-10:]:  # recent options
            if len(option_value) > 4:
                entropy = self.calculate_entropy(option_value)
                if entropy > self.thresholds['high_entropy']:
                    anomalies.append(f"high entropy option {option_num}: {option_value.hex()}")

        return anomalies

    def calculate_entropy(self, data):
        """calculate shannon entropy"""
        from collections import counter
        import math

        if not data:
            return 0

        if isinstance(data[0], int):  # list of integers
            counts = counter(data)
        else:  # bytes
            counts = counter(data)

        total = len(data)
        probs = [count/total for count in counts.values()]
        return -sum(p * math.log2(p) for p in probs if p > 0)

    def looks_like_base64(self, text):
        """heuristic to detect base64 encoding"""
        import re

        if len(text) < 4:
            return false

        # base64 character pattern
        base64_pattern = re.compile(r'^[a-za-z0-9+/_-]+=*$')

        if not base64_pattern.match(text):
            return false

        # length should be multiple of 4
        if len(text) % 4 not in [0, 2, 3]:  # account for padding
            return false

        # high entropy suggests encoding
        entropy = self.calculate_entropy(text.encode())
        return entropy > 4.0

# usage
detector = coapcovertdetector()

# simulate coap packet analysis
coap_packet = b'\x44\x01\x12\x34token123/api/data'  # example coap packet
detector.analyze_coap_packet(time.time(), '192.168.1.100', '192.168.1.1', coap_packet)

countermeasures

coap server hardening

# coap server with covert channel prevention
from aiocoap import *
import hashlib
import time

class securecoapserver:
    def __init__(self, port=5683):
        self.port = port

        # security policies
        self.security_policy = {
            'max_token_length': 4,  # reduce from 8
            'allowed_message_types': [0, 1],  # con, non
            'max_uri_path_length': 20,
            'max_payload_size': 64,
            'allowed_option_codes': [3, 7, 8, 11, 12, 14, 15, 17, 20, 35, 39, 60],  # standard options only
            'rate_limit_per_ip': 10,  # requests per minute
            'max_block_size': 64  # limit block transfers
        }

        # rate limiting
        self.client_requests = defaultdict(list)

    async def start_server(self):
        """start secure coap server"""

        root = resource.site()
        root.add_resource(['secure'], secureresource(self))
        root.add_resource(['test'], secureresource(self))

        await context.create_server_context(root, bind=('localhost', self.port))
        print(f"secure coap server started on port {self.port}")

        await asyncio.get_event_loop().create_future()

    def validate_request(self, request, client_ip):
        """validate coap request against security policy"""

        violations = []

        # rate limiting
        current_time = time.time()
        client_history = self.client_requests[client_ip]

        # remove old requests (older than 1 minute)
        client_history[:] = [req_time for req_time in client_history
                           if current_time - req_time < 60]

        if len(client_history) >= self.security_policy['rate_limit_per_ip']:
            violations.append("rate limit exceeded")

        client_history.append(current_time)

        # token validation
        if hasattr(request, 'token') and len(request.token) > self.security_policy['max_token_length']:
            violations.append(f"token too long: {len(request.token)} bytes")

        # uri path validation
        if hasattr(request, 'opt') and request.opt.uri_path:
            full_path = '/'.join(request.opt.uri_path)

            if len(full_path) > self.security_policy['max_uri_path_length']:
                violations.append(f"uri path too long: {len(full_path)}")

            # check for suspicious patterns
            if self.contains_suspicious_patterns(full_path):
                violations.append("suspicious uri path pattern")

        # payload size validation
        if hasattr(request, 'payload') and len(request.payload) > self.security_policy['max_payload_size']:
            violations.append(f"payload too large: {len(request.payload)} bytes")

        # option validation (if accessible)
        # note: this would require low-level coap library access

        return violations

    def contains_suspicious_patterns(self, text):
        """check for suspicious patterns in text"""

        # high entropy check
        entropy = self.calculate_entropy(text.encode())
        if entropy > 4.5:
            return true

        # base64 pattern check
        import re
        base64_pattern = re.compile(r'[a-za-z0-9+/]{20,}={0,2}')
        if base64_pattern.search(text):
            return true

        # excessive length for single segment
        segments = text.split('/')
        if any(len(segment) > 15 for segment in segments):
            return true

        return false

    def calculate_entropy(self, data):
        """calculate shannon entropy"""
        from collections import counter
        import math

        if not data:
            return 0

        counts = counter(data)
        probs = [count/len(data) for count in counts.values()]
        return -sum(p * math.log2(p) for p in probs if p > 0)

    def normalize_response(self, response):
        """normalize response to prevent information leakage"""

        # use consistent message ids (hash-based)
        if hasattr(response, 'mid'):
            # generate deterministic message id
            hash_input = f"{time.time()//60}".encode()  # change every minute
            response.mid = int(hashlib.md5(hash_input).hexdigest()[:4], 16)

        # limit response payload
        if hasattr(response, 'payload') and len(response.payload) > 64:
            response.payload = response.payload[:64]

        return response

class secureresource(resource.resource):
    def __init__(self, server):
        super().__init__()
        self.server = server

    async def render_get(self, request):
        """handle get requests with security validation"""

        # extract client ip (simplified)
        client_ip = '127.0.0.1'  # would extract from request in practice

        # validate request
        violations = self.server.validate_request(request, client_ip)

        if violations:
            print(f"security violations from {client_ip}: {violations}")
            return message(code=bad_request, payload=b'request blocked')

        # process legitimate request
        response = message(code=content, payload=b'secure coap response')

        # normalize response
        return self.server.normalize_response(response)

    async def render_post(self, request):
        """handle post requests with security validation"""

        client_ip = '127.0.0.1'
        violations = self.server.validate_request(request, client_ip)

        if violations:
            print(f"security violations from {client_ip}: {violations}")
            return message(code=bad_request, payload=b'request blocked')

        return message(code=created, payload=b'resource created')

# coap traffic normalization proxy
class coapnormalizationproxy:
    def __init__(self, upstream_server, bind_port=5684):
        self.upstream_server = upstream_server
        self.bind_port = bind_port
        self.context = none

    async def start_proxy(self):
        """start coap normalization proxy"""

        self.context = await context.create_client_context()

        root = resource.site()
        root.add_resource(['.well-known', 'core'], proxyresource(self))
        root.add_resource(['proxy'], proxyresource(self))

        await context.create_server_context(root, bind=('localhost', self.bind_port))
        print(f"coap normalization proxy on port {self.bind_port}")

        await asyncio.get_event_loop().create_future()

    async def normalize_and_forward(self, request):
        """normalize request and forward to upstream"""

        # create normalized request
        normalized_request = self.normalize_coap_request(request)

        # forward to upstream server
        try:
            response = await self.context.request(normalized_request).response

            # normalize response
            normalized_response = self.normalize_coap_response(response)
            return normalized_response

        except exception as e:
            print(f"upstream request failed: {e}")
            return message(code=bad_gateway, payload=b'upstream error')

    def normalize_coap_request(self, request):
        """normalize coap request to remove covert channels"""

        # create clean request
        normalized = message(
            code=request.code,
            uri=f"coap://{self.upstream_server}/normalized"
        )

        # sanitize payload
        if hasattr(request, 'payload') and request.payload:
            # limit payload size
            clean_payload = request.payload[:64]

            # remove potential covert markers
            clean_payload = clean_payload.replace(b'\xaa\xbb', b'')
            clean_payload = clean_payload.replace(b'\xff\xfe', b'')

            normalized.payload = clean_payload

        # use standardized token
        normalized.token = b'norm'

        return normalized

    def normalize_coap_response(self, response):
        """normalize coap response"""

        # create clean response
        normalized = message(
            code=response.code,
            payload=response.payload[:64] if response.payload else b''
        )

        # use predictable message id
        normalized.mid = int(time.time()) & 0xffff

        return normalized

class proxyresource(resource.resource):
    def __init__(self, proxy):
        super().__init__()
        self.proxy = proxy

    async def render_get(self, request):
        """proxy get requests"""
        return await self.proxy.normalize_and_forward(request)

    async def render_post(self, request):
        """proxy post requests"""
        return await self.proxy.normalize_and_forward(request)

# usage
async def run_secure_coap_infrastructure():
    """run secure coap server and proxy"""

    # start secure server
    server = securecoapserver(5683)
    server_task = asyncio.create_task(server.start_server())

    # start normalization proxy
    proxy = coapnormalizationproxy('localhost:5683', 5684)
    proxy_task = asyncio.create_task(proxy.start_proxy())

    # run both
    await asyncio.gather(server_task, proxy_task)

# asyncio.run(run_secure_coap_infrastructure())

advantages and limitations

advantages

  • iot environment integration: blends with legitimate iot traffic
  • udp-based protocol: connectionless communication reduces detection
  • multiple encoding vectors: tokens, options, uris, block transfers
  • constrained device compatibility: works with resource-limited devices
  • standard protocol compliance: maintains coap protocol correctness

limitations

  • limited deployment: coap not as widely deployed as http
  • udp unreliability: packet loss affects covert communication
  • small header fields: limited capacity per packet
  • protocol inspection: coap parsers can detect anomalies
  • iot security focus: increasing attention to iot protocol security

performance characteristics

capacity analysis

techniquecapacity per packetreliabilitystealth level
token field8 byteshighhigh
message id2 byteshighmedium
uri path50+ byteshighlow
options20+ bytesmediumhigh
payload1024+ byteshighlow

coap vs other protocols

# coap covert channel performance comparison
def compare_covert_protocols():
    """compare coap with other iot protocols for covert channels"""

    protocols = {
        'coap': {
            'transport': 'udp',
            'header_size': 4,
            'max_payload': 1024,
            'covert_fields': ['token', 'message_id', 'options', 'uri'],
            'covert_capacity_per_packet': 40,  # conservative estimate
            'detection_difficulty': 'medium'
        },

        'mqtt': {
            'transport': 'tcp',
            'header_size': 2,
            'max_payload': 256000000,  # 256mb theoretical
            'covert_fields': ['client_id', 'topic', 'payload'],
            'covert_capacity_per_packet': 100,
            'detection_difficulty': 'low'
        },

        'http': {
            'transport': 'tcp',
            'header_size': 'variable',
            'max_payload': 'unlimited',
            'covert_fields': ['headers', 'uri', 'payload', 'cookies'],
            'covert_capacity_per_packet': 1000,
            'detection_difficulty': 'low'
        },

        'dns': {
            'transport': 'udp',
            'header_size': 12,
            'max_payload': 512,  # standard
            'covert_fields': ['queries', 'responses', 'subdomains'],
            'covert_capacity_per_packet': 200,
            'detection_difficulty': 'medium'
        }
    }

    print("iot covert channel protocol comparison:")
    for protocol, specs in protocols.items():
        print(f"\n{protocol.upper()}:")
        print(f"  transport: {specs['transport']}")
        print(f"  covert capacity: ~{specs['covert_capacity_per_packet']} bytes/packet")
        print(f"  detection difficulty: {specs['detection_difficulty']}")
        print(f"  covert fields: {', '.join(specs['covert_fields'])}")

# compare_covert_protocols()

real-world applications

iot device compromise

  • command and control for compromised iot devices
  • data exfiltration from smart home systems
  • coordination between distributed sensors

industrial espionage

  • monitoring industrial iot deployments
  • extracting sensor data from smart factories
  • maintaining access in operational technology networks

research and testing

  • iot security assessment frameworks
  • covert channel capacity studies
  • protocol implementation testing

references

  • rfc 7252: the constrained application protocol (coap)
  • rfc 7641: observing resources in the constrained application protocol
  • rfc 7959: block-wise transfers in the constrained application protocol
  • “security considerations for coap” - ietf core working group
  • cceap educational framework - network covert channels research
on this page