読者です 読者をやめる 読者になる 読者になる

後ろを向いて後退します

これって前に進んでいることになりませんか?

不穏ツイートがしやすくなるアプリを作った

Advent Calendar 日記 Erlang

Aizu Advent Calendar (Qiita) #18

qiita.com

明日はYUARAさんです。

謝罪

調子に乗って登録しまくったらトップバッターが大遅刻ですよ。

バカ!!!!!!!!!

本当に申し訳ございません。

Erlang/OTP

今年のAdvent CalendarではErlangの記事を2つ書いてきましたが、いずれもErlangの特徴である並行処理やアクターモデルには触れていない内容の記事でした。

というわけで、Erlangの並行処理事始め的な感じで簡単にアプリを作ってみました。OAuth周り頑張ったりGUIつけたりすればサービスとしてリリースもできると思います。誰かやってください。(他力本願)

このメッセージは自動的に消滅する。

こんなセリフを聞いたことがある人も記事を読んでいる人の中にはいるのではないのでしょうか。

スパイ大作戦という、アメリカのドラマでのセリフです。

スパイ大作戦 - Wikipedia

このアプリを使うと、スパイ大作戦の指令書のように指定秒数後に自動的に消滅するツイートをすることができます。これで不穏ツイートし放題ですね。(消滅するまでの間に魚拓を取られたとかで何かしらの問題に発展した場合でも私は責任を取りません。)

リポジトリは以下です。ビルドツールにはrebar3を使っています。アプリへの接続はwebsocketを通して行うのでそこらへんは自由にやってください。

github.com

リポジトリ名が「ジェームズ・ボンド」になっているのは気にしないでください。

記事を書くまで勘違いしていたなんて口が裂けても言えない。

implementation

なんでこれを作るのにErlangなのかというと、ツイートの投稿→削除までの流れを動的に起動した一つひとつのプロセスに任せることによってsleep関数を挟んでもラグなどが起きずに複数のツイートを並行して自動削除することができるからです。

f:id:mic_psm:20161220232807p:plain

一つのツイートにつき一つのプロセスを起動して削除までを管理させることによって、連続でツイートを投稿してもキッチリ投稿から指定秒数後に自動削除されるようになっています。

process tree structure

f:id:mic_psm:20161220234450p:plain

プロセスツリーと相関図は雑ですけど上みたいな感じです。シンプルでよくあるテンプレート的な構造をしています。

  • ws_handler ... クライアント(=wscat)とアプリをwebsocketでつなぐハンドラ。websocket経由で受け取ったメッセージを引数として、後述のjamesbot_supに対してワーカプロセスを起動するように伝える。
  • jamesbot_sup ... ツイートを送信、削除するためのワーカプロセスを管理するスーパーバイザー。ワーカプロセスはws_handlerから受けとったメッセージ毎に一つずつ起動する(simple_one_for_one)。
  • jamesbot_srv ... アプリのメインとなるロジックを持つワーカプロセス。モジュール内private関数にpost/1delete/1を持ち、失敗時には再送処理を行いながら正しい順序で投稿と削除をする。投稿から削除までのインターバルや、TwitterOAuth認証に必要な情報はconfig.hrlに書くようになっている。
  • twitter.com ... TwitterAPIエンドポイントがあるところ。

ざっくり実装解説していきます。

ws_handler

-module(ws_handler).

-export([init/2]).
-export([websocket_handle/3]).
-export([websocket_info/3]).

init(Req, Opts) ->
  io:format("system: websocket connection established~n"),
  {cowboy_websocket, Req, Opts}.

websocket_handle({text, Msg}, Req, State) ->
  io:format("system: request received~n"),
  jamesbot_sup:start_worker(self(), binary_to_list(Msg)),
  {reply, {text, << "request sent: ", Msg/binary >>}, Req, State};
websocket_handle(_Data, Req, State) ->
  {ok, Req, State}.

websocket_info({text, Msg}, Req, State) ->
  self() ! list_to_binary(Msg),
  {ok, Req, State};
websocket_info(_Info, Req, State) ->
  {ok, Req, State}.

ほぼcowboyのテンプレ通りです。websocket経由でメッセージを受け取るとjamesbot_supに対してワーカを起動するよう伝えます。

websocket_handleはwebsocket経由でメッセージを受け取ったときのコールバック関数、websocket_infoはハンドラのプロセスに対してメッセージパッシングされたとき、ボックスからメッセージを取り出したときのコールバック関数です。ちょっと実装がおかしいのでここはあまり参考にしないほうが良さそう。

jamesbot_sup

-module(jamesbot_sup).

-behaviour(supervisor).

%% API
-export([start_link/0, start_worker/2]).

%% Supervisor callbacks
-export([init/1]).

-define(SERVER, ?MODULE).
-define(RESTART, {simple_one_for_one, 1, 1000}).
%% Args must be a list
-define(CHILD_SPEC(Args), {jamesbot, {jamesbot_srv, start_link, Args}, transient, 2000, worker, [jamesbot_srv]}).

start_link() ->
  supervisor:start_link({local, ?SERVER}, ?MODULE, []).

start_worker(HandlerPid, Text) ->
  io:format("system: worker will be initialized~n"),
  supervisor:start_child(?SERVER, [?CHILD_SPEC([HandlerPid, Text])]).

%% Child :: {Id,StartFunc,Restart,Shutdown,Type,Modules}
init([]) ->
  {ok, { ?RESTART, [?CHILD_SPEC([])]} }.

ワーカプロセスのスペックはマクロ関数で定義してあります。再起動戦略はtransientなので、イレギュラーなEXITに対しても自動的にプロセスが再起動されるようになっています。start_worker/2が前述のハンドラから叩くための関数になっています。

jamesbot_srv:post/1, delete/1

post(Text) ->
  ssl:start(),
  ApiKey = {?CK, ?CS, hmac_sha1},
  URL = "https://api.twitter.com/1.1/statuses/update.json",
  Response = oauth:post(URL, [{"status", Text}], ApiKey, ?AT, ?AS).

delete(TweetIdStr) ->
  ssl:start(),
  ApiKey = {?CK, ?CS, hmac_sha1},
  URL = "https://api.twitter.com/1.1/statuses/destroy/" ++ TweetIdStr ++ ".json",
  Response = oauth:post(URL, [{"id", TweetIdStr}], ApiKey, ?AT, ?AS).

find_id(ResponseBody) ->
  BodyJson = jsx:decode(list_to_binary(ResponseBody)),
  integer_to_list(proplists:get_value(<<"id">>, BodyJson)).

parse_response({ok, {{_, 200, _}, _, Body}}) -> Body;
parse_response({ok, {{_, _, _}, _, _}})-> error.

投稿、削除をする部分の関数と、レスポンスをパースしてTweetIDを取得するための関数です。(API経由でツイートを削除するにはTweetIDが必要なのです。)

実装はこちらの記事を参考にさせていただきました。

qiita.com

意外と簡単。

jamesbot_srv

handle_call(_Request, _From, State) ->
  {reply, ok, State}.

handle_cast(delete_tweet, State = #state{handler = HandlerPid, id_str = Id}) ->
  Res = parse_response(delete(Id)),
  case Res of
    error ->
      {ok, _} = timer:apply_after(1000, gen_server, cast, [self(), delete_tweet]),
      HandlerPid ! "failed to delete, attempting to re-delete...",
      {noreply, State};
    _ ->
      HandlerPid ! "deletion completed",
      {stop, normal, State}
  end;
handle_cast(_Info, State) ->
  {noreply, State}.

handle_info(send_tweet, State = #state{handler = HandlerPid, text = Text}) ->
  Res = parse_response(post(Text)),
  case Res of
    error ->
      self() ! send_tweet,
      HandlerPid ! "failed to send, attempting to resend...",
      {noreply, State};
    _ ->
      Id = find_id(Res),
      {ok, _} = timer:apply_after(?INTERVAL * 1000, gen_server, cast, [self(), delete_tweet]),
      HandlerPid ! "succeeded to send, the tweet will be deleted later.",
      {noreply, State#state{id_str = Id}}
  end;
handle_info(_Info, State) ->
  {noreply, State}.

200 OK以外のレスポンスが返って来た場合の再送処理も含めているので少しコードが冗長で読みにくくなっています。

ここには載せていませんが、jamesbot_srv:init/1の中でワーカプロセス自身にsend_tweetのメッセージを送信しています。先にhandle_info/2の中でツイートの投稿が成功するまで繰り返すようにし、成功した時点でtimer:apply_after/4を使ってツイートの削除をするための処理が含まれる関数を呼んでいます。(厳密には、削除の処理が書かれているのはその関数のコールバック関数の中です。apply_after/4はMFAを引数にとるのでgen_server:cast/2を呼んでhandle_cast/2の中で削除処理を行うようにしました。)

handle_cast/2の中でもhandle_info/2と同様に削除が成功するまで再送をするようにし、成功したらプロセスが正常終了するようになっています。

なんだかかなり納得のいかないコードになっていますが、投稿と削除の責務を別のプロセスとして分割するべきなのではと思ったのですがどうなんでしょう。誰か教えてください…。

summary

observer:start/0なんかを使ってプロセスを監視してみると、jamesbotのワーカプロセスが投稿するツイート毎に起動して削除後に消滅するのが確認できると思います。皆さんも並行処理を感じてみてください。

さて、19日の記事はYUARAさんです。よろしくお願いいたします!