Skip to content

Advanced Patterns

Advanced usage patterns and best practices.

Custom Reconnection Logic

from mezon.socket import Socket

socket = Socket(host="gw.mezon.ai", port="443", use_ssl=True)

async def on_heartbeat_timeout():
    print("Connection lost - reconnecting...")
    # Custom reconnection logic

socket.onheartbeattimeout = on_heartbeat_timeout

Command Framework

Build a simple command framework:

import json
from dataclasses import dataclass
from typing import Callable, Dict

@dataclass
class CommandContext:
    message: any
    client: any
    channel: any
    args: list

class CommandHandler:
    def __init__(self, client):
        self.client = client
        self.commands: Dict[str, Callable] = {}
        self.prefix = "!"

    def command(self, name: str):
        def decorator(func):
            self.commands[name] = func
            return func
        return decorator

    async def handle(self, message):
        if message.sender_id == self.client.client_id:
            return

        content = json.loads(message.content)
        text = content.get("t", "").strip()

        if not text.startswith(self.prefix):
            return

        parts = text[len(self.prefix):].split()
        if not parts:
            return

        cmd_name = parts[0].lower()
        args = parts[1:]

        if cmd_name in self.commands:
            channel = await self.client.channels.fetch(message.channel_id)
            ctx = CommandContext(
                message=message,
                client=self.client,
                channel=channel,
                args=args,
            )
            await self.commands[cmd_name](ctx)

# Usage
handler = CommandHandler(client)

@handler.command("ping")
async def ping(ctx: CommandContext):
    await ctx.channel.send(content=ChannelMessageContent(t="Pong!"))

@handler.command("echo")
async def echo(ctx: CommandContext):
    text = " ".join(ctx.args) or "Nothing to echo"
    await ctx.channel.send(content=ChannelMessageContent(t=text))

client.on_channel_message(handler.handle)

Conversation State

Track conversation state per user:

from typing import Dict, Optional
from dataclasses import dataclass, field
from datetime import datetime

@dataclass
class UserState:
    step: str = "start"
    data: Dict = field(default_factory=dict)
    expires: Optional[datetime] = None

class ConversationManager:
    def __init__(self):
        self.states: Dict[str, UserState] = {}

    def get(self, user_id: str) -> UserState:
        return self.states.get(user_id, UserState())

    def set(self, user_id: str, state: UserState):
        self.states[user_id] = state

    def clear(self, user_id: str):
        self.states.pop(user_id, None)

# Usage
conversations = ConversationManager()

async def handle_message(message):
    if message.sender_id == client.client_id:
        return

    user_id = message.sender_id
    state = conversations.get(user_id)
    content = json.loads(message.content)
    text = content.get("t", "")
    channel = await client.channels.fetch(message.channel_id)

    if text == "!survey":
        # Start survey
        state.step = "name"
        conversations.set(user_id, state)
        await channel.send(content=ChannelMessageContent(t="What's your name?"))

    elif state.step == "name":
        state.data["name"] = text
        state.step = "age"
        conversations.set(user_id, state)
        await channel.send(content=ChannelMessageContent(t="How old are you?"))

    elif state.step == "age":
        state.data["age"] = text
        conversations.clear(user_id)
        name = state.data["name"]
        age = state.data["age"]
        await channel.send(
            content=ChannelMessageContent(t=f"Thanks {name}! You're {age} years old.")
        )

client.on_channel_message(handle_message)

Scheduled Tasks

Run periodic tasks:

import asyncio
from datetime import datetime

async def daily_announcement():
    while True:
        now = datetime.now()
        # Run at 9 AM
        if now.hour == 9 and now.minute == 0:
            channel = await client.channels.fetch("announcement_channel_id")
            await channel.send(
                content=ChannelMessageContent(t="Good morning everyone!")
            )
        await asyncio.sleep(60)  # Check every minute

async def main():
    await client.login()

    # Start scheduled task
    asyncio.create_task(daily_announcement())

    await asyncio.Event().wait()

Error Handling Middleware

import traceback
from functools import wraps

def error_handler(func):
    @wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except Exception as e:
            print(f"Error in {func.__name__}: {e}")
            traceback.print_exc()
            # Optionally notify admin
    return wrapper

@error_handler
async def handle_message(message):
    # Your logic here
    pass

client.on_channel_message(handle_message)

Caching API Responses

from functools import lru_cache
from datetime import datetime, timedelta

class TTLCache:
    def __init__(self, ttl_seconds: int = 300):
        self.cache = {}
        self.ttl = ttl_seconds

    def get(self, key: str):
        if key in self.cache:
            value, expires = self.cache[key]
            if datetime.now() < expires:
                return value
            del self.cache[key]
        return None

    def set(self, key: str, value):
        expires = datetime.now() + timedelta(seconds=self.ttl)
        self.cache[key] = (value, expires)

# Usage
role_cache = TTLCache(ttl_seconds=300)

async def get_user_roles(clan_id: str):
    cached = role_cache.get(clan_id)
    if cached:
        return cached

    clan = await client.clans.get(clan_id)
    roles = await clan.list_roles()
    role_cache.set(clan_id, roles)
    return roles

Plugin System

from abc import ABC, abstractmethod
from typing import List

class Plugin(ABC):
    @abstractmethod
    async def on_message(self, client, message):
        pass

    @abstractmethod
    def get_commands(self) -> List[str]:
        pass

class GreetingPlugin(Plugin):
    async def on_message(self, client, message):
        content = json.loads(message.content)
        text = content.get("t", "")
        if text == "!hi":
            channel = await client.channels.fetch(message.channel_id)
            await channel.send(content=ChannelMessageContent(t="Hello!"))

    def get_commands(self) -> List[str]:
        return ["!hi"]

class PluginManager:
    def __init__(self, client):
        self.client = client
        self.plugins: List[Plugin] = []

    def register(self, plugin: Plugin):
        self.plugins.append(plugin)

    async def handle_message(self, message):
        for plugin in self.plugins:
            await plugin.on_message(self.client, message)

# Usage
plugins = PluginManager(client)
plugins.register(GreetingPlugin())
client.on_channel_message(plugins.handle_message)