# Data Streams SDK (Go)
Source: https://docs.chain.link/data-streams/reference/data-streams-api/go-sdk


<DataStreams section="dsNotes" />

This documentation provides a detailed reference for the Data Streams SDK for Go. It implements a client library that offers a domain-oriented abstraction for interacting with the Data Streams API and enables both point-in-time data retrieval and real-time data streaming with built-in fault tolerance capabilities.

## Requirements

- Go 1.22.4 or later
- Valid Chainlink Data Streams credentials

## Installation

```go
go get github.com/smartcontractkit/data-streams-sdk/go
```

## streams

### Import

```go
import streams "github.com/smartcontractkit/data-streams-sdk/go"
```

### Types

#### `Client` interface

[Interface](https://github.com/smartcontractkit/data-streams-sdk/blob/main/go/client.go#L21) for interacting with the Data Streams API. Use the [`New`](/data-streams/reference/data-streams-api/go-sdk#new) function to create a new instance of the client.

###### Interface Methods

- `GetFeeds`: Lists all streams available to you.

  ```go
  GetFeeds(ctx context.Context) (r []*feed.Feed, err error)
  ```

- `GetLatestReport`: Fetches the latest report available for the specified `FeedID`.

  ```go
  GetLatestReport(ctx context.Context, id feed.ID) (r *ReportResponse, err error)
  ```

- `GetReports`: Fetches reports for the specified stream IDs and a given timestamp.

  ```go
  GetReports(ctx context.Context, ids []feed.ID, timestamp uint64) ([]*ReportResponse, error)
  ```

- `GetReportPage`: Paginates the reports for a specified `FeedID` starting from a given timestamp.

  ```go
  GetReportPage(ctx context.Context, id feed.ID, startTS uint64) (*ReportPage, error)
  ```

- `Stream`: Creates a real-time report stream for specified stream IDs.

  ```go
  Stream(ctx context.Context, feedIDs []feed.ID) (Stream, error)
  ```

- `StreamWithStatusCallback`: Creates a real-time report stream for specified stream IDs with a callback function for connection status updates.

  ```go
  StreamWithStatusCallback(ctx context.Context, feedIDs []feed.ID, connStatusCallback func(isConnected bool, host string, origin string)) (Stream, error)
  ```

#### `Config` struct

Configuration struct for the client. `Config` specifies the client configuration and dependencies. If you specify the `Logger` function, informational client activity is logged.

```go
type Config struct {
	ApiKey             string                        // Client API key
	ApiSecret          string                        // Client API secret
	RestURL            string                        // Rest API URL
	WsURL              string                        // Websocket API URL
	WsHA               bool                          // Use concurrent connections to multiple Streams servers
	WsMaxReconnect     int                           // Maximum number of reconnection attempts for Stream underlying connections
	LogDebug           bool                          // Log debug information
	InsecureSkipVerify bool                          // Skip server certificate chain and host name verification
	Logger             func(format string, a ...any) // Logger function

	// InspectHttpResponse intercepts http responses for rest requests.
	// The response object must not be modified.
	InspectHttpResponse func(*http.Response)
}
```

**Note**: If `WsMaxReconnect` is not specified, it defaults to 5 attempts.

#### `CtxKey` string

Custom context key type used for passing additional headers.

```go
type CtxKey string
```

- Constants:

  - `CustomHeadersCtxKey`: Key for passing custom HTTP headers.

  ```go
  const (
    // CustomHeadersCtxKey is used as key in the context.Context object
    // to pass in a custom http headers in a http.Header to be used by the client.
    // Custom header values will overwrite client headers if they have the same key.
    CustomHeadersCtxKey CtxKey = "CustomHeaders"
  )
  ```

#### `ReportPage` struct

Represents a paginated response of reports.

```go
type ReportPage struct {
	Reports    []*ReportResponse // Slice of ReportResponse, representing individual report entries.
	NextPageTS uint64            // Timestamp for the next page, used for fetching subsequent pages.
}
```

#### `ReportResponse` struct

Implements the report envelope that contains the full report payload, its stream ID and timestamps. Use the [`Decode`](/data-streams/reference/data-streams-api/go-sdk#decode) function to decode the report payload.

```go
type ReportResponse struct {
    FeedID                feed.ID `json:"feedID"`
    FullReport            []byte  `json:"fullReport"`
    ValidFromTimestamp    uint64  `json:"validFromTimestamp"`
    ObservationsTimestamp uint64  `json:"observationsTimestamp"`
}
```

##### Methods

- `MarshalJSON`: Serializes the `ReportResponse` into JSON.

  ```go
  func (r *ReportResponse) MarshalJSON() ([]byte, error)
  ```

- `String`: Returns the string representation of the `ReportResponse`.

  ```go
  func (r *ReportResponse) String() (s string)
  ```

- `UnmarshalJSON`: Deserializes the `ReportResponse` from JSON.
  ```go
  func (r *ReportResponse) UnmarshalJSON(b []byte) (err error)
  ```

#### `Stats` struct

Statistics related to the Stream's operational metrics.

```go
type Stats struct {
	Accepted              uint64 // Total number of accepted reports
	Deduplicated          uint64 // Total number of deduplicated reports when in HA
	TotalReceived         uint64 // Total number of received reports
	PartialReconnects     uint64 // Total number of partial reconnects when in HA
	FullReconnects        uint64 // Total number of full reconnects
	ConfiguredConnections uint64 // Number of configured connections if in HA
	ActiveConnections     uint64 // Current number of active connections
}
```

##### Methods

- `String`: Returns a string representation of the `Stats`.

  ```go
  func (s Stats) String() (st string)
  ```

#### `Stream` interface

[Interface](https://github.com/smartcontractkit/data-streams-sdk/blob/main/go/stream.go#L39) for managing a real-time data stream.

##### Interface Methods

- `Read`: Reads the next available report from the stream. This method will pause (block) the execution until one of the following occurs:

  - A report is successfully received.
  - The provided context is canceled, typically due to a timeout or a cancel signal.
  - An error state is encountered in any of the underlying connections, such as a network failure.

  ```go
  Read(context.Context) (*ReportResponse, error)
  ```

- `Stats`: Returns statistics about the stream operations as `Stats`.

  ```go
  Stats() Stats
  ```

- `Close`: Closes the stream.

  ```go
  Close() error
  ```

### High Availability (HA) Mode

The Data Streams SDK supports High Availability mode for WebSocket connections. When enabled with `WsHA: true`, the SDK maintains concurrent connections to different instances to ensure high availability, fault tolerance and minimize the risk of report gaps.

#### HA Mode Behavior

When high availability mode is enabled:

- The client queries the server for available origins using the `X-Cll-Available-Origins` header
- Multiple WebSocket connections are established to different server instances
- Reports are deduplicated when received across connections
- Partial reconnects occur when some connections fail while others remain active
- Full reconnects occur when all connections fail

#### HA Configuration

HA mode is controlled by the `WsHA` field in the client configuration:

```go
config := streams.Config{
    WsHA: true, // Use concurrent connections to multiple Streams servers
}
```

### Functions

#### `New`

Creates a new client instance with the specified configuration.

```go
func New(cfg Config) (c Client, err error)
```

**Note**: `New` does not initialize any connections to the Data Streams service.

#### `LogPrintf`

Utility function for logging.

```go
func LogPrintf(format string, a ...any)
```

### Errors

#### `ErrStreamClosed`

Error returned when attempting to use a closed stream.

```go
var ErrStreamClosed = fmt.Errorf("client: use of closed Stream")
```

## feed

### Import

```go
import feed "github.com/smartcontractkit/data-streams-sdk/go/feed"
```

### Types

#### `Feed` struct

Identifies the report stream ID.

```go
type Feed struct {
    FeedID ID `json:"feedID"`
}
```

Where `ID` is the unique identifier for the stream.

#### `FeedVersion` uint16

Represents the stream [report schema](/data-streams/reference/report-schema-overview) version.

```go
type FeedVersion uint16
```

##### Constants

- `FeedVersion1`: Version 1 schema
- `FeedVersion2`: Version 2 schema
- `FeedVersion3`: Version 3 schema
- `FeedVersion4`: Version 4 schema
- `FeedVersion5`: Version 5 schema
- `FeedVersion6`: Version 6 schema
- `FeedVersion7`: Version 7 schema
- `FeedVersion8`: Version 8 schema
- `FeedVersion9`: Version 9 schema
- `FeedVersion10`: Version 10 schema

#### `ID` \[32]byte

Represents a unique identifier for a stream.

```go
type ID [32]byte
```

##### Methods

- `FromString`: Converts a string into an `ID`.

  ```go
  func (f *ID) FromString(s string) (err error)
  ```

- `MarshalJSON`: Serializes the `ID` into JSON.

  ```go
  func (f *ID) MarshalJSON() (b []byte, err error)
  ```

- `String`: Returns the string representation of the `ID`.

  ```go
  func (f *ID) String() (id string)
  ```

- `UnmarshalJSON`: Deserializes the `ID` from JSON.

  ```go
  func (f *ID) UnmarshalJSON(b []byte) (err error)
  ```

- `Version`: Returns the version of the stream.

  ```go
  func (f *ID) Version() FeedVersion
  ```

## report

### Import

```go
import report "github.com/smartcontractkit/data-streams-sdk/go/report"
```

### Types

#### `Data` interface

[Interface](https://github.com/smartcontractkit/data-streams-sdk/blob/main/go/report/report.go#L20) that represents the actual report data and attributes.

```go
type Data interface {
    v1.Data | v2.Data | v3.Data | v4.Data | v5.Data | v6.Data | v7.Data | v8.Data | v9.Data | v10.Data
    Schema() abi.Arguments
}
```

#### `Report` struct

Represents the report envelope that contains the full report payload, its Feed ID, and timestamps.

```go
type Report[T Data] struct {
    Data          T
    ReportContext [3][32]byte
    ReportBlob    []byte
    RawRs         [][32]byte
    RawSs         [][32]byte
    RawVs         [32]byte
}
```

### Functions

#### `Decode`

Decodes the report serialized bytes and its data.

```go
func Decode[T Data](fullReport []byte) (r *Report[T], err error)
```

Example:

```go
payload, _ := hex.DecodeString(
    `0006bd87830d5f336e205cf5c63329a1dab8f5d56812eaeb7c69300e66ab8e22000000000000000000000000000000000000000000000000000000000cf7ed13000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000e0000000000000000000000000000000000000000000000000000000000000022000000000000000000000000000000000000000000000000000000000000003000101000101000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000012000030ab7d02fbba9c6304f98824524407b1f494741174320cfd17a2c22eec1de0000000000000000000000000000000000000000000000000000000066a8f5c60000000000000000000000000000000000000000000000000000000066a8f5c6000000000000000000000000000000000000000000000000000057810653dd9000000000000000000000000000000000000000000000000000541315da76d6100000000000000000000000000000000000000000000000000000000066aa474600000000000000000000000000000000000000000000000009a697ee4230350400000000000000000000000000000000000000000000000009a6506d1426d00000000000000000000000000000000000000000000000000009a77d03ae355fe0000000000000000000000000000000000000000000000000000000000000000672bac991f5233df89f581dc02a89dd8d48419e3558b247d3e65f4069fa45c36658a5a4820dc94fc47a88a21d83474c29ee38382c46b6f9a575b9ce8be4e689c03c76fac19fbec4a29dba704c72cc003a6be1f96af115e322321f0688e24720a5d9bd7136a1d96842ec89133058b888b2e6572b5d4114de2426195e038f1c9a5ce50016b6f5a5de07e08529b845e1c622dcbefa0cfa2ffd128e9932ecee8efd869bc56d09a50ceb360a8d366cfa8eefe3f64279c88bdbc887560efa9944238eb000000000000000000000000000000000000000000000000000000000000000060e2a800f169f26164533c7faff6c9073cd6db240d89444d3487113232f9c31422a0993bb47d56807d0dc26728e4c8424bb9db77511001904353f1022168723010c46627c890be6e701e766679600696866c888ec80e7dbd428f5162a24f2d8262f846bdb06d9e46d295dd8e896fb232be80534b0041660fe4450a7ede9bc3b230722381773a4ae81241568867a759f53c2bdd05d32b209e78845fc58203949e50a608942b270c456001e578227ad00861cf5f47b27b09137a0c4b7f8b4746cef`)

report, err := report.Decode[v3.Data](payload)
if err != nil {
    streams.LogPrintf("error decoding report: %s", err)
    os.Exit(1)
}

streams.LogPrintf(
    "FeedID: %s, FeedVersion: %d, Bid: %s, Ask: %s, BenchMark: %s, LinkFee: %s, NativeFee: %s, ValidFromTS: %d, ExpiresAt: %d",
    report.Data.FeedID.String(),
    report.Data.FeedID.Version(),
    report.Data.Bid.String(),
    report.Data.Ask.String(),
    report.Data.BenchmarkPrice.String(),
    report.Data.LinkFee.String(),
    report.Data.NativeFee.String(),
    report.Data.ValidFromTimestamp,
    report.Data.ExpiresAt,
)
```

## Report Format and Schema Versions

### Schema Versions

The SDK supports multiple report schema versions (v1-v10), each optimized for different use cases:

- **v2**: Feed IDs starting with `0x0002`
- **v3**: Feed IDs starting with `0x0003` (Crypto Streams)
- **v4**: Feed IDs starting with `0x0004` (Real-World Assets)
- **v5**: Feed IDs starting with `0x0005`
- **v6**: Feed IDs starting with `0x0006` (Multiple Price Values)
- **v7**: Feed IDs starting with `0x0007` (Exchange Rate)
- **v8**: Feed IDs starting with `0x0008` (Non-OTC RWA)
- **v9**: Feed IDs starting with `0x0009` (NAV Fund Data)
- **v10**: Feed IDs starting with `0x000a` (Tokenized Equity)

### Common Fields

All report versions include standard metadata:

- `FeedID`: Unique identifier for the data stream
- `ValidFromTimestamp`: When the report becomes valid
- `ExpiresAt`: When the report expires
- `LinkFee`: Fee in LINK tokens
- `NativeFee`: Fee in native blockchain currency
- `ObservationsTimestamp`: When the data was observed

### Usage Pattern

Each version follows the same decoding pattern:

```go
import v3 "github.com/smartcontractkit/data-streams-sdk/go/report/v3"

report, err := report.Decode[v3.Data](fullReport)
```

Use the appropriate version import based on your feed's schema version, which can be determined by calling `feedID.Version()`.

For complete field definitions and decoding examples, see the [report schema overview](/data-streams/reference/report-schema-overview) and [fetch tutorial](/data-streams/tutorials/go-sdk-fetch).