generated from parrogo/gomod
-
Notifications
You must be signed in to change notification settings - Fork 0
/
horzmerge.go
220 lines (188 loc) · 4.38 KB
/
horzmerge.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
// Package horzmerge merges columns from one or more streams of data.
package horzmerge
import (
"bufio"
"errors"
"fmt"
"io"
"os"
"strings"
"unicode"
)
// Options groups all the options accepted by Merge.
//
// Target is the io.Writer on which the merged columns are written.
// When it is nil, os.Stdout is used.
//
// Empty is a placeholder value for cells that carry no real data.
// While merging, the value kept for a column is replaced by the value
// for the same column read from a later reader only when the kept
// value, with surrounding white space trimmed, equals Empty. A kept
// value that does not equal Empty is never replaced. Parsed cells are
// never blank, so with the zero value "" the first reader providing a
// column always wins.
type Options struct {
	Target io.Writer
	Empty  string
}
// Merge reads, from every reader in readers, a header line followed by
// a value line, and pairs each header with the value at the same
// position. The columns of all readers are then merged into a single
// two-line table (headers, then values) written to opt.Target, or to
// os.Stdout when opt.Target is nil.
//
// Columns appear in order of first appearance across readers. When
// several readers provide the same column, the value from the earliest
// reader is kept unless its trimmed text equals opt.Empty, in which case
// the value from the later reader replaces it. Header names are compared
// as parsed, i.e. including the leading padding they carry.
//
// Cells are written back to back with no separator: each parsed cell
// already carries the padding that preceded it in its source line, which
// preserves the column layout.
//
// An error is returned when no reader is given, when a reader cannot be
// parsed (the error wraps an InputError recording the reader's position),
// or when writing the output fails. Nothing is written unless every
// reader was parsed successfully.
func Merge(opt Options, readers ...io.Reader) error {
	if len(readers) == 0 {
		return errors.New("no source readers provided")
	}

	target := opt.Target
	if target == nil {
		target = os.Stdout
	}

	// order maps each header to its output position; merged maps each
	// header to the value that will be written for it.
	order := map[string]int{}
	merged := map[string]string{}
	for idx, r := range readers {
		headers, columns, err := readSourceColumns(r)
		if err != nil {
			return fmt.Errorf("error reading from source %d: %w", idx, InputError{err, idx})
		}
		for _, h := range headers {
			if _, seen := order[h]; !seen {
				order[h] = len(order)
			}
		}
		for h, v := range columns {
			if kept, ok := merged[h]; !ok || strings.TrimSpace(kept) == opt.Empty {
				merged[h] = v
			}
		}
	}

	headers := make([]string, len(order))
	for h, pos := range order {
		headers[pos] = h
	}
	values := make([]string, len(headers))
	for pos, h := range headers {
		values[pos] = merged[h]
	}

	out := bufio.NewWriter(target)
	for _, row := range [][]string{headers, values} {
		if err := writeCells(out, row); err != nil {
			return fmt.Errorf("error writing output: %w", err)
		}
	}
	if err := out.Flush(); err != nil {
		return fmt.Errorf("error flushing output: %w", err)
	}
	return nil
}

// readSourceColumns parses a header line and a value line from r and
// returns the headers in input order together with a header->value map.
// When a header is repeated, the value of its last occurrence is kept.
// A value line with fewer fields than the header line is an error
// (extra values are ignored).
func readSourceColumns(r io.Reader) ([]string, map[string]string, error) {
	source := bufio.NewReader(r)
	headers, err := readHeaders(source)
	if err != nil {
		return nil, nil, err
	}
	values, err := readValues(source)
	if err != nil {
		return nil, nil, err
	}
	if len(values) < len(headers) {
		return nil, nil, fmt.Errorf("too few values: expected %d, got %d", len(headers), len(values))
	}
	columns := make(map[string]string, len(headers))
	for i, h := range headers {
		columns[h] = values[i]
	}
	return headers, columns, nil
}

// writeCells writes cells back to back, then terminates the row with '\n'.
func writeCells(out *bufio.Writer, cells []string) error {
	for _, c := range cells {
		if _, err := out.WriteString(c); err != nil {
			return err
		}
	}
	return out.WriteByte('\n')
}
// checkHeaders reads the next header line from source and verifies that
// it matches headers exactly: same number of fields, and each field equal
// (padding included) to the expected one at the same position. The first
// mismatch is reported as an error; a read error is returned unchanged.
func checkHeaders(source *bufio.Reader, headers []string) error {
	got, err := readHeaders(source)
	switch {
	case err != nil:
		return err
	case len(got) != len(headers):
		return fmt.Errorf("headers len differs: expected %d, got %d", len(headers), len(got))
	}
	for i := range headers {
		if want, have := headers[i], got[i]; want != have {
			return fmt.Errorf("field header %d differs: expected `%s`, got `%s`", i, want, have)
		}
	}
	return nil
}
// readHeaders parses the next line of source into header fields. Header
// lines follow exactly the same layout rules as value lines, so parsing
// is delegated to readValues.
func readHeaders(source *bufio.Reader) ([]string, error) {
	fields, err := readValues(source)
	return fields, err
}
// readValues reads one line from source and splits it into fields
// separated by Unicode white space.
//
// Each returned field is left-padded with spaces so that its width, in
// runes, equals the number of runes it spanned in the input, including
// the white space run that preceded it. Concatenating the returned fields
// therefore reproduces the line's column layout, with every white space
// rune turned into a single space and trailing white space dropped.
//
// Reading stops after '\n' (consumed, not returned) or at end of input.
// A line without fields yields an empty, non-nil slice. A read error
// other than io.EOF is returned wrapped, with a nil slice.
func readValues(source *bufio.Reader) ([]string, error) {
	fields := []string{}
	var cell strings.Builder
	// width counts the runes (leading white space plus cell text) spanned
	// by the field currently being scanned.
	width := 0
	emit := func() {
		fields = append(fields, fmt.Sprintf("%*s", width, cell.String()))
		// The white space rune that ended this field is the first rune of
		// the next field's padding.
		width = 1
		cell.Reset()
	}
	for {
		r, _, err := source.ReadRune()
		if err == io.EOF || r == '\n' {
			if cell.Len() > 0 {
				emit()
			}
			return fields, nil
		}
		if err != nil {
			return nil, fmt.Errorf("error reading fields: %w", err)
		}
		if unicode.IsSpace(r) {
			if cell.Len() > 0 {
				emit()
			} else {
				width++
			}
			continue
		}
		width++
		cell.WriteRune(r)
	}
}
// InputError wraps an error caused by one of the readers passed to Merge,
// recording that reader's zero-based position in the argument list so
// the failure can be traced back to the stream that produced it.
type InputError struct {
	err error
	idx int
}

// Compile-time check that InputError satisfies the error interface.
var _ error = InputError{}

// Error implements the error interface by returning the wrapped error's
// message unchanged.
func (e InputError) Error() string {
	return e.err.Error()
}

// Unwrap returns the wrapped error, so that errors.Is and errors.As can
// look through an InputError.
func (e InputError) Unwrap() error {
	return e.err
}

// Convert returns an error whose message names the file that caused the
// failure. filenames is presumably in the same order as the readers given
// to Merge (caller's responsibility). The original error stays wrapped.
//
// When the recorded position falls outside filenames, the position itself
// is reported instead of panicking with an index out of range.
func (e InputError) Convert(filenames []string) error {
	if e.idx < 0 || e.idx >= len(filenames) {
		return fmt.Errorf("Cannot read source %d: %w", e.idx, e.err)
	}
	return fmt.Errorf("Cannot read file %s: %w", filenames[e.idx], e.err)
}