Update java/src/com/twitter/pycascading/Util.java#3

Open
TaoLinVT wants to merge 2 commits into ianoc:casc2 from TaoLinVT:casc2

Conversation

@TaoLinVT

@TaoLinVT TaoLinVT commented Jan 2, 2013

Add gzip compression support (Also need to enable it in python code).

Owner


There's no if statement around any of this; are we changing the upstream behavior of pycascading?

@TaoLinVT
Author

TaoLinVT commented Jan 2, 2013

There are two reasons:

(1). My tests show that these properties have no impact unless the Python code enables compression. For example, changing the following

redis_index = test_flow.basic_sink(cascading.scheme.hadoop.TextLine(), \
                         outputs["redis_index"])

to:

redis_index = redis_builder.basic_sink(cascading.scheme.hadoop.TextLine(cascading.scheme.hadoop.TextLine.Compress.ENABLE), \
                         outputs["redis_index"])

Only the sink with compression enabled generates .gz files; all the others remain the same.

(2). Consider the use case in which one flow has multiple sinks. If one sink has a Compress.ENABLE scheme, then we have to enable those Java compression settings, and those properties will also be applied to the other sinks, even ones without a compress scheme. Because this cannot be avoided, there is no point in adding an if statement around the new statements.

Thanks,
Tao

@ianoc
Owner

ianoc commented Jan 2, 2013

It's very specific to GZIP, and a particular set of flags, to go into upstream. I'll look at being able to pass a set of Hadoop properties into the run option instead. Changing the upstream code again if we decide on a different codec seems like a bad idea. If it's passed into run, rather than into a sink, to control what compression is used, then it will affect all sinks as expected?

@TaoLinVT
Author

TaoLinVT commented Jan 2, 2013

OK, I see.

How about the following, which would let us switch to a different codec later just by adding entries to config:

    mapredOutputCompress = config.get("mapred.output.compress", "true")
    mapredCompressMapOut = config.get("mapred.compress.map.output", "true")
    mapredOutputCompressionType = config.get("mapred.output.compression.type", "BLOCK")
    mapredMapOutputCompressionCodec = config.get("mapred.map.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
    mapredOutputCompressionCodec = config.get("mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
    properties.setProperty("mapred.output.compress", mapredOutputCompress);
    properties.setProperty("mapred.compress.map.output", mapredCompressMapOut);
    properties.setProperty("mapred.output.compression.type", mapredOutputCompressionType);
    properties.setProperty("mapred.map.output.compression.codec", mapredMapOutputCompressionCodec);
    properties.setProperty("mapred.output.compression.codec", mapredOutputCompressionCodec);

Supported codec strings are defined by:
http://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/compress/package-summary.html
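The lookup-with-defaults pattern above can be sketched in plain Python (a hedged illustration only; `DEFAULTS` and `resolve_compression_options` are hypothetical names, and `config` is assumed to be a plain dict):

```python
# Sketch of the defaults-lookup pattern above, using a plain dict.
# Codec class names come from the Hadoop io.compress package.
DEFAULTS = {
    "mapred.output.compress": "true",
    "mapred.compress.map.output": "true",
    "mapred.output.compression.type": "BLOCK",
    "mapred.map.output.compression.codec": "org.apache.hadoop.io.compress.GzipCodec",
    "mapred.output.compression.codec": "org.apache.hadoop.io.compress.GzipCodec",
}

def resolve_compression_options(config):
    """Return each property from config, falling back to the gzip defaults."""
    return {key: config.get(key, default) for key, default in DEFAULTS.items()}

# A caller-supplied codec wins; every other property falls back to its default.
opts = resolve_compression_options(
    {"mapred.output.compression.codec": "org.apache.hadoop.io.compress.BZip2Codec"})
```

This makes the objection concrete: the gzip strings are baked in as defaults, so changing codec means editing the defaults rather than passing configuration.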

@ianoc
Owner

ianoc commented Feb 6, 2013

This seems to:

  1. Alter default behavior in potentially unexpected ways.
  2. Add very specific defaults (defaults should always preserve existing behavior unless there is good motivation to the contrary).
  3. Add a lot more code for specific Hadoop flags.

The config map is a String -> Object map, so we can have a case where one key maps to a String -> String map,

i.e. config.get("pycascading.hadoop.mapred.options") -> Map<String, String>

We then iterate through this string map, setting all the keys present. That leaves the existing behavior in pycascading as it was and lets us easily pass in a hash map to set any options (for compression or anything else in the future too).
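The proposed iteration can be sketched in Python (a sketch only; `apply_mapred_options` and `OPTIONS_KEY` are illustrative names, with `properties` modeled as a dict rather than a Java Properties object):

```python
# Sketch: apply only the options nested under the pycascading key.
# When the key is absent, properties are left exactly as they were.
OPTIONS_KEY = "pycascading.hadoop.mapred.options"

def apply_mapred_options(config, properties):
    """Copy key/value pairs from config[OPTIONS_KEY] into properties, if present."""
    for name, value in config.get(OPTIONS_KEY, {}).items():
        properties[name] = value
    return properties

props = apply_mapred_options(
    {OPTIONS_KEY: {"mapred.output.compress": "true"}}, {})
```

The design point is that no option name is hard-coded: compression, or any other Hadoop flag, rides in through the same map.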

Thoughts?

@TaoLinVT
Author

OK. Here is the plan:

pycascading.pipe.config = dict() is defined in python/pycascading/bootstrap.py (line 70), and it is passed to python/pycascading/tap.py (line 237) as the config parameter to Util.run().

So

  1. We check whether config has the "pycascading.hadoop.mapred.options" key; if yes, we get the map it defines and apply its key/value pairs. For example,

    // Add the following to Util.java before line 153
    String keyForConfigOptions = "pycascading.hadoop.mapred.options";
    if (config.containsKey(keyForConfigOptions)) {
        @SuppressWarnings("unchecked")
        Map<String, String> configOptions = (Map<String, String>) config.get(keyForConfigOptions);
        for (Map.Entry<String, String> option : configOptions.entrySet()) {
            properties.setProperty(option.getKey(), option.getValue());
        }
    }

  2. We also need to revise the python/pycascading/tap.py Flow.run API (line 228) so that we can pass the configuration to Util.run.

    def run(self, name="pycascading flow", num_reducers=50, min_split_size=0, config=None):
        """Start the Cascading job.

        We call this when we are done building the pipeline and explicitly want
        to start the flow process.
        """
        source_map = self.__get_active_sources()
        tails = [t.get_assembly() for t in self.tails]
        import pycascading.pipe
        key_for_config_options = "pycascading.hadoop.mapred.options"
        if config and key_for_config_options in config:
            pycascading.pipe.config[key_for_config_options] = config[key_for_config_options]
        Util.run(name, num_reducers, min_split_size, pycascading.pipe.config, source_map, \
                 self.sink_map, tails)
    
  3. As the last step in our grid script, we call flow.run() with a config dict containing the following key/value pair:

    "pycascading.hadoop.mapred.options": {
        "mapred.output.compress": "true",
        "mapred.compress.map.output": "true",
        "mapred.output.compression.type": "BLOCK",
        "mapred.map.output.compression.codec": "org.apache.hadoop.io.compress.GzipCodec",
        "mapred.output.compression.codec": "org.apache.hadoop.io.compress.GzipCodec",
    }
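The three steps can be simulated end to end in plain Python (a sketch under stated assumptions: `simulated_run` is a hypothetical stand-in for the proposed Flow.run -> Util.run path, not pycascading API, and the returned dict models the Hadoop job properties):

```python
# End-to-end sketch of the plan: the caller's config dict flows through
# run() into job properties; gzip settings appear only when requested.
OPTIONS_KEY = "pycascading.hadoop.mapred.options"

GZIP_CONFIG = {
    OPTIONS_KEY: {
        "mapred.output.compress": "true",
        "mapred.compress.map.output": "true",
        "mapred.output.compression.type": "BLOCK",
        "mapred.map.output.compression.codec": "org.apache.hadoop.io.compress.GzipCodec",
        "mapred.output.compression.codec": "org.apache.hadoop.io.compress.GzipCodec",
    }
}

def simulated_run(config=None):
    """Mimic the proposed Flow.run -> Util.run path: build the job properties."""
    properties = {}
    if config and OPTIONS_KEY in config:
        for name, value in config[OPTIONS_KEY].items():
            properties[name] = value
    return properties
```

Calling `simulated_run()` with no config leaves the properties empty, which is the point of the design: the default behavior is unchanged, and compression is strictly opt-in.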

…script and default behavior is the same as before.
@TaoLinVT
Author

Please see the new diff. Thank you! -- Tao
