Use the LLM transform
This notebook shows how to create an example sentiment analysis pipeline with Apache Beam, use the LLM transform to categorize the sentiment of product reviews, and run the pipeline locally with the direct runner.
The pipeline
- loads the csv file that contains reviews and feedback of a product
- uses the LLM transform to categorize the reviews
- prints the model output
1. Create a Maven project
mvn org.apache.maven.plugins:maven-archetype-plugin:3.1.2:generate \
-DarchetypeArtifactId="maven-archetype-quickstart" \
-DarchetypeGroupId="org.apache.maven.archetypes" \
-DarchetypeVersion="1.4" \
-DgroupId="com.example" \
-DartifactId="sentimentanalysis"
2. Add required dependencies in pom.xml
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>2.60.0</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>2.60.0</version>
</dependency>
<dependency>
<groupId>io.github.ganeshsivakumar</groupId>
<artifactId>langchain-beam</artifactId>
<version>0.2.0</version>
</dependency>
3. Create the Beam pipeline with the LLM transform
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import com.langchainbeam.LangchainBeam;
import com.langchainbeam.LangchainModelHandler;
import com.langchainbeam.model.LangchainBeamOutput;
import com.langchainbeam.model.openai.OpenAiModelOptions;
/**
 * Example Apache Beam pipeline that categorizes product reviews with the LLM transform.
 *
 * <p>The pipeline reads the reviews file with {@code TextIO}, applies
 * {@code LangchainBeam.run} with a sentiment prompt, and prints each model output
 * to standard output. It runs locally on the {@code DirectRunner}.
 */
public class SentimentAnalysis {

    /** Reviews file used when no path is given on the command line. */
    private static final String DEFAULT_REVIEWS_PATH = "/home/ganesh/Downloads/product_reviews.csv";

    /**
     * Builds and runs the sentiment analysis pipeline.
     *
     * @param args optional; {@code args[0]} is the path of the reviews file. When absent,
     *     {@link #DEFAULT_REVIEWS_PATH} is used.
     * @throws IllegalStateException if the {@code OPENAI_API_KEY} environment variable is
     *     unset or blank
     */
    public static void main(String[] args) {
        // instruction prompt
        String prompt = "Categorize the product review as Positive or Negative.";

        // Fail fast with a clear message instead of handing a null key to the model options.
        String apiKey = System.getenv("OPENAI_API_KEY");
        if (apiKey == null || apiKey.trim().isEmpty()) {
            throw new IllegalStateException("Environment variable OPENAI_API_KEY must be set");
        }

        // Allow the input file to be overridden without editing the source.
        String reviewsPath = args != null && args.length > 0 ? args[0] : DEFAULT_REVIEWS_PATH;

        // Create model options
        OpenAiModelOptions modelOptions = OpenAiModelOptions.builder()
                .modelName("gpt-4o-mini")
                .apiKey(apiKey)
                .build();

        // create handler
        LangchainModelHandler sentimentHandler = new LangchainModelHandler(modelOptions, prompt);

        // create beam pipeline options and select the DirectRunner for local execution
        PipelineOptions options = PipelineOptionsFactory.create();
        options.setRunner(DirectRunner.class);

        // create beam pipeline
        Pipeline p = Pipeline.create(options);

        p.apply(TextIO.read().from(reviewsPath)) // read the reviews file
                .apply(LangchainBeam.run(sentimentHandler)) // apply the LLM transform
                .apply(ParDo.of(new DoFn<LangchainBeamOutput, Void>() {
                    @ProcessElement
                    public void processElement(@Element LangchainBeamOutput out) {
                        System.out.println("Model Output: " + out.getOutput());
                    }
                }));

        // Wait explicitly so completion does not depend on the runner's block-on-run default.
        p.run().waitUntilFinish();
    }
}
Check out the example sentiment analysis pipeline with product reviews data on GitHub.