Skip to content

Latest commit

 

History

History

stream

Folders and files

NameName
Last commit message
Last commit date

parent directory

..
 
 
 
 
 
 
 
 
 
 

Streaming Trigger

This module builds a MarketStore trigger which pushes data through MarketStore's streaming interface. The push is triggered by writes to the on-disk data, both in the base timeframe as well as the aggregates. Aggregated data entries will be placed on a 'shelf' and will be given a shelf-life that indicates when they will be delivered. These entries will be updated with the latest aggregated candle up until the expiration time.

Note that all data is transmitted with MessagePack encoding.

Configuration

stream.so comes with the server by default, so you can simply configure it in MarketStore configuration file.

Options

Name Type Default Description
on string none The file glob pattern to match on
filter string none Filters pushes to '1D' timeframes and above based on market hours. Only 'nasdaq' is supported at this time.

Example

Add the following to your config file:

triggers:
  - module: stream.so
    on: */*/*
    config:
        filter: "nasdaq"

Protocol

Streams are organized by time bucket keys, just like the on-disk data is inside MarketStore. When subscribing to a stream, or many streams, the stream name should take the form of <Symbol>/<Timeframe>/<AttributeGroup>, where any, none or all of the 3 parts of the composite key can be replaced with * to subscribe to all of them. For example, to subscribe to all 5Min bars for BTC-USD, one would simply subscribe to the stream: BTC-USD/5Min/OHLCV. If one wanted to subscribe to all timeframes of BTC-USD, then the stream name would be: BTC-USD/*/OHLCV.

Note that to modify your subscription, another subscribe message must be sent over the websocket connection. The set of streams in the new subscribe message will replace any previously subscribed streams.

Subscribing with the included GoLang MarketStore client is as simple as:

handler := func(pl stream.Payload) error { fmt.Println(string(pl.Data)) }
cancelC := make(chan struct{})
streams := []string{"BTC-USD/*/*", "ETH-USD/1Min/OHLCV"}

done, err := client.Subscribe(handler, cancelC, streams)

if err != nil {
    panic(err)
}

<-done

All the messages are encoded in MessagePack. The message flow at low level looks as follows.

<Connection Made on "/ws">
Client: {"streams": ["BTC-USD/*/*", "ETH-USD/1Min/OHLCV"]}
Server: {"streams": ["BTC-USD/*/*", "ETH-USD/1Min/OHLCV"]}
Server: {"key": "ETH-USD/1Min/OHLCV", "data": {'Low': 1088.54, 'Close': 1088.54, 'Volume': 23.002666809999997, 'Epoch': 1516368000, 'Open': 1088.54, 'High': 1088.55}}
Server: {'key': 'BTC/1Min/OHLCV', 'data': {'Epoch': 1516386000, 'Open': 11301.01, 'High': 11301.01, 'Low': 11300.0, 'Close': 11301.01, 'Volume': 28.9793876}}
...

If an error occurs during the "streams" request (i.e. the streams format is not valid), it will return error as below.

Server: {"error": "error message for details"}

Build

If you need to change the code, you can build it from this directory by:

$ make all

It installs the new .so file to the first $GOPATH/bin directory.

Caveat

Since this is implemented based on the Go's plugin mechanism, it is supported only on Linux & MacOS as of Go 1.10