diff --git a/solr/core/src/java/org/apache/solr/cli/CLIUtils.java b/solr/core/src/java/org/apache/solr/cli/CLIUtils.java index 9e8cd9aa8cb..1a8f76c0db8 100644 --- a/solr/core/src/java/org/apache/solr/cli/CLIUtils.java +++ b/solr/core/src/java/org/apache/solr/cli/CLIUtils.java @@ -180,7 +180,7 @@ public static SolrClient getSolrClient(CommandLine cli) throws Exception { * is used, and warns those users. In the future we'll have urls ending with /api as well. * * @param solrUrl The user supplied url to Solr. - * @return the solrUrl in the format that Solr expects to see internally. + * @return a URL without any path, e.g. {@code http://localhost:8983} */ public static String normalizeSolrUrl(String solrUrl) { return normalizeSolrUrl(solrUrl, true); @@ -192,7 +192,7 @@ public static String normalizeSolrUrl(String solrUrl) { * * @param solrUrl The user supplied url to Solr. * @param logUrlFormatWarning If a warning message should be logged about the url format - * @return the solrUrl in the format that Solr expects to see internally. + * @return a URL without any path, e.g. {@code http://localhost:8983} */ public static String normalizeSolrUrl(String solrUrl, boolean logUrlFormatWarning) { if (solrUrl != null) { diff --git a/solr/core/src/java/org/apache/solr/cli/StreamTool.java b/solr/core/src/java/org/apache/solr/cli/StreamTool.java index 203291c53f0..07ce211521f 100644 --- a/solr/core/src/java/org/apache/solr/cli/StreamTool.java +++ b/solr/core/src/java/org/apache/solr/cli/StreamTool.java @@ -38,7 +38,6 @@ import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; -import org.apache.solr.client.solrj.io.Lang; import org.apache.solr.client.solrj.io.SolrClientCache; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.comp.StreamComparator; @@ -46,11 +45,12 @@ import org.apache.solr.client.solrj.io.stream.SolrStream; import org.apache.solr.client.solrj.io.stream.StreamContext; import org.apache.solr.client.solrj.io.stream.TupleStream; +import org.apache.solr.client.solrj.io.stream.expr.DefaultStreamFactory; import org.apache.solr.client.solrj.io.stream.expr.Explanation; import org.apache.solr.client.solrj.io.stream.expr.Expressible; import org.apache.solr.client.solrj.io.stream.expr.StreamExpression; -import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.apache.solr.client.solrj.jetty.HttpJettySolrClient; import org.apache.solr.common.SolrException; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.handler.CatStream; @@ -62,8 +62,6 @@ public StreamTool(ToolRuntime runtime) { super(runtime); } - private final SolrClientCache solrClientCache = new SolrClientCache(); - @Override public String getName() { return "stream"; @@ -166,14 +164,30 @@ public void runImpl(CommandLine cli) throws Exception { } } - PushBackStream pushBackStream; - if (execution.equalsIgnoreCase("local")) { - pushBackStream = doLocalMode(cli, expr); - } else { - pushBackStream = doRemoteMode(cli, expr); + // Validate inputs before opening any connection to Solr. + boolean local = execution.equalsIgnoreCase("local"); + if (!local) { + if (!cli.hasOption(COLLECTION_OPTION)) { + throw new IllegalStateException( + "You must provide --name COLLECTION with --execution remote parameter."); + } + if (expr.toLowerCase(Locale.ROOT).contains("stdin(")) { + throw new IllegalStateException( + "The stdin() expression is only usable with --worker local set up."); + } } + // a stream needs a context + StreamContext streamContext = createStreamContext(cli); + // create the stream + PushBackStream pushBackStream = null; try { + if (local) { + pushBackStream = doLocalMode(expr, streamContext.getStreamFactory()); + } else { + pushBackStream = doRemoteMode(expr, cli); + } + pushBackStream.setStreamContext(streamContext); pushBackStream.open(); if (outputHeaders == null) { @@ -226,36 +240,58 @@ public void runImpl(CommandLine cli) throws Exception { } } } finally { - pushBackStream.close(); - solrClientCache.close(); + if (pushBackStream != null) { + pushBackStream.close(); + } + streamContext.getSolrClientCache().close(); } echoIfVerbose("StreamTool -- Done."); } + private StreamContext createStreamContext(CommandLine cli) throws Exception { + var jettyClientBuilder = new HttpJettySolrClient.Builder(); + String credentials = cli.getOptionValue(CommonCLIOptions.CREDENTIALS_OPTION); + if (credentials != null) { + String[] userPass = credentials.split(":"); + jettyClientBuilder.withBasicAuthCredentials(userPass[0], userPass[1]); + } + HttpJettySolrClient client = jettyClientBuilder.build(); + + // subclass so we can ensure our client is closed when the cache is closed + var solrClientCache = + new SolrClientCache(client) { + @Override + public synchronized void close() { + super.close(); + client.close(); + } + }; + + var solrConnection = CLIUtils.getSolrConnection(cli); + echoIfVerbose("Connecting to Solr at " + solrConnection); + + StreamContext streamContext = new StreamContext(); + streamContext.setSolrClientCache(solrClientCache); + + StreamFactory streamFactory = new DefaultStreamFactory(); + streamFactory.withDefaultSolrConnection(solrConnection); + streamContext.setStreamFactory(streamFactory); + return streamContext; + } + /** * Runs a streaming expression in the local process of the CLI. * *

Running locally means that parallelization support or those expressions requiring access to * internal Solr capabilities will not function. * - * @param cli The CLI invoking the call - * @param expr The streaming expression to be parsed and in the context of the CLI process + * @param expr The streaming expression to be parsed and run in the context of the CLI process + * @param streamFactory The factory used to construct the streaming expression * @return A connection to the streaming expression that receives Tuples as they are emitted * locally. */ - private PushBackStream doLocalMode(CommandLine cli, String expr) throws Exception { - var solrConnection = CLIUtils.getSolrConnection(cli); - echoIfVerbose("Connecting to Solr at " + solrConnection.toString()); - solrClientCache.setBasicAuthCredentials( - cli.getOptionValue(CommonCLIOptions.CREDENTIALS_OPTION)); - solrClientCache.getCloudSolrClient(solrConnection); - - TupleStream stream; - PushBackStream pushBackStream; - - StreamExpression streamExpression = StreamExpressionParser.parse(expr); - StreamFactory streamFactory = new StreamFactory(); + private PushBackStream doLocalMode(String expr, StreamFactory streamFactory) throws Exception { // stdin is ONLY available in the local mode, not in the remote mode as it // requires access to System.in @@ -265,23 +301,7 @@ private PushBackStream doLocalMode(CommandLine cli, String expr) throws Exceptio // logic about where to read data from. streamFactory.withFunctionName("cat", LocalCatStream.class); - streamFactory.withDefaultSolrConnection(solrConnection); - - Lang.register(streamFactory); - - assert streamExpression != null; - stream = streamFactory.constructStream(streamExpression); - - pushBackStream = new PushBackStream(stream); - - // Now we can run the stream and return the results. - StreamContext streamContext = new StreamContext(); - streamContext.setSolrClientCache(solrClientCache); - - // Output the headers - pushBackStream.setStreamContext(streamContext); - - return pushBackStream; + return new PushBackStream(streamFactory.constructStream(expr)); } /** @@ -291,35 +311,18 @@ private PushBackStream doLocalMode(CommandLine cli, String expr) throws Exceptio *

Running remotely allows you to use all the standard Streaming Expression capabilities as the * expression is running in a Solr environment. * - * @param cli The CLI invoking the call * @param expr The streaming expression to be parsed and run remotely + * @param cli The CLI invoking the call * @return A connection to the streaming expression that receives Tuples as they are emitted from * Solr /stream. */ - private PushBackStream doRemoteMode(CommandLine cli, String expr) throws Exception { + private PushBackStream doRemoteMode(String expr, CommandLine cli) throws Exception { String solrUrl = CLIUtils.normalizeSolrUrl(cli); - if (!cli.hasOption(COLLECTION_OPTION)) { - throw new IllegalStateException( - "You must provide --name COLLECTION with --execution remote parameter."); - } String collection = cli.getOptionValue(COLLECTION_OPTION); - if (expr.toLowerCase(Locale.ROOT).contains("stdin(")) { - throw new IllegalStateException( - "The stdin() expression is only usable with --worker local set up."); - } - - final SolrStream solrStream = - new SolrStream(solrUrl + "/solr/" + collection, params("qt", "/stream", "expr", expr)); - - String credentials = cli.getOptionValue(CommonCLIOptions.CREDENTIALS_OPTION); - if (credentials != null) { - String username = credentials.split(":")[0]; - String password = credentials.split(":")[1]; - solrStream.setCredentials(username, password); - } - return new PushBackStream(solrStream); + return new PushBackStream( + new SolrStream(solrUrl + "/solr/" + collection, params("qt", "/stream", "expr", expr))); } private static ModifiableSolrParams params(String... params) {