sqoop在实际生产中的使用经验总结

Sqoop

sqoop原理:

详细介绍:

http://sqoop.apache.org/

1、数据安全

获取帮助:hadoop credential

1
hadoop credential create mysql.digger.prod.r.alias [-value password] -provider jceks://hdfs/mysql/password/mysql.digger.prod.r.jceks

http://sqoop.apache.org/docs/1.4.7/SqoopUserGuide.html#_importing_data_into_hive

Protecting password from preying eyes. Hadoop 2.6.0 provides an API to separate password storage from applications. This API is called the credential provided API and there is a new credential command line tool to manage passwords and their aliases. The passwords are stored with their aliases in a keystore that is password protected. The keystore password can be the provided to a password prompt on the command line, via an environment variable or defaulted to a software defined constant. Please check the Hadoop documentation on the usage of this facility.

Once the password is stored using the Credential Provider facility and the Hadoop configuration has been suitably updated, all applications can optionally use the alias in place of the actual password and at runtime resolve the alias for the password to use.

Since the keystore or similar technology used for storing the credential provider is shared across components, passwords for various applications, various database and other passwords can be securely stored in them and only the alias needs to be exposed in configuration files, protecting the password from being visible.

Sqoop has been enhanced to allow usage of this funcionality if it is available in the underlying Hadoop version being used. One new option has been introduced to provide the alias on the command line instead of the actual password (–password-alias). The argument value this option is the alias on the storage associated with the actual password. Example usage is as sqoop脚本:

1
2
-Dhadoop.security.credential.provider.path=jceks://hdfs/mysql/password/mysql.digger.prod.r.jceks \
--password-alias mysql.digger.prod.r.alias \

2、数据核对

presto:跨数据源join

java代码自动生成核对sql:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class presto_for_iglobalwin {
public static void main(String[] args) throws Exception {

ArrayList<String> array = new ArrayList<>();

Properties properties = new Properties();
InputStream in = Test01.class.getClassLoader().getResourceAsStream("druid1.properties");
properties.load(in);
in.close();
DataSource dataSource = DruidDataSourceFactory.createDataSource(properties);
Connection connection = dataSource.getConnection();
System.out.println(connection);
String sql = "show tables";
PreparedStatement preparedStatement = connection.prepareStatement(sql);
ResultSet resultSet = preparedStatement.executeQuery();
while (resultSet.next()) {
array.add(resultSet.getString("Tables_in_skytree_test"));
}
connection.close();

String s3="hive.skytree.";
String s4="connector2.skytree_test.";

Iterator<String> iterator = array.iterator();
while (iterator.hasNext()){
String next = iterator.next();
System.out.println("select "+"'"+next+"'"+",t1.c1,t2.c1,if(t1.c1=t2.c1,1,0),(t1.c1-t2.c1)/(t2.c1+0.0001) from (select count(*) as c1 from ( select * from "+s3+"ods_"+next+" union select *,'2021-05-07' from "+s4+next+" )t0) t1, (select count(*) as c1 from "+s4+next+")t2 union all");
}
}
}

数据不一致:

1、tinyint(1)自动转化为 bit类型

解决方案:

1
jdbc:mysql://rm-xxx.mysql.rds.aliyuncs.com:3306/digger?zeroDateTimeBehavior=convertToNull&useUnicode=true&characterEncoding=utf-8&useSSL=false&tinyInt1isBit=false

https://sqoop.apache.org/docs/1.4.6/SqoopUserGuide.html#_mysql_import_of_tinyint_1_from_mysql_behaves_strangely

官网原文:
27.2.5. MySQL: Import of TINYINT(1) from MySQL behaves strangely
Problem: Sqoop is treating TINYINT(1) columns as booleans, which is for example causing issues with HIVE import. This is because by default the MySQL JDBC connector maps the TINYINT(1) to java.sql.Types.BIT, which Sqoop by default maps to Boolean.

Solution: A more clean solution is to force MySQL JDBC Connector to stop converting TINYINT(1) to java.sql.Types.BIT by adding tinyInt1isBit=false into your JDBC path (to create something like jdbc:mysql://localhost/test?tinyInt1isBit=false). Another solution would be to explicitly override the column mapping for the datatype TINYINT(1) column. For example, if the column name is foo, then pass the following option to Sqoop during import: –map-column-hive foo=tinyint. In the case of non-Hive imports to HDFS, use –map-column-java foo=integer.

2、字段中出现回车,换行

导致同步到hive后数据行数不对,且内容错乱。

1
--hive-drop-import-delims \

http://sqoop.apache.org/docs/1.4.7/SqoopUserGuide.html#_importing_data_into_hive

Argument Description
--hive-partition-value <v> String-value that serves as partition key for this imported into hive in this job.
--map-column-hive <map> Override default mapping from SQL type to Hive type for configured columns. If specify commas in this argument, use URL encoded keys and values, for example, use DECIMAL(1%2C%201) instead of DECIMAL(1, 1).

3、null一致性问题

1
$ sqoop import  ... --null-string '\\N' --null-non-string '\\N'

Sqoop will by default import NULL values as string null. Hive is however using string \N to denote NULL values and therefore predicates dealing with NULL (like IS NULL) will not work correctly. You should append parameters --null-string and --null-non-string in case of import job or --input-null-string and --input-null-non-string in case of an export job if you wish to properly preserve NULL values. Because sqoop is using those parameters in generated code, you need to properly escape value \N to \\N:

3、数据压缩

–compression-codec Snappy \

http://sqoop.apache.org/docs/1.4.7/SqoopUserGuide.html#_importing_data_into_hive

7.2.11. File Formats

By default, data is not compressed. You can compress your data by using the deflate (gzip) algorithm with the -z or --compress argument, or specify any Hadoop compression codec using the --compression-codec argument. This applies to SequenceFile, text, and Avro files.

4、同步策略

sql语句实现:全量,增量,新增及变化,特殊,与后面hive中周期型快照事实表/全量同步维表,事务性事实表/拉链表/累计型快照事实表、特殊表相对应。

新增及变化:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
select id,
final_total_amount,
order_status,
user_id,
out_trade_no,
create_time,
operate_time,
province_id,
benefit_reduce_amount,
original_total_amount,
feight_fee
from order_info
where (date_format(create_time, '%Y-%m-%d') = '$do_date'
or date_format(operate_time, '%Y-%m-%d') = '$do_date')
增量:
1
select * from business_ads_report where date=$do_date
全量:
1
select * from web_site_analytics_report_daily where 1=1
特殊:

同步sql和全量一样,shell/python同步函数只在第一次调用

5、提高并行度和高并发

1
2
3
-Dmapreduce.job.queuename=spark \
-m,--num-mappers <n> \
--split-by [int] \

7.2.4. Controlling Parallelism

Sqoop imports data in parallel from most database sources. You can specify the number of map tasks (parallel processes) to use to perform the import by using the -m or --num-mappers argument. Each of these arguments takes an integer value which corresponds to the degree of parallelism to employ. By default, four tasks are used. Some databases may see improved performance by increasing this value to 8 or 16. Do not increase the degree of parallelism greater than that available within your MapReduce cluster; tasks will run serially and will likely increase the amount of time required to perform the import. Likewise, do not increase the degree of parallism higher than that which your database can reasonably support. Connecting 100 concurrent clients to your database may increase the load on the database server to a point where performance suffers as a result.

When performing parallel imports, Sqoop needs a criterion by which it can split the workload. Sqoop uses a splitting column to split the workload. By default, Sqoop will identify the primary key column (if present) in a table and use it as the splitting column. The low and high values for the splitting column are retrieved from the database, and the map tasks operate on evenly-sized components of the total range. For example, if you had a table with a primary key column of id whose minimum value was 0 and maximum value was 1000, and Sqoop was directed to use 4 tasks, Sqoop would run four processes which each execute SQL statements of the form SELECT * FROM sometable WHERE id >= lo AND id < hi, with (lo, hi) set to (0, 250), (250, 500), (500, 750), and (750, 1001) in the different tasks.

If the actual values for the primary key are not uniformly distributed across its range, then this can result in unbalanced tasks. You should explicitly choose a different column with the --split-by argument. For example, --split-by employee_id. Sqoop cannot currently split on multi-column indices. If your table has no index column, or has a multi-column key, then you must also manually choose a splitting column.

User can override the --num-mapers by using --split-limit option. Using the --split-limit parameter places a limit on the size of the split section created. If the size of the split created is larger than the size specified in this parameter, then the splits would be resized to fit within this limit, and the number of splits will change according to that.This affects actual number of mappers. If size of a split calculated based on provided --num-mappers parameter exceeds --split-limit parameter then actual number of mappers will be increased.If the value specified in --split-limit parameter is 0 or negative, the parameter will be ignored altogether and the split size will be calculated according to the number of mappers.

If a table does not have a primary key defined and the --split-by <col> is not provided, then import will fail unless the number of mappers is explicitly set to one with the --num-mappers 1 option or the --autoreset-to-one-mapper option is used. The option --autoreset-to-one-mapper is typically used with the import-all-tables tool to automatically handle tables without a primary key in a schema.

样例shell函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import_data(){
sqoop1 import \
-Dmapreduce.job.queuename=storm \
-Dhadoop.security.credential.provider.path=jceks://hdfs/mysql/password/mysql.digger.prod.r.jceks \
--connect "jdbc:mysql://rm-xxx.mysql.rds.aliyuncs.com:3306/digger?zeroDateTimeBehavior=convertToNull&useUnicode=true&characterEncoding=utf-8&useSSL=false&tinyInt1isBit=false" \
--username hadoop_read \
--password-alias mysql.digger.prod.r.alias \
--target-dir /sqoop/db/digger/$1/$do_date \
--delete-target-dir \
--query "$2 and \$CONDITIONS" \
--num-mappers 32 \
--split-by id \
--fields-terminated-by '\t' \
--compress \
--compression-codec Snappy \
--hive-drop-import-delims \
--null-string '\\N' \
--null-non-string '\\N'
}

Donate
  • Copyright: Copyright is owned by the author. For commercial reprints, please contact the author for authorization. For non-commercial reprints, please indicate the source.

扫一扫,分享到微信

微信分享二维码
  • Copyrights © 2020-2021 ycfn97
  • Visitors: | Views:

请我喝杯咖啡吧~

支付宝
微信