Skip to content

Commit

Permalink
Merge pull request #5 from swiftype/external_schema
Browse files Browse the repository at this point in the history
Add an option to read avro schema from an external file
  • Loading branch information
grisha committed Mar 6, 2014
2 parents 0382dbd + c18f435 commit f62be06
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 20 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
/json2avro
/avrolib
/avro-c/build
22 changes: 18 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,23 @@ and starts parsing afresh. (This behavior can be turned off with the
## Usage

```sh
./json2avro
Usage: ./json2avro [-c null|snappy|deflate|lzma] [-b <block_size (dft: 16384)>] [-d] [-j] [-x (abort on error)] -s <schema> [<infile.json>] <outfile.avro|->
If infile.json is not specified, stdin is assumed. outfile.avro of '-' is stdout.
$ ./json2avro -h
Usage: ./json2avro [options] [input_file.json] <output_file.avro>

Where options are:
-s schema (required) Avro schema to use for conversion.
-S file (required) JSON file to read the avro schema from.
-c algo (optional) Set output compression algorithm: null, snappy, deflate, lzma
Default: no compression
-b bytes (optional) Set output block size in bytes. Default: 16384
-d (optional) Turn on debug mode.
-j (optional) Dump unexpected JSON objects as strings.
-x (optional) Abort on JSON parsing errors. Default: skip invalid json.
-z bytes (optional) Maximum JSON string size. Default: no limit.
-m (optional) Linux only, enable periodic memory stats information output.
-h Show this help and exit.

If infile.json is not specified, STDIN is assumed. outfile.avro of '-' means STDOUT.
```

## Example
Expand All @@ -49,7 +63,7 @@ If we have the following JSON called `input.json`:
"a_fixed":"abcd", "an_int_array":[123, 456, -32, 0, 12],
"a_float_map":{"foo":2.345, "bar":-3.456}} {"a_null":null,
"a_bool":false, "an_int":54321, "a_long":9876543212,
"a_float":7.654321, "a_double":8.76543217654321E7,
"a_float":7.654321, "a_double":8.76543217654321E7,
"a_string":"foo bar",
"random_bytes":"\u0006K\u0007\nV@H#3\u001ad\u001a\u0006",
"a_fixed":"dcba", "an_int_array":[321, 654, -23, 0, 21],
Expand Down
107 changes: 91 additions & 16 deletions json2avro.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* Copyright 2013 Gregory Trubetskoy
*
*
* Licensed under the Apache License, Version 2.0 (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
Expand Down Expand Up @@ -28,6 +28,38 @@
#include <jansson.h>
#include <avro.h>

#define MAX_SCHEMA_LEN ((off_t) 1024*1024)

char *read_schema_file(char *file_name) {
FILE *schema_file = fopen(file_name, "rt");
if (errno != 0) {
fprintf(stderr, "Could not find or access file: %s\n", file_name);
return 0;
}

// Get file size
fseek(schema_file, 0, SEEK_END);
off_t file_size = ftell(schema_file);
fseek(schema_file, 0, SEEK_SET);

if (file_size == 0) {
fprintf(stderr, "Empty schema file: %s\n", file_name);
return 0;
}

if (file_size > MAX_SCHEMA_LEN) {
fprintf(stderr, "Schema file size is too big: %lld bytes > %lld maximum supported length\n", file_size, MAX_SCHEMA_LEN);
return 0;
}

// Allocate buffer for the schema and read the data
char *buf = (char*) malloc(file_size);
fread(buf, 1, file_size, schema_file);
fclose(schema_file);

return buf;
}

void memory_status() {
/* This is obviously Linux-only */
const char* statm_path = "/proc/self/statm";
Expand All @@ -38,7 +70,7 @@ void memory_status() {
fclose(f);
}

int schema_traverse(const avro_schema_t schema, json_t *json, json_t *dft,
int schema_traverse(const avro_schema_t schema, json_t *json, json_t *dft,
avro_value_t *current_val, int quiet, int strjson, size_t max_str_sz) {

json = json ? json : dft;
Expand Down Expand Up @@ -67,7 +99,7 @@ int schema_traverse(const avro_schema_t schema, json_t *json, json_t *dft,

avro_value_t field;
avro_value_get_by_index(current_val, i, &field, NULL);

if (schema_traverse(field_schema, json_val, dft, &field, quiet, strjson, max_str_sz))
return 1;
}
Expand Down Expand Up @@ -170,7 +202,7 @@ int schema_traverse(const avro_schema_t schema, json_t *json, json_t *dft,
fprintf(stderr, "ERROR: Expecting JSON null for Avro null, got something else\n");
return 1;
}
avro_value_set_null(current_val);
avro_value_set_null(current_val);
break;

case AVRO_ENUM:
Expand Down Expand Up @@ -250,7 +282,7 @@ int schema_traverse(const avro_schema_t schema, json_t *json, json_t *dft,
return 0;
}

void process_file(FILE *input, avro_file_writer_t out, avro_schema_t schema,
void process_file(FILE *input, avro_file_writer_t out, avro_schema_t schema,
int verbose, int memstat, int errabort, int strjson, size_t max_str_sz) {

json_error_t err;
Expand Down Expand Up @@ -286,7 +318,7 @@ void process_file(FILE *input, avro_file_writer_t out, avro_schema_t schema,

} else
fprintf(stderr, "Error processing record %d, skipping...\n", n);

avro_value_iface_decref(iface);
avro_value_decref(&record);

Expand All @@ -297,12 +329,38 @@ void process_file(FILE *input, avro_file_writer_t out, avro_schema_t schema,
json = json_loadf(input, JSON_DISABLE_EOF_CHECK, &err);
}

if (memstat) memory_status();

avro_schema_decref(schema);
}

int main(int argc, char *argv[]) {
void print_help(char *program_name) {
fprintf(stderr, "Usage: %s [options] [input_file.json] <output_file.avro>\n", program_name);
fprintf(stderr, "\n");
fprintf(stderr, "Where options are:\n");
fprintf(stderr, " -s schema (required) Avro schema to use for conversion.\n");
fprintf(stderr, " -S file (required) JSON file to read the avro schema from.\n");
fprintf(stderr, " -c algo (optional) Set output compression algorithm: null, snappy, deflate, lzma\n");
fprintf(stderr, " Default: no compression\n");
fprintf(stderr, " -b bytes (optional) Set output block size in bytes. Default: 16384\n");
fprintf(stderr, " -d (optional) Turn on debug mode.\n");
fprintf(stderr, " -j (optional) Dump unexpected JSON objects as strings.\n");
fprintf(stderr, " -x (optional) Abort on JSON parsing errors. Default: skip invalid json.\n");
fprintf(stderr, " -z bytes (optional) Maximum JSON string size. Default: no limit.\n");
fprintf(stderr, " -m (optional) Linux only, enable periodic memory stats information output.\n");
fprintf(stderr, " -h Show this help and exit.\n");
fprintf(stderr, "\n");
fprintf(stderr, "If infile.json is not specified, STDIN is assumed. outfile.avro of '-' means STDOUT.\n");
fprintf(stderr, "\n");
}

void usage_error(char *program_name, char *message) {
if (message) fprintf(stderr, "ERROR: %s\n\n", message);
print_help(program_name);
exit(EXIT_FAILURE);
}

int main(int argc, char *argv[]) {
FILE *input;

avro_schema_t schema;
Expand All @@ -319,11 +377,14 @@ int main(int argc, char *argv[]) {
extern char *optarg;
extern int optind, optopt;

while ((opt = getopt(argc, argv, "c:s:b:z:dmxj")) != -1) {
while ((opt = getopt(argc, argv, "c:s:S:b:z:dmxjh")) != -1) {
switch (opt) {
case 's':
case 's':
schema_arg = optarg;
break;
case 'S':
schema_arg = read_schema_file(optarg);
break;
case 'b':
block_sz = strtol(optarg, &endptr, 0);
if (*endptr) {
Expand All @@ -338,7 +399,7 @@ int main(int argc, char *argv[]) {
opterr++;
}
break;
case 'c':
case 'c':
codec = optarg;
break;
case 'd':
Expand All @@ -351,8 +412,15 @@ int main(int argc, char *argv[]) {
strjson = 1;
break;
case 'm':
memstat = 1;
#if defined(__linux__)
memstat = 1;
#else
usage_error(argv[0], "Memory stats is a Linux-only feature!");
#endif
break;
case 'h':
print_help(argv[0]);
exit(0);
case ':':
fprintf(stderr, "ERROR: Option -%c requires an operand\n", optopt);
opterr++;
Expand All @@ -363,11 +431,17 @@ int main(int argc, char *argv[]) {
}
}

if ((argc - optind) < 1 || (argc - optind) > 2 || opterr || !schema_arg) {
fprintf(stderr, "Usage: %s [-c null|snappy|deflate|lzma] [-b <block_size (dft: 16384)>] [-d] [-j] [-x (abort on error)] [-z <max_str_sz>] -s <schema> [<infile.json>] <outfile.avro|->\n", argv[0]);
fprintf(stderr, "If infile.json is not specified, stdin is assumed. outfile.avro of '-' is stdout.\n");
exit(EXIT_FAILURE);
int file_args_cnt = (argc - optind);
if (file_args_cnt == 0) {
usage_error(argv[0], "Please provide at least one file name argument");
}
if (file_args_cnt > 2) {
fprintf(stderr, "Too many file name arguments: %d!\n", file_args_cnt);
usage_error(argv[0], 0);
}

if (opterr) usage_error(argv[0], 0);
if (!schema_arg) usage_error(argv[0], "Please provide correct schema!");

if (!codec) codec = "null";
else if (strcmp(codec, "snappy") && strcmp(codec, "deflate") && strcmp(codec, "lzma") && strcmp(codec, "null")) {
Expand All @@ -382,7 +456,8 @@ int main(int argc, char *argv[]) {
outpath = argv[optind+1];
input = fopen(argv[optind], "rb");
if ( errno != 0 ) {
perror("ERROR: Cannot open file");
fprintf(stderr, "ERROR: Cannot open input file: %s: ", argv[optind]);
perror(0);
exit(EXIT_FAILURE);
}
}
Expand Down

0 comments on commit f62be06

Please sign in to comment.