12 October 2014

A few months ago I started learning Erlang, mostly for fun, but it was also about time to jump on the functional bandwagon. The best way to learn a new language is to find an engaging project; in my case, it is something that has been on my mind for quite a while: a cloud communication protocol / framework for distributed computing. Some basic principles of what it is about can be found here: CloudDDS.

While learning Erlang, I decided to also learn a number of other technologies or, in other words, apply them in practice. One of those is Apache Thrift, a binary, cross-language framework for distributed service development. It is a serialization protocol with its own RPC stack.

The Thrift RPC stack comes with a number of supported transports. The Erlang implementation provides HTTP, JSON, file, disk log and framed options (strictly speaking, JSON is a Thrift protocol rather than a transport). The project I was working on, however, uses UDP for all communication. Below, I describe how I implemented Thrift over UDP in Erlang, walking the reader through a simple active-active setup of two clients exchanging Thrift messages over UDP.

Prerequisites

Apache Thrift uses a .thrift file to describe the services and types used by the software. To generate code in a target language, Apache Thrift must be installed first. There are plenty of examples showing how to do this; for my own reference, I have created a bash script for Ubuntu 12.04, available here: Install Thrift 0.9.1 with all language support.

Design

I will follow the standard OTP principles while developing the example.

application -> supervisor -> UDP messaging worker
                          -> serialization worker

While discussing the implementation, I will not focus on the details of the messaging code, nor will I dive into the details of the standard OTP components. I will focus only on the Thrift part.

Basic OTP modules

The first step is to create a basic Erlang application. All the code is listed below; a link to the full sources on github is at the bottom of the article.

rebar.config

The bare minimum required to install the dependencies:

{deps, [
  {thrift, ".*",
    {git, "https://github.com/radekg/thrift-erlang.git", "master"}}
]}.

I am using my own fork of Thrift; it contains updated JSX dependencies.

src/udp_thrift.app.src

{application, udp_thrift,
  [{description, "Example of using Thrift with UDP"},
  {vsn, "1"},
  {modules, [udp_thrift_types,
             udp_thrift_app,
             udp_thrift_common,
             udp_thrift_sup,
             udp_thrift_messaging,
             udp_thrift_serialization]},
  {registered, [udp_thrift_sup, udp_thrift_messaging, udp_thrift_serialization]},
  {applications, [kernel, stdlib, crypto, thrift]},
  {mod, {udp_thrift_app, []}}]}.

src/udp_thrift_app.erl

-module(udp_thrift_app).
-behaviour(application).
-export([start/2, stop/1]).

start(_Type, _Args) ->
  udp_thrift_sup:start_link().
stop(_State) ->
  ok.

src/udp_thrift_common.erl

-module(udp_thrift_common).
-export([
  get_timestamp/0,
  get_message_id_as_string/0 ]).

get_timestamp() ->
  {Mega,Sec,Micro} = os:timestamp(),
  trunc( ((Mega*1000000+Sec)*1000000+Micro) / 1000000 ).

get_message_id_as_string() ->
  lists:flatten( io_lib:format( "~p", [ make_ref() ] ) ).
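get_timestamp scales the os:timestamp() triple up to microseconds and then divides back down, so the result is simply whole seconds since the Unix epoch; get_message_id_as_string returns the printed form of a fresh reference, such as "#Ref<0.0.0.95>". A minimal equivalent sketch, using a hypothetical module name and erlang:ref_to_list/1 in place of io_lib:format/2:

```erlang
%% Hypothetical module, equivalent to udp_thrift_common for illustration only.
-module(udp_thrift_common_sketch).
-export([seconds_from/1, get_timestamp/0, get_message_id_as_string/0]).

%% {MegaSecs, Secs, MicroSecs} -> whole seconds since the Unix epoch.
%% Same value as scaling to microseconds, dividing by 1000000 and truncating.
seconds_from({Mega, Sec, _Micro}) ->
  Mega * 1000000 + Sec.

get_timestamp() ->
  seconds_from(os:timestamp()).

%% ref_to_list/1 yields the "#Ref<...>" text of a newly created reference.
get_message_id_as_string() ->
  erlang:ref_to_list(make_ref()).
```

Integer arithmetic avoids the float division in the original, although both return the same number of seconds.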

src/udp_thrift_sup.erl

-module(udp_thrift_sup).
-behaviour(supervisor).
-export([start_link/0, init/1]).

start_link() ->
  supervisor:start_link({local, ?MODULE}, ?MODULE, []).

init([]) ->
  Port = case application:get_env(udp_thrift, port) of
    { ok, Value } ->
      Value;
    undefined ->
      { error, no_port }
  end,
  PeerPort = case application:get_env(udp_thrift, peer_port) of
    { ok, Value2 } ->
      Value2;
    undefined ->
      { error, no_peer_port }
  end,
  case application:get_env(udp_thrift, name) of
    { ok, Name } ->
        {ok, { { one_for_all, 10, 10},
          [
            {
              udp_thrift_messaging,
              {udp_thrift_messaging, start_link, [ {127,0,0,1}, Port, Name, PeerPort ]},
              permanent,
              brutal_kill,
              worker,
              []
            }, {
              udp_thrift_serialization,
              {udp_thrift_serialization, start_link, []},
              permanent,
              brutal_kill,
              worker,
              []
            }
          ]
        }
      };
    undefined ->
      { error, no_name_given }
  end.

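The supervisor expects the name, port and peer_port environment properties to be set; they are set in the Running and testing section. If name is missing, init/1 returns a value the supervisor does not accept and the application does not start. A missing port or peer_port is passed to the messaging worker as an {error, ...} tuple, so the worker crashes as soon as it tries to use it.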
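If you would rather have a missing setting fail loudly with a descriptive reason, a small helper along these lines could be called from the supervisor's init/1. This is a sketch; the module udp_thrift_config_sketch and the {missing_env, ...} error term are hypothetical and not part of the original code.

```erlang
%% Hypothetical helper module, not part of the original application.
-module(udp_thrift_config_sketch).
-export([require/1, read_all/0]).

%% Return a mandatory udp_thrift setting, or raise an error naming the missing key.
require(Key) ->
  case application:get_env(udp_thrift, Key) of
    {ok, Value} -> Value;
    undefined -> erlang:error({missing_env, udp_thrift, Key})
  end.

%% Read the settings in a fixed order; the first missing key becomes {error, ...}.
read_all() ->
  try
    Name = require(name),
    Port = require(port),
    PeerPort = require(peer_port),
    {ok, {Name, Port, PeerPort}}
  catch
    error:{missing_env, udp_thrift, Key} -> {error, {missing_env, Key}}
  end.
```

Calling require/1 directly inside init/1 makes the supervisor crash during startup, so application:start(udp_thrift) should fail with a reason that names the missing key instead of failing later inside a worker.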

src/udp_thrift_messaging.erl

-module(udp_thrift_messaging).
-behaviour(gen_server).
-export([start_link/4, stop/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]).

-define(SERVER, ?MODULE).

-include("udp_thrift_types.hrl").

start_link(BindAddress, BindPort, Name, PeerPort) ->
  gen_server:start_link({local, ?MODULE}, ?MODULE,
                         [BindAddress, BindPort, Name, PeerPort], []).

stop() -> gen_server:cast(?MODULE, stop).

init([BindAddress, BindPort, Name, PeerPort]) ->
  case gen_udp:open(BindPort, [binary, {ip, BindAddress}]) of
    {ok, Socket} ->
      erlang:send_after(2000, self(), { contact_peer, BindPort }),
      {ok, {messaging, Socket, Name, PeerPort}};
    {error, Reason} ->
      %% gen_server init/1 must return {stop, Reason} to abort startup.
      {stop, Reason}
  end.

terminate(_Reason, {messaging, Socket, _Name, _PeerPort}) ->
  gen_udp:close(Socket).

handle_cast(stop, LoopData) ->
  {stop, normal, LoopData}.

%% SENDING

handle_info({ contact_peer, Port }, { messaging, Socket, Name, PeerPort }) ->
  error_logger:info_msg( "Sending digest to ~p.", [ PeerPort ] ),
  Digest = #digest{
    name = Name,
    port = Port,
    heartbeat = udp_thrift_common:get_timestamp(),
    id = udp_thrift_common:get_message_id_as_string() },
  udp_thrift_serialization ! { serialize, digest, Digest, self() },
  erlang:send_after(2000, self(), { contact_peer, Port }),
  {noreply, { messaging, Socket, Name, PeerPort }};

handle_info({ message_serialized, { ok, SerializedMessage } },
             { messaging, Socket, Name, PeerPort }) ->
  gen_udp:send(
    Socket,
    { 127,0,0,1 },
    PeerPort,
    SerializedMessage ),
  {noreply, { messaging, Socket, Name, PeerPort } };

%% RECEIVING

handle_info({udp, _ClientSocket, _ClientIp, _ClientPort, Msg},
             { messaging, Socket, Name, PeerPort }) ->
  udp_thrift_serialization ! { deserialize, Msg, self() },
  {noreply, { messaging, Socket, Name, PeerPort }};

handle_info({ message_deserialized, { ok, DecodedPayloadType, DecodedPayload } },
             { messaging, Socket, Name, PeerPort }) ->
  self() ! { message, DecodedPayloadType, DecodedPayload },
  {noreply, { messaging, Socket, Name, PeerPort }};

handle_info({ message, digest, DecodedPayload },
             { messaging, Socket, Name, PeerPort }) ->
  error_logger:info_msg("Received digest from ~p. Replying to ~p",
                         [ DecodedPayload#digest.name, DecodedPayload#digest.port ]),
  DigestAck = #digestAck{
    name = Name,
    heartbeat = udp_thrift_common:get_timestamp(),
    reply_id = DecodedPayload#digest.id },
  udp_thrift_serialization ! { serialize, digestAck, DigestAck, self() },
  {noreply, { messaging, Socket, Name, PeerPort }};

handle_info({ message, digestAck, DecodedPayload },
             { messaging, Socket, Name, PeerPort }) ->
  error_logger:info_msg("Received digestAck to digest ~p.",
                         [ DecodedPayload#digestAck.reply_id ]),
  {noreply, { messaging, Socket, Name, PeerPort }};

%% ERROR HANDLING

handle_info({ message_deserialized, {error, Reason} },
             { messaging, Socket, Name, PeerPort }) ->
  error_logger:error_msg("Message decode failed. Reason ~p.", [Reason]),
  {noreply, { messaging, Socket, Name, PeerPort }};

handle_info(Msg, LoopData) ->
  error_logger:info_msg("Unhandled handle_info ~p", [Msg]),
  {noreply, LoopData}.

%% REMAINDER OF GEN_SERVER BEHAVIOUR

handle_call({message, _Msg}, _From, LoopData) ->
  {reply, ok, LoopData}.

code_change(_OldVsn, State, _Extra) ->
  {ok, State}.
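The RECEIVING clause works because gen_udp:open/2 opens the socket in active mode by default, so every datagram is delivered to the owning process as a {udp, Socket, Ip, Port, Packet} message. The following stand-alone sketch (hypothetical module name; both ports are picked by the OS) shows that round trip on the loopback interface without any Thrift involvement:

```erlang
%% Hypothetical module demonstrating active-mode UDP delivery on loopback.
-module(udp_loopback_sketch).
-export([round_trip/1]).

round_trip(Payload) ->
  %% Port 0 lets the OS choose a free port; the caller owns both sockets.
  {ok, Receiver} = gen_udp:open(0, [binary, {ip, {127,0,0,1}}, {active, true}]),
  {ok, Sender} = gen_udp:open(0, [binary, {ip, {127,0,0,1}}]),
  {ok, ReceiverPort} = inet:port(Receiver),
  ok = gen_udp:send(Sender, {127,0,0,1}, ReceiverPort, Payload),
  %% In active mode the datagram arrives as an ordinary Erlang message.
  Result = receive
             {udp, Receiver, _FromIp, _FromPort, Packet} -> {ok, Packet}
           after 1000 ->
             {error, timeout}
           end,
  gen_udp:close(Sender),
  gen_udp:close(Receiver),
  Result.
```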

private/udp_thrift.thrift

In this example, every two seconds each client sends a Digest to the other client. Upon receiving a Digest, the other client replies with a DigestAck message. Every message is wrapped in a DigestEnvelope container, so the program can easily tell which message type was sent. The Thrift file looks like this:

namespace erl udp_thrift

struct DigestEnvelope {
  1: required string payload_type;
  2: required string bin_payload;
}

struct Digest {
  1: required string name;
  2: required i32 port;
  3: required i64 heartbeat;
  4: required string id;
}

struct DigestAck {
  1: required string name;
  2: required i64 heartbeat;
  3: required string reply_id;
}
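The numbered field IDs and types in the IDL are what the binary protocol actually puts on the wire. As a hedged illustration, assuming the standard Thrift binary protocol encoding (a one-byte type code and a 16-bit field ID per field, 32-bit length-prefixed strings, a zero stop byte), a hand-written encoder for DigestAck could look like this. The module name is hypothetical and the code only illustrates the layout; the application itself uses the generated udp_thrift_types module.

```erlang
%% Hypothetical, hand-written encoder; NOT generated by Thrift.
-module(digest_ack_wire_sketch).
-export([encode/3]).

%% Thrift binary protocol type codes used by DigestAck.
-define(T_STOP, 0).
-define(T_I64, 10).
-define(T_STRING, 11).

%% Each field: <<Type:8, FieldId:16>> then the value (big-endian).
%% Strings carry a 32-bit length prefix; a T_STOP byte ends the struct.
encode(Name, Heartbeat, ReplyId) when is_binary(Name), is_binary(ReplyId) ->
  <<?T_STRING:8, 1:16, (byte_size(Name)):32, Name/binary,
    ?T_I64:8, 2:16, Heartbeat:64/signed,
    ?T_STRING:8, 3:16, (byte_size(ReplyId)):32, ReplyId/binary,
    ?T_STOP:8>>.
```

With sample values like those in the Running and testing output (name <<"memberA">>, reply_id <<"#Ref<0.0.0.95>">>), the struct is 47 bytes, which comfortably fits in a single UDP datagram even after wrapping in DigestEnvelope.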

Generate code from Thrift and make it available

Once Thrift is installed on the system, the code can be generated from the .thrift file:

cd <project root>/private/
thrift --gen erl udp_thrift.thrift
mkdir -p ../include
# make the generated code available for the program:
mv gen-erl/udp_thrift_types.erl ../src/udp_thrift_types.erl
mv gen-erl/udp_thrift_constants.hrl ../include/udp_thrift_constants.hrl
mv gen-erl/udp_thrift_types.hrl ../include/udp_thrift_types.hrl
rm -Rf gen-erl

The source code on github contains a Vagrant box which can be used to generate the code:

vagrant up thrift

Thrift serialisation and deserialisation

The module responsible for these tasks has already been started by the supervisor. Let’s go through the code and discuss the parts in detail.

-module(udp_thrift_serialization).
-behaviour(gen_server).

-export([start_link/0, stop/0]).
-export([init/1,
          handle_call/3,
          handle_cast/2,
          handle_info/2,
          code_change/3,
          terminate/2]).

-include("udp_thrift_types.hrl").

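First, the following records are required, because the serialization code pattern-matches on the internal state of the binary protocol and the memory buffer transport. As all serialization happens within this single module, I define them directly here, copied from the Erlang Thrift sources; they must match the Thrift version in use.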

-record(binary_protocol, {transport,
                          strict_read=true,
                          strict_write=true
                         }).
-record(memory_buffer, {buffer}).

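Following is the code for the module setup. For serialization, a single protocol instance can be reused for all messages: Erlang values are immutable, so writing to the protocol returns a new protocol with a filled buffer and leaves the empty one created in init/1 untouched.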

start_link() ->
  gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

init([]) ->
  {ok, OutThriftTransport} = thrift_memory_buffer:new(),
  {ok, OutThriftProtocol} = thrift_binary_protocol:new(OutThriftTransport),
  {ok, { serialization, OutThriftProtocol }}.

stop() -> gen_server:cast(?MODULE, stop).

Serialization

handle_info({ serialize, DigestType, Digest, CallerPid }, LoopData) ->
  self() ! { serialize, DigestType,
              Digest, udp_thrift_types:struct_info(DigestType),
              CallerPid },
  {noreply, LoopData};

handle_info({ serialize, DigestType, Digest, StructInfo, CallerPid },
             { serialization, OutThriftProtocol }) ->
  CallerPid ! { message_serialized,
                 { ok,
                    digest_to_binary( #digestEnvelope{
                        payload_type = atom_to_list(DigestType),
                        bin_payload = digest_to_binary(Digest, StructInfo, OutThriftProtocol)
                      },
                      udp_thrift_types:struct_info(digestEnvelope),
                      OutThriftProtocol ) } },
  {noreply, { serialization, OutThriftProtocol } };

The first handle_info clause receives the data to serialize and uses udp_thrift_types:struct_info, provided by the Thrift-generated module, to look up the Thrift type definition. The result is forwarded to the second clause, where digest_to_binary does the actual work (presented shortly). The inner digest_to_binary call serializes the digest received from the messaging component; the result is then wrapped in a digestEnvelope, which is serialized again to produce the binary data sent over UDP.

Deserialization

handle_info({ deserialize, BinaryDigest, CallerPid }, LoopData) ->
  try
    case digest_from_binary(digestEnvelope, BinaryDigest) of
      {ok, DecodedResult} ->
        case payload_type_as_known_atom(DecodedResult#digestEnvelope.payload_type) of
          { ok, PayloadTypeAtom } ->
            case digest_from_binary(
                  PayloadTypeAtom,
                  DecodedResult#digestEnvelope.bin_payload) of
              { ok, DecodedResult2 } ->
                CallerPid ! { message_deserialized, { ok, PayloadTypeAtom, DecodedResult2 } };
              _ ->
                error_logger:error_msg("Message could not be decoded."),
                CallerPid ! { message_deserialized, { error, decode_binary_content_failed } }
            end;
          { error, UnsupportedPayloadType } ->
            error_logger:error_msg("Unsupported message ~p.", [UnsupportedPayloadType]),
            CallerPid ! { message_deserialized, {error, unsupported_payload_type} }
        end;
      _ ->
        error_logger:error_msg("Could not open digestEnvelope."),
        CallerPid ! { message_deserialized, {error, digest_envelope_open_failed} }
    end
  catch
    _Error:Reason ->
      error_logger:error_msg("Error while reading digest: ~p.", [Reason]),
      CallerPid ! { message_deserialized, { error, digest_read } }
  end,
  {noreply, LoopData};

handle_info(_Info, LoopData) ->
  {noreply, LoopData}.

Upon receiving a deserialization request, the first step is digest_from_binary, which assumes that the received data is Thrift-encoded and of type digestEnvelope (details shortly). Once the envelope has been opened, the program checks whether the payload type is supported. If it is, it attempts to deserialize the payload. If all of the above succeeds, the result is passed back to the messaging worker (the caller). Most of the function is simple error handling.
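The deserialization clause is essentially a two-level unwrap with a whitelist of payload types. To make that control flow easier to follow, here is a hedged sketch of the same pattern that swaps Thrift for Erlang's built-in term_to_binary/binary_to_term, so it runs without the generated code. The module name and the {envelope, Type, Inner} tuple are hypothetical; the error atoms mirror the ones used above.

```erlang
%% Hypothetical module: envelope pattern with the external term format
%% standing in for Thrift.
-module(envelope_sketch).
-export([wrap/2, unwrap/1]).

%% Whitelist mapping wire names to atoms (cf. payload_type_as_known_atom).
known_type(<<"digest">>) -> {ok, digest};
known_type(<<"digestAck">>) -> {ok, digestAck};
known_type(Other) -> {error, Other}.

%% Serialize the payload first, then the envelope that carries it.
wrap(PayloadType, Payload) when is_atom(PayloadType) ->
  Inner = term_to_binary(Payload),
  term_to_binary({envelope, atom_to_binary(PayloadType, utf8), Inner}).

%% [safe] refuses data that would create new atoms; bad input raises badarg.
decode(Bin) ->
  try {ok, binary_to_term(Bin, [safe])}
  catch error:badarg -> {error, badarg}
  end.

unwrap(Bin) ->
  case decode(Bin) of
    {ok, {envelope, TypeBin, Inner}} when is_binary(TypeBin), is_binary(Inner) ->
      case known_type(TypeBin) of
        {ok, Type} ->
          case decode(Inner) of
            {ok, Payload} -> {ok, Type, Payload};
            {error, _} -> {error, decode_binary_content_failed}
          end;
        {error, _} ->
          {error, unsupported_payload_type}
      end;
    _ ->
      {error, digest_envelope_open_failed}
  end.
```

Keeping the payload type as a binary and mapping it through a fixed list, as payload_type_as_known_atom does, avoids calling list_to_atom on data received from the network; atoms are not garbage collected, so creating them from untrusted input is best avoided.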

digest_to_binary, digest_from_binary and payload_type_as_known_atom

digest_to_binary(Digest, StructInfo, OutThriftProtocol) ->
  %% Write the record; the returned protocol wraps a memory buffer holding the bytes.
  {PacketThrift, ok} = thrift_protocol:write(OutThriftProtocol,
                                             { {struct, element(2, StructInfo)}, Digest}),
  %% Unwrap protocol -> binary_protocol state -> transport -> memory_buffer state.
  {protocol, _, OutProtocol} = PacketThrift,
  {transport, _, OutTransport} = OutProtocol#binary_protocol.transport,
  iolist_to_binary(OutTransport#memory_buffer.buffer).

digest_to_binary serializes the payload. It writes the Thrift binary representation of the data using the protocol created in init/1. All that remains afterwards is converting the iolist held by the resulting memory buffer to a binary. That binary is returned to CallerPid and sent via UDP to the destination.

digest_from_binary(DigestType, BinaryDigest) ->
  {ok, InTransport} = thrift_memory_buffer:new(BinaryDigest),
  {ok, InProtocol} = thrift_binary_protocol:new(InTransport),
  case thrift_protocol:read(
         InProtocol,
         {struct, element(2, udp_thrift_types:struct_info(DigestType))},
         DigestType) of
    {_, {ok, DecodedResult}} ->
      {ok, DecodedResult};
    _ ->
      {error, not_thrift}
  end.

digest_from_binary takes an atom naming the expected digest type and the binary digest. First, a memory_buffer transport is constructed from the binary data. It is then wrapped in a binary protocol, and an attempt is made to read the data using the structure definition provided by udp_thrift_types:struct_info.
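To see what such a read amounts to at the byte level, here is a hedged, hand-written decoder for the DigestAck layout, again assuming the standard Thrift binary protocol encoding. It uses Erlang binary pattern matching instead of thrift_protocol:read/3, and the module name is hypothetical. It also shows why arbitrary bytes are rejected: anything that does not match a field header simply fails to match.

```erlang
%% Hypothetical, hand-written decoder; not the Thrift library's implementation.
-module(digest_ack_decode_sketch).
-export([decode/1]).

decode(Bin) ->
  decode_fields(Bin, #{}).

%% A lone T_STOP byte (0) ends the struct.
decode_fields(<<0:8>>, Acc) ->
  finish(Acc);
%% T_STRING (11): field id, 32-bit length, then that many bytes.
decode_fields(<<11:8, Id:16, Len:32, Str:Len/binary, Rest/binary>>, Acc) ->
  decode_fields(Rest, Acc#{Id => Str});
%% T_I64 (10): field id, then a signed 64-bit big-endian integer.
decode_fields(<<10:8, Id:16, Int:64/signed, Rest/binary>>, Acc) ->
  decode_fields(Rest, Acc#{Id => Int});
decode_fields(_, _) ->
  {error, not_thrift}.

%% All three fields are required in the IDL.
finish(#{1 := Name, 2 := Heartbeat, 3 := ReplyId})
    when is_binary(Name), is_integer(Heartbeat), is_binary(ReplyId) ->
  {ok, {digestAck, Name, Heartbeat, ReplyId}};
finish(_) ->
  {error, missing_required_field}.
```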

payload_type_as_known_atom(DigestTypeBin) ->
  KnownDigestTypes = [
    { <<"digest">>, digest },
    { <<"digestAck">>, digestAck } ],
  case lists:keyfind( DigestTypeBin, 1, KnownDigestTypes ) of
    false -> { error, DigestTypeBin };
    { _Bin, Atom } -> { ok, Atom }
  end.

The function above is self-explanatory. Deserialization of the payload is attempted only if the given digest type has an entry in KnownDigestTypes; otherwise an error is returned.

The module ends with the rest of required gen_server behaviour.

handle_call(_Msg, _From, LoopData) ->
  {reply, ok, LoopData}.

handle_cast(stop, LoopData) ->
  {stop, normal, LoopData}.

terminate(_Reason, _LoopData) ->
  ok.

code_change(_OldVsn, State, _Extra) ->
  {ok, State}.

Running and testing

Preparation:

cd <project directory>
./rebar get-deps
./rebar compile

To run, two terminal windows are required. In the first window, start Erlang with this command:

erl -pa ebin/ -pa deps/*/ebin

Run the application:

application:set_env(udp_thrift, name, <<"memberA">>).
application:set_env(udp_thrift, port, 6666).
application:set_env(udp_thrift, peer_port, 6667).
application:start(crypto).
application:start(thrift).
application:start(udp_thrift).

In the second terminal window, start Erlang:

erl -pa ebin/ -pa deps/*/ebin

Run the application:

application:set_env(udp_thrift, name, <<"memberB">>).
application:set_env(udp_thrift, port, 6667).
application:set_env(udp_thrift, peer_port, 6666).
application:start(crypto).
application:start(thrift).
application:start(udp_thrift).

Both windows should be showing output similar to this:

…
Sending digest to 6667.
=INFO REPORT==== 12-Oct-2014::21:43:43 ===
Sending digest to 6667.
=INFO REPORT==== 12-Oct-2014::21:43:43 ===
Received digestAck to digest <<"#Ref<0.0.0.95>">>.
=INFO REPORT==== 12-Oct-2014::21:43:45 ===
Received digest from <<"memberB">>. Replying to 6667
=INFO REPORT==== 12-Oct-2014::21:43:45 ===
Sending digest to 6667.
=INFO REPORT==== 12-Oct-2014::21:43:45 ===
Received digestAck to digest <<"#Ref<0.0.0.100>">>.
=INFO REPORT==== 12-Oct-2014::21:43:47 ===
Received digest from <<"memberB">>. Replying to 6667
=INFO REPORT==== 12-Oct-2014::21:43:47 ===
Sending digest to 6667.
=INFO REPORT==== 12-Oct-2014::21:43:47 ===
Received digestAck to digest <<"#Ref<0.0.0.105>">>.
…

If that is the case, both clients are communicating via UDP with Thrift protocol.

Code and license

All code discussed above can be found on github: Using Apache Thrift with UDP in Erlang. The code is available under MIT license.