【译】在Elixir中创建riak_core应用(2)
Table of Contents
越翻越觉得汗,水平太臭,一边翻译一边就能发现自己的错误,真怕误人子弟,建议对照英文原文看。
1 在Elixir中创建riakcore应用(2)
首先,感谢所有读过我之前一篇文章的人, 在本篇文章中,我们将继续使用前文的代码,并增加一些新的功能,你可以在这里找到最终版本。
我们将添加的第一个功能是经典的Ping。
开始之前,快速回顾一下,我们已经有了一个简单的“空”应用,它可以编译并用下面的命令在三个不同的环境下运行:
# this is node 1 MIX_ENV=gpad_1 iex --name gpad_1@127.0.0.1 -S mix run # this is node 2 MIX_ENV=gpad_2 iex --name gpad_2@127.0.0.1 -S mix run # this is node 3 MIX_ENV=gpad_3 iex --name gpad_3@127.0.0.1 -S mix run
我们要做的是增加一个简单的可以在控制台中使用的=ping=,用来展示集群如何工作。
1.1 节点,虚拟节点和组织他们的环
riakcore的主要概念之一是与环相关的虚拟节点的概念
环的想法很简单,你应该取得你在程序中管理的所有可能的值(这被成为键空间)并进行分区。 例如riakcore使用经典的hash函数把二进制串计算出一个值,代码类似这样:
iex(1)> key = "gpad" "gpad" iex(2)> idx = :erlang.term_to_binary(key) |> :crypto.sha <<211, 108, 199, 240, 242, 57, 27, 91, 139, 82, 154, 145, 27, 215, 191, 24, 107, 77, 162, 202>> iex(3)> << v::unsigned-big-integer-size(160) >> = idx <<211, 108, 199, 240, 242, 57, 27, 91, 139, 82, 154, 145, 27, 215, 191, 24, 107, 77, 162, 202>> iex(4)> v 1207022950459909619005619617169028319269647917770 iex(5)>
如你所见,我们将一个单词作为作为键并转为二进制串,然后使用=:crypt.sha=对它进行hash运算,最后,我们使用了 二进制模式匹配 打印了它的值,值的范围在0到2160之间
riakcore默认把键空间(2160)分割为64个分区(partition),每个分区负责管理(/处理/)环上的一段(segment)。 这样的单个分区被成为虚拟节点(/vnode/),虚拟节点分布在真实的节点上,这样,当我们添加或删除节点时,可以做到尽可能少的数据传输。 (更多的细节将在下一篇文章中介绍)
我们身处BEAM的世界中,所以很自然就会想到将每个/vnode/映射到一个Erlang的进程。Riakcore非常不错的提供了一个behaviour来帮助我们实现vnode进程。在Elixir中使用行为(behaviour) 也非常简单。我建议在=lib/noslides=中创建一个名为=NoSlides.VNode=的模块,然后像下面这样声明=riakcorevnode=行为:
defmodule NoSlides.VNode do @behaviour :riak_core_vnode end
如果这是你尝试编译一下工程,你会得到这样一些告警信息*warning: undefined behaviour function …*。这是因为我们还没有实现这些行为。 如果你熟悉面向对象编程,你可以把行为想像成接口,模块应该实现一系列的函数,并在一个确定的上下文中执行,下面是我们看到的告警:
warning: undefined behaviour function delete/1 warning: undefined behaviour function encode_handoff_item/2 warning: undefined behaviour function handle_command/3 warning: undefined behaviour function handle_coverage/4 warning: undefined behaviour function handle_exit/3 warning: undefined behaviour function handle_handoff_command/3 warning: undefined behaviour function handle_handoff_data/2 warning: undefined behaviour function handoff_cancelled/1 warning: undefined behaviour function handoff_finished/2 warning: undefined behaviour function handoff_starting/2 warning: undefined behaviour function init/1 warning: undefined behaviour function is_empty/1 warning: undefined behaviour function terminate/2
应该在模块中实现下面这些函数,这些函数会在触发特定事件时被调用:
- init/1 – 当虚拟节点创建时被调用,它和GenServer 的/init/非常相似。它接收一个数组作为参数,数组里是vnode负责管理的分区的值。该函数返回进程的状态(state)。在其他的回调函数中, 进程状态总是作为最后一个传入的参数。
- terminate/2 – 当虚拟节点终止时被调用,它接收reason(原子类型)和state作为参数。
- handleexit/3 – 当vnode关联的进程死亡时, 该函数被调用。它接收死亡进程的pid,reason和state。你可以返回={:noreply, state}=来继续(运行程序)。
- delete/1 – 当需要删除与vnode关联的数据时调用,和其他函数一样,它接收state作为参数并返回={:ok, newstate}=。
- handlecommand/3 – 当vnode执行命令(command)时被调用,这个函数被用来实现我们自己的命令。
- handlecoverage/4 – 当我们创建coverage命令时调用。我们将在下一篇文章中处理这种类型的命令。
- 剩下的函数都和handoff过程(handoffs procedure)有关,我们将在下一篇中讨论
当实现最后一个函数=startvnode/1=后,系统就可以正常工作了,最终,你的模块看起来是这样:
defmodule NoSlides.VNode do @behaviour :riak_core_vnode def start_vnode(partition) do :riak_core_vnode_master.get_vnode_pid(partition, __MODULE__) end def init([partition]) do {:ok, partition} end def handle_command(request, sender, state) do # we work here!!! end def handoff_starting(_dest, state) do {true, state} end def handoff_cancelled(state) do {:ok, state} end def handoff_finished(_dest, state) do {:ok, state} end def handle_handoff_command(_fold_req, _sender, state) do {:noreply, state} end def is_empty(state) do {true, state} end def terminate(_reason, _state) do :ok end def delete(state) do {:ok, state} end def handle_handoff_data(_bin_data, state) do {:reply, :ok, state} end def encode_handoff_item(_k, _v) do end def handle_coverage(_req, _key_spaces, _sender, state) do {:stop, :not_implemented, state} end def handle_exit(_pid, _reason, state) do {:noreply, state} end end
真正的工作在函数=handlecommand/3=里实现,使用模式匹配(和GenServer类似),我们像这样来实现handlecommand
def handle_command({:ping, v}, _sender, state) do {:reply, {:pong, v + 1}, state} end
现在,我们已经实现了ping命令。那么如何从控制台执行?我们需要引入一个我喜欢称之为*服务*的概念。 服务是一个模块,它包装了一些操作,可以与riakcore对外开放的命令交互。服务应该在riakcore中注册, 这样riakcore才会知道是什么节点暴露了服务。代码如下:
defmodule NoSlides.Service do def ping(v\\1) do idx = :riak_core_util.chash_key({"noslides", "ping#{v}"}) pref_list = :riak_core_apl.get_primary_apl(idx, 1, NoSlides.Service) [{index_node, _type}] = pref_list :riak_core_vnode_master.sync_command(index_node, {:ping, v}, NoSlides.VNode_master) end end
在第4行,我们算出了要存储的值的id。这个id是/键空间/上的一个值。利用这个*idx*,我们可以用*getprimaryapl*(第5行)向riakcore请求一个*首选项列表*(preference list)。首选项列表是一个集合,它包含了哪些节点处理哪些分区的信息。当调用/getprimaryapl/时, 我们需要列表的长度1(第二个参数)和一个实现了*NoSlides.Service*的节点(第三个参数)。在这个例子中,我们只请求了一个元素, 因为我们只希望在一个节点上执行命令,下一篇文章中,我们会讨论/冗余/(redundancy)。从首选项列表中,我们获取了*indexnode*, 它用于标识执行命令的真实的或是虚拟的节点。这个节点对具有*idx*标识的数据拥有所有权。
在第9行,我们用*indexnode*作为参数调用函数=:riakcorevnodemaster.synccommand=。该函数是同步的, 也就是说直到vnode模块完成工作后它才会返回。如果查看=:riakcorevnodemaster=的代码,你会发现=:riakcorevnodemaster.command=, 这个函数采用的则是异步的方式。
你可能还发现了=syncspawncommand=,它类似于=synccommand=,查看源码的话,你会发现这样的注释
%% Send a synchronous spawned command to an individual Index/Node %% combination. %% Will not return until the vnode has returned, but the %%% vnode_master will %% continue to handle requests. sync_spawn_command({Index,Node}, Msg, VMaster) ->
事实上并不是这样的,这些可能是老的注释或者是老的实现,最后要说的是,riakcore约定, vnode的名称带有=master=后缀(*NoSlides.VNodemaster*)
现在,我们已经实现了*Service*和*VNode*,还需要把所有的这些集中在一起,为此,我们要从头开始…
1.2 启动riakcore程序
在OTP应用中,我们需要从一个实现了Application行为的模块开始。 如果你用mix创建了一个空的工程,那么你可能已经有了一个引入了=use Application=的名为=NoSlides=的模块,扔掉它并替换成这样:
defmodule NoSlides do use Application require Logger def start(_type, _args) do case NoSlides.Supervisor.start_link do {:ok, pid} -> :ok = :riak_core.register(vnode_module: NoSlides.VNode) :ok = :riak_core_node_watcher.service_up(NoSlides.Service, self()) {:ok, pid} {:error, reason} -> Logger.error("Unable to start NoSlides supervisor because: #{inspect reason}") end end end
第6行,我们启动了一个supervisor,稍后再来实现它。如果一切顺利,我们在第8行注册实现了vnode的模块, 在第9行注册实现了服务的模块。
supervisor的实现应该放在=lib/noslides/supervisor.ex=中,内容像是这样:
defmodule NoSlides.Supervisor do use Supervisor def start_link do # riak_core appends _sup to the application name. Supervisor.start_link(__MODULE__, [], [name: :no_slides_sup]) end def init(_args) do children = [ worker(:riak_core_vnode_master, [NoSlides.VNode], id: NoSlides.VNode_master_worker) ] supervise(children, strategy: :one_for_one, max_restarts: 5, max_seconds: 10) end end
这是一个经典的supervisor,但要注意一些细节,supervisor的id应当以=sup=为结束(第6行), 而工作者(worker)的id应当使用=masterworker=后缀(第11行)
之后,我们可以使用下面的命令启动节点:
MIX_ENV=gpad_1 iex --name gpad_1@127.0.0.1 -S mix run
在IEx中,可以运行ping服务:
Interactive Elixir (1.3.4) - press Ctrl+C to exit (type h() ENTER for help) iex(gpad_1@127.0.0.1)1> NoSlides.Service.ping {:pong, 2} iex(gpad_1@127.0.0.1)2> NoSlides.Service.ping(42) {:pong, 43} iex(gpad_1@127.0.0.1)3>
现在,我们可以加入更多的节点来运行/分布式的ping/,第一步,我们要在不同的控制台启动更多的节点:
# this is node 1 MIX_ENV=gpad_1 iex --name gpad_1@127.0.0.1 -S mix run # this is node 2 MIX_ENV=gpad_2 iex --name gpad_2@127.0.0.1 -S mix run # this is node 3 MIX_ENV=gpad_3 iex --name gpad_3@127.0.0.1 -S mix run
现在可以将所有的节点连接起来了,在第二个节点的控制台键入:
iex(gpad_2@127.0.0.1)1> :riak_core.join('gpad_1@127.0.0.1')
同样,在节点3执行相同的操作
iex(gpad_3@127.0.0.1)1> :riak_core.join('gpad_1@127.0.0.1')
如果查看节点1的控制台,那么会看到这样的日志:
12:21:53.168 [info] 'gpad_2@127.0.0.1' joined cluster with status 'valid' 12:22:39.155 [info] 'gpad_3@127.0.0.1' joined cluster with status 'joining' 12:22:39.191 [info] 'gpad_3@127.0.0.1' changed from 'joining' to 'valid'
现在,你可以使用下面的命令来请求riakcore打印*环*的状态:
{:ok, ring} = :riak_core_ring_manager.get_my_ring :riak_core_ring.pretty_print(ring, [:legend])
你会看到这样的输出:
============================= Nodes ============================= Node a: 22 ( 34.4%) gpad_1@127.0.0.1 Node b: 21 ( 32.8%) gpad_2@127.0.0.1 Node c: 21 ( 32.8%) gpad_3@127.0.0.1 ============================= Ring ============================= abcc|abcc|abcc|abcc|abcc|abcc|abcc|abcc|abcc|abcc|abca|abba|abba|abba|abba|abba|
正如你所看到的,环被分成了64个分区,节点1拥有22个VNode,剩下两个节点各拥有21个,你同样可以在*observer*中看到:
现在我们可以在VNode的实现中加入日志,这样就可以看到是哪个节点响应了ping(记得在模块顶部加入=require Logger=)
def handle_command({:ping, v}, _sender, state) do Logger.debug("Receive ping with value: #{v}") {:reply, {:pong, v + 1}, state} end
在控制台,执行命令:
iex(gpad_1@127.0.0.1)1> NoSlides.Service.ping {:pong, 2}
节点2上,就可以看到如下日志:
12:43:00.822 [debug] Receive ping with value: 1
我们已经有了一个非常简单的分布式的ping,如果改变传递给ping的值,能够看到响应ping的不同的节点。 例如,如果使用42,则节点3会作出响应。
现在,我们有了ping,这样我们就可以轻松的创建一个基于内存的键值存储。
1.3 内存中的KV
现在我们已经了解如何连接服务和vnode,我们可以在服务模块上暴露=get=和=put=函数,来轻松创建一个内存键值存储,
defmodule NoSlides.Service do # ... def put(k, v) do idx = :riak_core_util.chash_key({"noslides", k}) pref_list = :riak_core_apl.get_primary_apl(idx, 1, NoSlides.Service) [{index_node, _type}] = pref_list :riak_core_vnode_master.command(index_node, {:put, {k, v}}, NoSlides.VNode_master) end def get(k) do idx = :riak_core_util.chash_key({"noslides", k}) pref_list = :riak_core_apl.get_primary_apl(idx, 1, NoSlides.Service) [{index_node, _type}] = pref_list :riak_core_vnode_master.sync_command(index_node, {:get, k}, NoSlides.VNode_master) end end
同样,在VNode中也要加上=get=和=put=的实现:
defmodule NoSlides.VNode do def init([partition]) do {:ok, %{partition: partition, data: %{}}} end # ... def handle_command({:put, {k, v}}, sender, state) do Logger.debug("[put]: k: #{inspect k} v: #{inspect v}") new_state = Map.update(state, :data, %{}, fn data -> Map.put(data, k, v) end) {:noreply, new_state} end def handle_command({:get, k}, sender, state) do Logger.debug("[get]: k: #{inspect k}") {:reply, Map.get(state.data, k, nil), state} end end
现在,你可以从不同的节点上进行获取和添加的操作了。
重启所有节点,但不需要重新执行加入节点的操作,在节点1的控制台,执行下面的命令:
iex(gpad_1@127.0.0.1)1> NoSlides.Service.put(:k, 42) :ok iex(gpad_1@127.0.0.1)2> NoSlides.Service.get(:k) 42
检查节点2上的日志:
iex(gpad_2@127.0.0.1)1> 19:31:30.634 [debug] [put]: k: :k v: 42 19:31:39.242 [debug] [get]: k: :k
同样,在节点3上也可以取值
iex(gpad_3@127.0.0.1)1> NoSlides.Service.get(:k) 42
我们拥有了一个内存中的键值存储,你可以添加很多不同类型数据作为值:
iex(gpad_1@127.0.0.1)9> NoSlides.Service.put("gpad", %{ blogs: ["riak_core I", "riak_core II"] }) :ok iex(gpad_1@127.0.0.1)10> NoSlides.Service.get("gpad") %{blogs: ["riak_core I", "riak_core II"]}
同样也可以是键:
iex(gpad_1@127.0.0.1)11> NoSlides.Service.put(%{a: 1, b: 2}, "gpad") :ok iex(gpad_1@127.0.0.1)12> NoSlides.Service.get(%{a: 1, b: 2}) "gpad"
以一个简单的内存键值存储的作为开始,还有一些问题:
- 如果一个节点使用=:riakcore.leave=离开集群,会发生什么?
- 如果某个节点崩溃,会发生什么?
- 我们如何获取所有的键?
我会在下一篇中尝试回答这些问题,如果你有任何问题或发现了什么错误,请不要犹豫,在下面留言吧。
作为译者,我的英文水平也很挫,如果读者发现任何问题,也请留言吧