Something
Emporium

Archived News for July 2013

Using a Redis session provider with Ratchet and Symfony

As I've posted before, I play around a fair bit with WebSockets. Although I maintain Wrench, I've been using Ratchet for my latest project. I'm displaying on \ to really make the most of the low latency.

One thing I've been playing with is Ratchet's Session Provider support. The intention is to make your normal session data available to your WebSocket application. I've had to work around a few frustrating limitations; they're no fault of the brilliant libraries I've been using though.

One limitation is that the session isn't writeable. They say "please do not try and write to the session" right in the documentation for the SessionProvider. This seems to be because the read-only view of the session they're providing is deserialized by code in Ratchet itself. If they get the (de)serialization wrong, the session may no longer be readable by your main app -- yuck. The only safe way to provide writeable sessions would be to require the whole Symfony Session component. And Ratchet has a much lighter set of dependencies than that, which is nice.

But, that also means that if you are using Symfony2 for your main app, you can write to sessions from your websocket application; you just have to use the Symfony way of accessing a Session instance, rather than the Ratchet way. Here's how I did it. First, I defined a SessionFactory:

namespace Application\WebsocketBundle\Services;

use Symfony\Component\HttpFoundation\Session\Attribute\AttributeBagInterface;
use Symfony\Component\HttpFoundation\Session\Flash\FlashBagInterface;
use Symfony\Component\HttpFoundation\Session\Session;
use Symfony\Component\HttpFoundation\Session\Storage\SessionStorageInterface;

class SessionFactory
{
    protected $storage;
    protected $attributes;
    protected $flashes;

    public function __construct(SessionStorageInterface $storage, AttributeBagInterface $attributes = null, FlashBagInterface $flashes = null)
    {
        $this->storage = $storage;
        $this->attributes = $attributes;
        $this->flashes = $flashes;
    }

    /**
     * @param string $id
     */
    public function getInstance($id)
    {
        $session = new Session($this->storage, $this->attributes, $this->flashes);
        $session->setId($id);
        return $session;
    }
}

Then, I injected what the factory needed:

<?xml version="1.0" encoding="utf-8"?>
<container xmlns="http://symfony.com/schema/dic/services"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://symfony.com/schema/dic/services http://symfony.com/schema/dic/services/services-1.0.xsd">
    <services>
        <service id="websocket.session_factory" class="Application\WebsocketBundle\Services\SessionFactory">
            <argument type="service" id="session.storage" />
            <argument type="service" id="session.attribute_bag" />
            <argument type="service" id="session.flash_bag" />
        </service>
    </services>
</container>

Finally, in my server Command, I get an instance of the session factory and pass it into my application for use. Here's what that bit looks like:

// in ListenCommand.php
$factory = $container->get('websocket.session_factory');
// ... then injected into the application

// in ChatApplication.php
if ($name != 'anonymous' && ($session_id = $connection->Session->getId())) {
    $session = $this->factory->getInstance($session_id);
    $session->start();
    $session->set('chat.name', $name);
    $session->save();
}

And that works pretty well. Unified sessions between Symfony2 and websockets, that you can write to when you need to.
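
To make the write-then-read round trip concrete without pulling in Symfony, here's a minimal sketch. Everything in it is a hypothetical stand-in: InMemorySession, InMemorySessionFactory and SketchChatApplication only mimic the handful of methods used above (setId, start, set, get, save) and keep their data in a static array rather than real session storage.

```php
<?php
// Hypothetical in-memory stand-ins; these are NOT Symfony or Ratchet classes.
class InMemorySession
{
    private static $store = array(); // shared backing store, keyed by session id
    private $id;
    private $data = array();

    public function setId($id) { $this->id = $id; }

    // Load whatever was last saved under this id
    public function start()
    {
        $this->data = isset(self::$store[$this->id]) ? self::$store[$this->id] : array();
    }

    public function set($key, $value) { $this->data[$key] = $value; }

    public function get($key, $default = null)
    {
        return array_key_exists($key, $this->data) ? $this->data[$key] : $default;
    }

    // Persist the data so other instances with the same id can see it
    public function save() { self::$store[$this->id] = $this->data; }
}

class InMemorySessionFactory
{
    public function getInstance($id)
    {
        $session = new InMemorySession();
        $session->setId($id);
        return $session;
    }
}

class SketchChatApplication
{
    private $factory;

    // The factory arrives through the constructor, the way the command would pass it in
    public function __construct($factory) { $this->factory = $factory; }

    public function rename($sessionId, $name)
    {
        if ($name != 'anonymous' && $sessionId) {
            $session = $this->factory->getInstance($sessionId);
            $session->start();
            $session->set('chat.name', $name);
            $session->save();
        }
    }
}

$factory = new InMemorySessionFactory();
$app = new SketchChatApplication($factory);
$app->rename('abc123', 'alice');

// A later reader using the same session id sees the written value
$reader = $factory->getInstance('abc123');
$reader->start();
echo $reader->get('chat.name'), "\n"; // prints "alice"
```

The point of the factory is the same as in the real version: the application only needs a session id to get a writeable session object, and whoever opens that id later reads the saved value.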

But the next limitation you run into is that the session provider doesn't do any specialized exception handling for networked session handlers. Most such handlers (for example the RedisSessionHandler from SncRedisBundle) won't do any either.

When a network connection isn't used for a period of time, it'll time out (usually called a "send" or "write" timeout). Program execution doesn't notice this until the next time something tries to use the connection. In effect, when the next WebSocket client connects, the SessionProvider leaks an exception from its $conn->Session->start() call in onOpen(). That's what we have to handle ourselves.
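
The failure mode is easier to see in isolation. The sketch below is a simulation under stated assumptions, not Predis or Ratchet code: IdleSensitiveClient and StaleConnectionException are hypothetical names, the clock is passed in by hand, and readWithRetry() shows one generic way to cope (reconnect and retry once), which is a different approach from proactively pinging the connection.

```php
<?php
// Hypothetical stand-ins: none of this is part of Predis, Ratchet or Symfony.
class StaleConnectionException extends RuntimeException {}

class IdleSensitiveClient
{
    private $timeout;
    private $lastUsed;
    public $reconnects = 0;

    public function __construct($timeout, $now)
    {
        $this->timeout = $timeout;
        $this->lastUsed = $now;
    }

    // The timeout is only discovered here, when a command is actually sent
    public function get($key, $now)
    {
        if ($now - $this->lastUsed > $this->timeout) {
            throw new StaleConnectionException('connection timed out while idle');
        }
        $this->lastUsed = $now;
        return 'value-of-' . $key;
    }

    public function reconnect($now)
    {
        $this->reconnects++;
        $this->lastUsed = $now;
    }
}

// Reconnect and retry once if the first attempt hits a stale connection
function readWithRetry(IdleSensitiveClient $client, $key, $now)
{
    try {
        return $client->get($key, $now);
    } catch (StaleConnectionException $e) {
        $client->reconnect($now);
        return $client->get($key, $now);
    }
}

$client = new IdleSensitiveClient(30, 0);
echo readWithRetry($client, 'session:abc', 10), "\n";  // used within the timeout
echo readWithRetry($client, 'session:abc', 100), "\n"; // idle for 90s: first attempt throws, retry succeeds
echo $client->reconnects, "\n";                        // prints 1
```

Nothing goes wrong at the moment the connection goes stale; the exception only appears on the first command after the idle period, which is why it surfaces inside whatever code happens to touch the session next.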

RedisSessionHandler has a nice, permissive protected API, so it's ripe for subclassing. Here's what I do:

namespace Application\WebsocketBundle\Session\Storage\Handler;

use Predis\CommunicationException;
use Snc\RedisBundle\Session\Storage\Handler\RedisSessionHandler as sncRedisSessionHandler;

class RedisSessionHandler extends sncRedisSessionHandler
{
    const CHECK_EVERY = 20;

    /**
     * @var int timestamp
     */
    protected $checked = null;

    public function read($sessionId)
    {
        $this->checkConnection();
        // Return the parent's result, otherwise the session data is lost
        return parent::read($sessionId);
    }

    public function write($sessionId, $data)
    {
        $this->checkConnection();
        return parent::write($sessionId, $data);
    }

    public function destroy($sessionId)
    {
        $this->checkConnection();
        return parent::destroy($sessionId);
    }

    /**
     * Simple caching logic, so we don't actually ping every call to read/write/destroy
     */
    protected function checkConnection()
    {
        $now = time();

        if (!$this->checked || $this->checked < $now - self::CHECK_EVERY) {
            $this->ping();
        }

        // Even if we haven't pinged the connection this time, we know the
        // connection will be written to shortly after we return from
        // here, so we can bump the timestamp anyway
        $this->checked = $now;
    }

    protected function ping()
    {
        try {
            if (!$this->redis->isConnected()) {
                $this->redis->connect();
            }
            $this->redis->ping();
        } catch (CommunicationException $e) {
            // If just a send timeout, the ping will provoke this handler
            $this->redis->connect();
            // and reconnection will succeed
        }
    }
}
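
The throttling rule in checkConnection() can be checked on its own. This is a minimal sketch, assuming a hypothetical CountingClient that only counts pings and a ThrottledChecker that applies the same comparison, with the timestamp passed in instead of read from time() so the behaviour is repeatable.

```php
<?php
// Hypothetical stand-in that only counts pings; not the Predis client.
class CountingClient
{
    public $pings = 0;

    public function ping() { $this->pings++; }
}

class ThrottledChecker
{
    const CHECK_EVERY = 20;

    private $client;
    private $checked = null;

    public function __construct(CountingClient $client) { $this->client = $client; }

    // Same rule as checkConnection(), but the clock is an argument
    public function check($now)
    {
        if (!$this->checked || $this->checked < $now - self::CHECK_EVERY) {
            $this->client->ping();
        }
        // Every call counts as activity, pinged or not
        $this->checked = $now;
    }
}

$client = new CountingClient();
$checker = new ThrottledChecker($client);

// Calls at t=100, 105 and 119 are close together; 140 comes 21s after 119
foreach (array(100, 105, 119, 140, 141) as $now) {
    $checker->check($now);
}
echo $client->pings, "\n"; // prints 2 (the first call, and the call at t=140)
```

Because the timestamp is bumped on every call, a busy server almost never pings; the ping only happens when more than CHECK_EVERY seconds have passed since the handler was last used.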
