Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Big refactoring #13

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 22 additions & 45 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,62 +9,39 @@ JSON on STDOUT.

## Installation

If [available in Hex](https://hex.pm/docs/publish), the package can be installed as:

1. Add `std_json_io` to your list of dependencies in `mix.exs`:

def deps do
[{:std_json_io, "~> 0.1.0"}]
end

2. Ensure `std_json_io` is started before your application:

def application do
[applications: [:std_json_io]]
end

### Setup

Define a module and use StdJsonIo.

1. Add `std_json_io` to your list of dependencies in `mix.exs`:
```elixir
defmodule MyApp.ReactIo do
use StdJsonIo, otp_app: :my_app
def deps do
[{:std_json_io, "~> 0.1.0"}]
end
```

When you use `StdJsonIo` your module becomes a supervisor. You'll need to add it
to your supervision tree.

2. Ensure `std_json_io` is started before your application:
```elixir
children = [
# snip
supervisor(MyApp.ReactIo, [])
]

opts = [strategy: :one_for_one, name: MyApp]

Supervisor.start_link(children, opts)
def application do
[applications: [:std_json_io]]
end
```


### Configuration

You can either configure as additional arguments of the use statement, or in your config file.

```elixir
config :my_app, MyApp.ReactIo,
pool_size: 20, # default 5
max_overflow: 10, # default 10
script: "path/to/script", # for react-io use "react-stdio"
watch_files: [
Path.join([__DIR__, "../priv/server/js/component.js"]) # do not watch files in dev
]
config :std_json_io,
pool_size: 5,
pool_max_overflow: 10,
script: "node_modules/.bin/react-stdio"
```

* `pool_size` - see [Poolboy options](https://github.com/devinus/poolboy#options), option "size"
* `pool_max_overflow` - See [Poolboy options](https://github.com/devinus/poolboy#options), option "max_overflow"
* `script` - the script to run for the IO server
* `watch_files` - A list of files to watch for changes. When the file changes,
kill the IO worker and restart, picking up any changes. Use only in dev.
* `pool_size` - The size for the pool of workers - See poolboy `size`
* `max_overflow` - The poolboy `max_overflow`

### Usage example
```elixir
{:ok, data} = StdJsonIo.json_call(%{"component" => "my/component.js"}
# or
data = StdJsonIo.json_call!(%{"component" => "my/component.js"}
```

### Development
There are some tests taking long to run (testing timeouts, long replies, etc.) with tag `long: true` which are excluded by default. To run all the tests including long, you have to run `mix test --include long:true`
6 changes: 6 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,9 @@ use Mix.Config
# here (which is why it is important to import them last).
#
# import_config "#{Mix.env}.exs"
config :std_json_io,
pool_size: 5,
pool_max_overflow: 10,
script: "python -u test/fixtures/echo.py 2>/dev/null"

config :porcelain, driver: Porcelain.Driver.Basic
91 changes: 13 additions & 78 deletions lib/std_json_io.ex
Original file line number Diff line number Diff line change
@@ -1,84 +1,19 @@
defmodule StdJsonIo do

defmacro __using__(opts) do
otp_app = Keyword.get(opts, :otp_app)

if !otp_app do
raise "StdJsonIo requires an otp_app"
def json_call!(map, timeout \\ 10000) do
case json_call(map, timeout) do
{:ok, data} -> data
{:error, reason } -> raise "Failed to call to json service, reason: #{to_string(reason)}"
end
end

quote do
use Supervisor
@pool_name Module.concat(__MODULE__, Pool)
@options Keyword.merge(unquote(opts), (Application.get_env(unquote(otp_app), __MODULE__) || []))


def start_link(opts \\ []) do
Supervisor.start_link(__MODULE__, :ok, name: {:local, __MODULE__})
end

def init(:ok) do
pool_options = [
name: {:local, @pool_name},
worker_module: StdJsonIo.Worker,
size: Keyword.get(@options, :pool_size, 5),
max_overflow: Keyword.get(@options, :max_overflow, 10)
]

script = Keyword.get(@options, :script)

children = [:poolboy.child_spec(@pool_name, pool_options, [script: script])]

files = Keyword.get(@options, :watch_files)

if files && length(files) > 0 do
Application.ensure_started(:fs, :permanent)

reloader_spec = worker(
StdJsonIo.Reloader,
[__MODULE__, Enum.map(files, &Path.expand/1)],
[]
)

children = [reloader_spec | children]
end

supervise(children, strategy: :one_for_one, name: __MODULE__)
end

def restart_io_workers! do
case Process.whereis(@pool_name) do
nil ->
Supervisor.restart_child(__MODULE__, @pool_name)
_pid ->
Supervisor.terminate_child(__MODULE__, @pool_name)
Supervisor.restart_child(__MODULE__, @pool_name)
end
end

def json_call!(map, timeout \\ 10000) do
case json_call(map, timeout) do
{:ok, data} -> data
{:error, reason } -> raise "Failed to call to json service #{__MODULE__} #{to_string(reason)}"
end
end

def json_call(map, timeout \\ 10000) do
result = :poolboy.transaction(@pool_name, fn worker ->
GenServer.call(worker, {:json, map}, timeout)
end)

case result do
{:ok, json} ->
{:ok, data} = Poison.decode(json)
if data["error"] do
{:error, Map.get(data, "error")}
else
{:ok, data}
end
other -> other
end
end
def json_call(data, timeout \\ 10000) do
result = :poolboy.transaction(StdJsonIo.Pool, fn worker ->
GenServer.call(worker, {:json, data, timeout}, :infinity)
end)
if result["error"] do
{:error, Map.get(result, "error")}
else
{:ok, result}
end
end
end
20 changes: 20 additions & 0 deletions lib/std_json_io/application.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
defmodule StdJsonIo.Application do
use Application

def start(_type, _args) do
import Supervisor.Spec, warn: false
config = Application.get_all_env(:std_json_io)
pool_options = [
name: {:local, StdJsonIo.Pool},
worker_module: StdJsonIo.Worker,
size: Keyword.get(config, :pool_size, 15),
max_overflow: Keyword.get(config, :pool_max_overflow, 10),
strategy: :fifo
]
children = [
:poolboy.child_spec(StdJsonIo.Pool, pool_options, [script: Keyword.fetch!(config, :script)])
]
opts = [strategy: :one_for_one, name: StdJsonIo.Supervisor]
Supervisor.start_link(children, opts)
end
end
23 changes: 0 additions & 23 deletions lib/std_json_io/reloader.ex

This file was deleted.

76 changes: 47 additions & 29 deletions lib/std_json_io/worker.ex
Original file line number Diff line number Diff line change
@@ -1,48 +1,66 @@
defmodule StdJsonIo.Worker do
use GenServer
alias Porcelain.Process, as: Proc
alias Porcelain.Process, as: PProc
alias Porcelain.Result

@initial_state %{
pproc: nil,
buffer: "",
from: nil,
timer: false,
stop_reason: nil
}

def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts[:script], opts)
end

def init(script) do
:erlang.process_flag(:trap_exit, true)
{:ok, %{js_proc: start_io_server(script)}}
Process.flag(:trap_exit, true)
pproc = Porcelain.spawn_shell(script, in: :receive, out: {:send, self()})
{:ok, %{@initial_state | pproc: pproc}}
end

def handle_call({:json, blob}, _from, state) do
case Poison.encode(blob) do
nil -> {:error, :json_error}
{:error, reason} -> {:error, reason}
{:ok, json} ->
Proc.send_input(state.js_proc, json)
receive do
{_js_pid, :data, :out, msg} ->
{:reply, {:ok, msg}, state}
response ->
{:reply, {:error, response}, state}
end
end
def handle_call({:json, data, timeout}, from, %{pproc: pproc} = state) do
{:ok, json} = Poison.encode(data)
PProc.send_input(pproc, json <> "\n")
timer = Process.send_after(self(), :timeout, timeout)
{:noreply, %{state | from: from, timer: timer}}
end

def handle_call(:stop, _from, state), do: {:stop, :normal, :ok, state}

def handle_info({pproc_pid, :data, :out, data}, %{pproc: %PProc{pid: pproc_pid}, buffer: buffer} = state) do
new_buffer = buffer <> data
case Poison.decode(new_buffer) do
{:ok, decoded} ->
Process.cancel_timer(state[:timer])
GenServer.reply(state[:from], decoded)
{:noreply, %{state | buffer: "", timer: false}}
_ ->
{:noreply, %{state | buffer: new_buffer}}
end
end
# The js server has stopped
def handle_info({_js_pid, :result, %Result{err: _, status: _status}}, state) do
{:stop, :normal, state}
def handle_info({pproc_pid, :result, %Result{err: _, status: _status}}, %{pproc: %PProc{pid: pproc_pid}} = state) do
{:stop, :normal, %{state | stop_reason: "Server have been terminated"}}
end

def terminate(_reason, %{js_proc: server}) do
Proc.signal(server, :kill)
Proc.stop(server)
:ok
# Complete response was not received within given timeout
# Stop the server with appropriate reason
def handle_info(:timeout, state) do
{:stop, :normal, %{state | stop_reason: "timeout"}}
end

def terminate(_reason, _state), do: :ok

defp start_io_server(script) do
Porcelain.spawn_shell(script, in: :receive, out: {:send, self()})
def terminate(_reason, %{pproc: pproc, timer: timer, from: from, buffer: buffer, stop_reason: stop_reason}) do
unless timer == false do
# Process is being terminated while client is awaiting response
error = %{
"message" => stop_reason,
"buffer" => buffer
}
GenServer.reply(from, %{"error" => error})
Process.cancel_timer(timer)
end
PProc.stop(pproc)
PProc.signal(pproc, :kill)
:ok
end
end
11 changes: 5 additions & 6 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ defmodule StdJsonIo.Mixfile do
maintainers: @maintainers,
description: "Application for managing and communicating with IO servers via JSON",
homepage_url: @url,
docs: docs,
deps: deps]
docs: docs(),
deps: deps()]
end

def application do
[
applications: [:logger, :porcelain],
included_applications: [:fs]
mod: {StdJsonIo.Application, []}
]
end

Expand All @@ -38,10 +38,9 @@ defmodule StdJsonIo.Mixfile do

defp deps do
[
{:porcelain, "~> 2.0"},
{:porcelain, "~> 2.0.3"},
{:poolboy, "~> 1.5.1"},
{:poison, "~> 1.5.0"},
{:fs, "~> 0.9.1"},
{:poison, "~> 2.0 or ~> 3.1.0"}
]
end

Expand Down
7 changes: 3 additions & 4 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
%{"fs": {:hex, :fs, "0.9.2", "ed17036c26c3f70ac49781ed9220a50c36775c6ca2cf8182d123b6566e49ec59", [:rebar], []},
"poison": {:hex, :poison, "1.5.0", "f2f4f460623a6f154683abae34352525e1d918380267cdbd949a07ba57503248", [:mix], []},
"poolboy": {:hex, :poolboy, "1.5.1", "6b46163901cfd0a1b43d692657ed9d7e599853b3b21b95ae5ae0a777cf9b6ca8", [:rebar], []},
"porcelain": {:hex, :porcelain, "2.0.1", "9c3db2b47d8cf6879c0d9ac79db8657333974a88faff09e856569e00c1b5e119", [:mix], []}}
%{"poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], [], "hexpm"},
"poolboy": {:hex, :poolboy, "1.5.1", "6b46163901cfd0a1b43d692657ed9d7e599853b3b21b95ae5ae0a777cf9b6ca8", [:rebar], [], "hexpm"},
"porcelain": {:hex, :porcelain, "2.0.3", "2d77b17d1f21fed875b8c5ecba72a01533db2013bd2e5e62c6d286c029150fdc", [:mix], [], "hexpm"}}
17 changes: 17 additions & 0 deletions test/fixtures/echo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#!/usr/bin/env python
import sys
import time

for line in iter(sys.stdin.readline, ''):
line = line.rstrip('\n')
if line == "{\"test\":\"sleep3s\"}":
time.sleep(3)
sys.stdout.write('{"response": '+ line + '}'),
elif line == "{\"test\":\"error\"}":
sys.stdout.write('{"error": '+ line + '}'),
elif line == "{\"test\":\"crash\"}":
raise Exception('some exception')
elif line == "{\"test\":\"not_json\"}":
sys.stdout.write('plaintext'),
else:
sys.stdout.write('{"response": '+ line + '}'),
Loading